コンテンツにスキップ

pytilpack.sqlalchemy

必要なextra

pip install pytilpack[sqlalchemy]

pytilpack.sqlalchemy

SQLAlchemy用のユーティリティ集。

AsyncMixin

Bases: AsyncAttrs, _ReprMixin

モデルのベースクラス。SQLAlchemy 2.0スタイル・async前提。

例:

モデル定義例::

class Base(sqlalchemy.orm.DeclarativeBase, pytilpack.sqlalchemy_.AsyncMixin):
    pass

class User(Base):
    __tablename__ = "users"
    ...

Quart例::

@app.before_request
async def _before_request() -> None:
    quart.g.db_session_token = await models.Base.start_session()

@app.teardown_request
async def _teardown_request(_: BaseException | None) -> None:
    if hasattr(quart.g, "db_session_token"):
        await models.Base.close_session(quart.g.db_session_token)
        del quart.g.db_session_token

session_var = contextvars.ContextVar('session_var') class-attribute instance-attribute

セッション。

init(url, pool_size=None, max_overflow=None, pool_recycle=280, pool_pre_ping=True, autoflush=True, expire_on_commit=False, eagerly_init=True, **kwargs) classmethod

DB接続を初期化する。(推奨される既定設定を適用する。)

engineとsessionmakerはスレッドローカルで遅延初期化される。 各スレッドが初めてDB操作を行う際にそのスレッド専用のengineとsessionmakerが生成される。

引数:

名前 タイプ デスクリプション デフォルト
url str | URL

DB接続URL。

必須
pool_size int | None

コネクションプールのサイズ。スレッド数に応じて調整要。負数の場合はプーリングを無効化する。

None
max_overflow int | None

コネクションプールの最大オーバーフロー数。Noneの場合はデフォルト値を使用。

None
pool_recycle int | None

コネクションプールのリサイクル時間。Noneの場合はデフォルト値を使用。

280
pool_pre_ping bool

コネクションプールのプレピン。Noneの場合はデフォルト値を使用。

True
autoflush bool

セッションのautoflushフラグ。デフォルトはTrue。

True
expire_on_commit bool

セッションのexpire_on_commitフラグ。デフォルトはFalse。

False
eagerly_init bool

Trueの場合(デフォルト)、呼び出し元スレッドのengineとsessionmakerをすぐに生成する。

True
**kwargs Any

その他のsqlalchemy.create_async_engine()へのキーワード引数。

{}
ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def init(
    cls,
    url: str | sqlalchemy.engine.URL,
    pool_size: int | None = None,
    max_overflow: int | None = None,
    pool_recycle: int | None = 280,
    pool_pre_ping: bool = True,
    autoflush: bool = True,
    expire_on_commit: bool = False,
    eagerly_init: bool = True,
    **kwargs: typing.Any,
) -> None:
    """DB接続を初期化する。(推奨される既定設定を適用する。)

    engineとsessionmakerはスレッドローカルで遅延初期化される。
    各スレッドが初めてDB操作を行う際にそのスレッド専用のengineとsessionmakerが生成される。

    Args:
        url: DB接続URL。
        pool_size: コネクションプールのサイズ。スレッド数に応じて調整要。負数の場合はプーリングを無効化する。
        max_overflow: コネクションプールの最大オーバーフロー数。Noneの場合はデフォルト値を使用。
        pool_recycle: コネクションプールのリサイクル時間。Noneの場合はデフォルト値を使用。
        pool_pre_ping: コネクションプールのプレピン。Noneの場合はデフォルト値を使用。
        autoflush: セッションのautoflushフラグ。デフォルトはTrue。
        expire_on_commit: セッションのexpire_on_commitフラグ。デフォルトはFalse。
        eagerly_init: Trueの場合(デフォルト)、呼び出し元スレッドのengineとsessionmakerをすぐに生成する。
        **kwargs: その他のsqlalchemy.create_async_engine()へのキーワード引数。

    """
    assert cls._init_args is None, "DB接続はすでに初期化されています。"

    if pool_size is not None and pool_size < 0:
        engine_kwargs = kwargs.copy()
        engine_kwargs["poolclass"] = sqlalchemy.pool.NullPool
    else:
        if pool_size is not None and max_overflow is None:
            max_overflow = pool_size * 1  # デフォルトで倍まで許可
        engine_kwargs = kwargs.copy()
        if pool_size is not None:
            engine_kwargs["pool_size"] = pool_size
        if max_overflow is not None:
            engine_kwargs["max_overflow"] = max_overflow
    if pool_recycle is not None:
        engine_kwargs["pool_recycle"] = pool_recycle
    if pool_pre_ping is not None:
        engine_kwargs["pool_pre_ping"] = pool_pre_ping

    cls._init_args = _InitArgs(
        url=url,
        autoflush=autoflush,
        expire_on_commit=expire_on_commit,
        engine_kwargs=engine_kwargs,
    )
    cls._thread_local = threading.local()

    if eagerly_init:
        cls._get_engine_and_sessionmaker()

term() async classmethod

現在のスレッドのDB接続を終了する。

スレッドローカルなengineをdisposeして解放する。 スレッド終了時やテスト後の後始末に使用する。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def term(cls) -> None:
    """現在のスレッドのDB接続を終了する。

    スレッドローカルなengineをdisposeして解放する。
    スレッド終了時やテスト後の後始末に使用する。

    """
    if cls._init_args is None:
        logger.warning("未初期化時のterm()呼び出し")
        return
    if cls._thread_local is None:
        return
    if hasattr(cls._thread_local, "engine") and cls._thread_local.engine is not None:
        await cls._thread_local.engine.dispose()
        cls._thread_local.engine = None
        cls._thread_local.sessionmaker = None

engine() classmethod

現在のスレッドのDB接続を返す。init()後に初めて呼ばれた時点で遅延生成される。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def engine(cls) -> sqlalchemy.ext.asyncio.AsyncEngine:
    """現在のスレッドのDB接続を返す。init()後に初めて呼ばれた時点で遅延生成される。"""
    return cls._get_engine_and_sessionmaker()[0]

sessionmaker() classmethod

現在のスレッドのセッションファクトリを返す。init()後に初めて呼ばれた時点で遅延生成される。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def sessionmaker(
    cls,
) -> sqlalchemy.ext.asyncio.async_sessionmaker[sqlalchemy.ext.asyncio.AsyncSession]:
    """現在のスレッドのセッションファクトリを返す。init()後に初めて呼ばれた時点で遅延生成される。"""
    return cls._get_engine_and_sessionmaker()[1]

connect() classmethod

DBに接続する。

使用例:: async with Base.connect() as conn: await conn.run_sync(Base.metadata.create_all)

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def connect(cls) -> sqlalchemy.ext.asyncio.AsyncConnection:
    """DBに接続する。

    使用例::
        async with Base.connect() as conn:
            await conn.run_sync(Base.metadata.create_all)

    """
    return cls.engine().connect()

session_scope(name=None, log_level=logging.DEBUG) async classmethod

セッションを開始するコンテキストマネージャ。

使用例:: async with Base.session_scope() as session: ...

引数:

名前 タイプ デスクリプション デフォルト
name str | None

セッション名。指定時のみログ出力する。

None
log_level int

ログレベル。

DEBUG
ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
@contextlib.asynccontextmanager
async def session_scope(
    cls,
    name: str | None = None,
    log_level: int = logging.DEBUG,
):
    """セッションを開始するコンテキストマネージャ。

    使用例::
        async with Base.session_scope() as session:
            ...

    Args:
        name: セッション名。指定時のみログ出力する。
        log_level: ログレベル。

    """
    token = await cls.start_session(name=name, log_level=log_level)
    try:
        yield cls.session()
    finally:
        await cls.close_session(token, name=name, log_level=log_level)

start_session(name=None, log_level=logging.DEBUG) async classmethod

セッションを開始する。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def start_session(
    cls, name: str | None = None, log_level: int = logging.DEBUG
) -> contextvars.Token[sqlalchemy.ext.asyncio.AsyncSession]:
    """セッションを開始する。"""
    session = cls.sessionmaker()()  # pylint: disable=not-callable
    token = cls.session_var.set(session)
    if name is not None:
        logger.log(
            log_level,
            f"セッション開始: {name} session={id(session):x},"
            f" thread={threading.get_ident():x},"
            f" task={pytilpack.asyncio.get_task_id_hex()}",
        )
    return token

close_session(token, name=None, log_level=logging.DEBUG) async classmethod

セッションを終了する。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def close_session(
    cls,
    token: contextvars.Token[sqlalchemy.ext.asyncio.AsyncSession],
    name: str | None = None,
    log_level: int = logging.DEBUG,
) -> None:
    """セッションを終了する。"""
    session = cls.session()
    if name is not None:
        logger.log(
            log_level,
            f"セッション終了: {name} session={id(session):x},"
            f" thread={threading.get_ident():x},"
            f" task={pytilpack.asyncio.get_task_id_hex()}",
        )
    await asafe_close(session)
    cls.session_var.reset(token)

session() classmethod

セッションを取得する。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def session(cls) -> sqlalchemy.ext.asyncio.AsyncSession:
    """セッションを取得する。"""
    sess = cls.session_var.get(None)
    if sess is None:
        raise RuntimeError(f"セッションが開始されていません。{cls.__qualname__}.start_session()を呼び出してください。")
    return sess

select() classmethod

sqlalchemy.Selectを返す。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def select(cls) -> sqlalchemy.Select[tuple[typing.Self]]:
    """sqlalchemy.Selectを返す。"""
    # cls.count()などでfrom句が消えないように明示的にfrom句を指定して返す。
    return sqlalchemy.select(cls).select_from(cls)

insert() classmethod

sqlalchemy.Insertを返す。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def insert(cls) -> sqlalchemy.Insert:
    """sqlalchemy.Insertを返す。"""
    return sqlalchemy.insert(cls)

update() classmethod

sqlalchemy.Updateを返す。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def update(cls) -> sqlalchemy.Update:
    """sqlalchemy.Updateを返す。"""
    return sqlalchemy.update(cls)

delete() classmethod

sqlalchemy.Deleteを返す。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def delete(cls) -> sqlalchemy.Delete:
    """sqlalchemy.Deleteを返す。"""
    return sqlalchemy.delete(cls)

count(query) async classmethod

queryのレコード数を取得する。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def count(cls, query: sqlalchemy.Select | sqlalchemy.CompoundSelect) -> int:
    """queryのレコード数を取得する。"""
    # pylint: disable=not-callable
    return (
        await cls.scalar_one_or_none(
            sqlalchemy.select(sqlalchemy.func.count()).select_from(query.order_by(None).subquery())
        )
        or 0
    )

scalar_one(query) async classmethod

queryの結果を1件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[tuple[T]] | CompoundSelect[tuple[T]]

クエリ。

必須

戻り値:

タイプ デスクリプション
T

1件のインスタンス。

発生:

タイプ デスクリプション
NoResultFound

結果が0件の場合。

MultipleResultsFound

結果が複数件の場合。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def scalar_one[T](cls, query: sqlalchemy.Select[tuple[T]] | sqlalchemy.CompoundSelect[tuple[T]]) -> T:
    """queryの結果を1件取得する。

    Args:
        query: クエリ。

    Returns:
        1件のインスタンス。

    Raises:
        sqlalchemy.exc.NoResultFound: 結果が0件の場合。
        sqlalchemy.exc.MultipleResultsFound: 結果が複数件の場合。

    """
    return (await cls.session().execute(query)).scalar_one()

scalar_one_or_none(query) async classmethod

queryの結果を0件または1件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[tuple[T]] | CompoundSelect[tuple[T]]

クエリ。

必須

戻り値:

タイプ デスクリプション
T | None

0件の場合はNone、1件の場合はインスタンス。

発生:

タイプ デスクリプション
MultipleResultsFound

結果が複数件の場合。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def scalar_one_or_none[T](cls, query: sqlalchemy.Select[tuple[T]] | sqlalchemy.CompoundSelect[tuple[T]]) -> T | None:
    """queryの結果を0件または1件取得する。

    Args:
        query: クエリ。

    Returns:
        0件の場合はNone、1件の場合はインスタンス。

    Raises:
        sqlalchemy.exc.MultipleResultsFound: 結果が複数件の場合。

    """
    return (await cls.session().execute(query)).scalar_one_or_none()

scalars(query) async classmethod

queryの結果を全件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[tuple[T]] | CompoundSelect[tuple[T]]

クエリ。

必須

戻り値:

タイプ デスクリプション
list[T]

全件のインスタンスのリスト。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def scalars[T](cls, query: sqlalchemy.Select[tuple[T]] | sqlalchemy.CompoundSelect[tuple[T]]) -> list[T]:
    """queryの結果を全件取得する。

    Args:
        query: クエリ。

    Returns:
        全件のインスタンスのリスト。

    """
    return list((await cls.session().execute(query)).scalars().all())

one(query) async classmethod

queryの結果を1件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[TT] | CompoundSelect[TT]

クエリ。

必須

戻り値:

タイプ デスクリプション
Row[TT]

1件のインスタンス。

発生:

タイプ デスクリプション
NoResultFound

結果が0件の場合。

MultipleResultsFound

結果が複数件の場合。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def one[TT: tuple](cls, query: sqlalchemy.Select[TT] | sqlalchemy.CompoundSelect[TT]) -> sqlalchemy.Row[TT]:
    """queryの結果を1件取得する。

    Args:
        query: クエリ。

    Returns:
        1件のインスタンス。

    Raises:
        sqlalchemy.exc.NoResultFound: 結果が0件の場合。
        sqlalchemy.exc.MultipleResultsFound: 結果が複数件の場合。

    """
    return (await cls.session().execute(query)).one()

one_or_none(query) async classmethod

queryの結果を0件または1件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[TT] | CompoundSelect[TT]

クエリ。

必須

戻り値:

タイプ デスクリプション
Row[TT] | None

0件の場合はNone、1件の場合はインスタンス。

発生:

タイプ デスクリプション
MultipleResultsFound

結果が複数件の場合。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def one_or_none[TT: tuple](
    cls, query: sqlalchemy.Select[TT] | sqlalchemy.CompoundSelect[TT]
) -> sqlalchemy.Row[TT] | None:
    """queryの結果を0件または1件取得する。

    Args:
        query: クエリ。

    Returns:
        0件の場合はNone、1件の場合はインスタンス。

    Raises:
        sqlalchemy.exc.MultipleResultsFound: 結果が複数件の場合。

    """
    return (await cls.session().execute(query)).one_or_none()

all(query) async classmethod

queryの結果を全件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[TT] | CompoundSelect[TT]

クエリ。

必須

戻り値:

タイプ デスクリプション
list[Row[TT]]

全件のインスタンスのリスト。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def all[TT: tuple](cls, query: sqlalchemy.Select[TT] | sqlalchemy.CompoundSelect[TT]) -> list[sqlalchemy.Row[TT]]:
    """queryの結果を全件取得する。

    Args:
        query: クエリ。

    Returns:
        全件のインスタンスのリスト。

    """
    return list((await cls.session().execute(query)).all())

get_by_id_not_null(id_, for_update=False, options=None) async classmethod

IDを元にインスタンスを取得。見つからない場合は例外を出す。

引数:

名前 タイプ デスクリプション デフォルト
id_ int

ID。

必須
for_update bool

更新ロックを取得するか否か。

False
options ExecutableOption | None

クエリオプション。eager loadingなどに使用する。

None

戻り値:

タイプ デスクリプション
Self

インスタンス。

Raises: ValueError: 見つからない場合。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def get_by_id_not_null(
    cls, id_: int, for_update: bool = False, options: sqlalchemy.sql.base.ExecutableOption | None = None
) -> typing.Self:
    """IDを元にインスタンスを取得。見つからない場合は例外を出す。

    Args:
        id_: ID。
        for_update: 更新ロックを取得するか否か。
        options: クエリオプション。eager loadingなどに使用する。

    Returns:
        インスタンス。
    Raises:
        ValueError: 見つからない場合。
    """
    instance = await cls.get_by_id(id_, for_update=for_update, options=options)
    if instance is None:
        raise ValueError(f"{cls.__qualname__}が見つかりませんでした。id={id_}")
    return instance

get_by_id(id_, for_update=False, options=None) async classmethod

IDを元にインスタンスを取得。

引数:

名前 タイプ デスクリプション デフォルト
id_ int

ID。

必須
for_update bool

更新ロックを取得するか否か。

False
options ExecutableOption | None

クエリオプション。eager loadingなどに使用する。

None

戻り値:

タイプ デスクリプション
Self | None

インスタンス。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def get_by_id(
    cls, id_: int, for_update: bool = False, options: sqlalchemy.sql.base.ExecutableOption | None = None
) -> typing.Self | None:
    """IDを元にインスタンスを取得。

    Args:
        id_: ID。
        for_update: 更新ロックを取得するか否か。
        options: クエリオプション。eager loadingなどに使用する。

    Returns:
        インスタンス。

    """
    q = cls.select().where(cls.id == id_)  # type: ignore  # pylint: disable=no-member
    if options is not None:
        q = q.options(options)
    if for_update:
        q = q.with_for_update()
    return await cls.scalar_one_or_none(q)  # type: ignore[arg-type]

paginate(query, page, per_page, scalar=True) async classmethod

Flask-SQLAlchemy風ページネーション。

引数:

名前 タイプ デスクリプション デフォルト
query Select | CompoundSelect

ページネーションするクエリ。

必須
page int

ページ番号。

必須
per_page int

1ページあたりのアイテム数。

必須
scalar bool

Trueの場合、スカラー値を返す。Falseの場合、全件のインスタンスを返す。

True

戻り値:

タイプ デスクリプション
Paginator

ページネーションされた結果を返すpytilpack.paginator.Paginatorインスタンス。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def paginate(
    cls,
    query: sqlalchemy.Select | sqlalchemy.CompoundSelect,
    page: int,
    per_page: int,
    scalar: bool = True,
) -> pytilpack.paginator.Paginator:
    """Flask-SQLAlchemy風ページネーション。

    Args:
        query: ページネーションするクエリ。
        page: ページ番号。
        per_page: 1ページあたりのアイテム数。
        scalar: Trueの場合、スカラー値を返す。Falseの場合、全件のインスタンスを返す。

    Returns:
        ページネーションされた結果を返すpytilpack.paginator.Paginatorインスタンス。
    """
    assert page > 0, "ページ番号は1以上でなければなりません。"
    assert per_page > 0, "1ページあたりのアイテム数は1以上でなければなりません。"
    total = await cls.count(query)
    page_query = query.offset((page - 1) * per_page).limit(per_page)
    items = await (cls.scalars(page_query) if scalar else cls.all(page_query))
    # pylint: disable=protected-access
    return pytilpack.paginator.Paginator(page=page, per_page=per_page, items=items, total=total)

to_dict(includes=None, excludes=None, exclude_none=False, value_converter=None, datetime_to_iso=True)

インスタンスを辞書化する。

引数:

名前 タイプ デスクリプション デフォルト
includes list[str] | None

辞書化するフィールド名のリスト。excludesと同時指定不可。

None
excludes list[str] | None

辞書化しないフィールド名のリスト。includesと同時指定不可。

None
exclude_none bool

Noneのフィールドを除外するかどうか。

False
value_converter Callable[[Any], Any] | None

各フィールドの値を変換する関数。引数は値、戻り値は変換後の値。

None
datetime_to_iso bool

datetime型の値をISOフォーマットの文字列に変換するかどうか。

True

戻り値:

タイプ デスクリプション
dict[str, Any]

辞書。

ソースコード位置: pytilpack/sqlalchemy/async_.py
def to_dict(
    self,
    includes: list[str] | None = None,
    excludes: list[str] | None = None,
    exclude_none: bool = False,
    value_converter: typing.Callable[[typing.Any], typing.Any] | None = None,
    datetime_to_iso: bool = True,
) -> dict[str, typing.Any]:
    """インスタンスを辞書化する。

    Args:
        includes: 辞書化するフィールド名のリスト。excludesと同時指定不可。
        excludes: 辞書化しないフィールド名のリスト。includesと同時指定不可。
        exclude_none: Noneのフィールドを除外するかどうか。
        value_converter: 各フィールドの値を変換する関数。引数は値、戻り値は変換後の値。
        datetime_to_iso: datetime型の値をISOフォーマットの文字列に変換するかどうか。

    Returns:
        辞書。

    """
    assert (includes is None) or (excludes is None)
    mapper = sqlalchemy.inspect(self.__class__, raiseerr=True)
    assert mapper is not None
    all_columns = [
        mapper.get_property_by_column(column).key
        for column in self.__table__.columns  # type: ignore[attr-defined]
    ]
    if includes is None:
        includes = all_columns
        if excludes is None:
            pass
        else:
            assert (set(all_columns) & set(excludes)) == set(excludes)
            includes = list(filter(lambda x: x not in excludes, includes))
    else:
        assert excludes is None
        assert (set(all_columns) & set(includes)) == set(includes)

    def convert_value(value: typing.Any) -> typing.Any:
        """値を変換する関数。"""
        if datetime_to_iso and isinstance(value, datetime.datetime | datetime.date):
            return value.isoformat()
        if value_converter is not None:
            return value_converter(value)
        return value

    return {
        column_name: convert_value(getattr(self, column_name))
        for column_name in includes
        if not exclude_none or getattr(self, column_name) is not None
    }

run_with_session(func, *args, **kwargs) classmethod

非同期関数をセッション付きで同期実行する関数。

引数:

名前 タイプ デスクリプション デフォルト
func Callable[P, Awaitable[R]]

デコレート対象の非同期関数

必須
*args P.args

非同期関数への引数

()
**kwargs P.kwargs

非同期関数へのキーワード引数

{}

戻り値:

タイプ デスクリプション
R

非同期関数の戻り値

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def run_with_session[**P, R](cls, func: typing.Callable[P, typing.Awaitable[R]], *args: P.args, **kwargs: P.kwargs) -> R:
    """非同期関数をセッション付きで同期実行する関数。

    Args:
        func: デコレート対象の非同期関数
        *args: 非同期関数への引数
        **kwargs: 非同期関数へのキーワード引数

    Returns:
        非同期関数の戻り値
    """

    async def wrapper() -> R:
        async with cls.session_scope():
            return await func(*args, **kwargs)

    return pytilpack.asyncio.run(wrapper())

AsyncUniqueIDMixin

self.unique_idを持つテーブルクラスに便利メソッドを生やすmixin。

generate_unique_id() classmethod

ユニークIDを生成する。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def generate_unique_id(cls) -> str:
    """ユニークIDを生成する。"""
    return secrets.token_urlsafe(32)

get_by_unique_id(unique_id, allow_id=False, for_update=False) async classmethod

ユニークIDを元にインスタンスを取得。

引数:

名前 タイプ デスクリプション デフォルト
unique_id str | int

ユニークID。

必須
allow_id bool

ユニークIDだけでなくID(int)も許可するかどうか。

False
for_update bool

更新ロックを取得するか否か。

False

戻り値:

タイプ デスクリプション
Self | None

インスタンス。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def get_by_unique_id(
    cls: type[typing.Self],
    unique_id: str | int,
    allow_id: bool = False,
    for_update: bool = False,
) -> typing.Self | None:
    """ユニークIDを元にインスタンスを取得。

    Args:
        unique_id: ユニークID。
        allow_id: ユニークIDだけでなくID(int)も許可するかどうか。
        for_update: 更新ロックを取得するか否か。

    Returns:
        インスタンス。

    """
    assert issubclass(cls, AsyncMixin)
    if allow_id and isinstance(unique_id, int):
        q = cls.select().where(cls.id == unique_id)  # type: ignore
    else:
        q = cls.select().where(cls.unique_id == unique_id)  # type: ignore
    if for_update:
        q = q.with_for_update()
    return await cls.scalar_one_or_none(q)

Mixin

Bases: _ReprMixin

テーブルクラスに色々便利機能を生やすMixin。

get_by_id_not_null(id_, for_update=False, options=None) async classmethod

IDを元にインスタンスを取得。見つからない場合は例外を出す。

引数:

名前 タイプ デスクリプション デフォルト
id_ int

ID。

必須
for_update bool

更新ロックを取得するか否か。

False
options ExecutableOption | None

クエリオプション。eager loadingなどに使用する。

None

戻り値:

タイプ デスクリプション
Self

インスタンス。

Raises: ValueError: 見つからない場合。

ソースコード位置: pytilpack/sqlalchemy/flask.py
@classmethod
async def get_by_id_not_null(
    cls, id_: int, for_update: bool = False, options: sqlalchemy.sql.base.ExecutableOption | None = None
) -> typing.Self:
    """IDを元にインスタンスを取得。見つからない場合は例外を出す。

    Args:
        id_: ID。
        for_update: 更新ロックを取得するか否か。
        options: クエリオプション。eager loadingなどに使用する。

    Returns:
        インスタンス。
    Raises:
        ValueError: 見つからない場合。
    """
    instance = cls.get_by_id(id_, for_update=for_update, options=options)
    if instance is None:
        raise ValueError(f"{cls.__qualname__}が見つかりませんでした。id={id_}")
    return instance

get_by_id(id_, for_update=False, options=None) classmethod

IDを元にインスタンスを取得。

引数:

名前 タイプ デスクリプション デフォルト
id_ int

ID。

必須
for_update bool

更新ロックを取得するか否か。

False
options ExecutableOption | None

クエリオプション。eager loadingなどに使用。

None

戻り値:

タイプ デスクリプション
Self | None

インスタンス。

ソースコード位置: pytilpack/sqlalchemy/flask.py
@classmethod
def get_by_id(
    cls: type[typing.Self], id_: int, for_update: bool = False, options: sqlalchemy.sql.base.ExecutableOption | None = None
) -> typing.Self | None:
    """IDを元にインスタンスを取得。

    Args:
        id_: ID。
        for_update: 更新ロックを取得するか否か。
        options: クエリオプション。eager loadingなどに使用。

    Returns:
        インスタンス。

    """
    q = cls.query.filter(cls.id == id_)  # type: ignore
    if options is not None:
        q = q.options(options)
    if for_update:
        q = q.with_for_update()
    return q.one_or_none()

to_dict(includes=None, excludes=None, exclude_none=False, value_converter=None, datetime_to_iso=True)

インスタンスを辞書化する。

引数:

名前 タイプ デスクリプション デフォルト
includes list[str] | None

辞書化するフィールド名のリスト。excludesと同時指定不可。

None
excludes list[str] | None

辞書化しないフィールド名のリスト。includesと同時指定不可。

None
exclude_none bool

Noneのフィールドを除外するかどうか。

False
value_converter Callable[[Any], Any] | None

各フィールドの値を変換する関数。引数は値、戻り値は変換後の値。

None
datetime_to_iso bool

datetime型の値をISOフォーマットの文字列に変換するかどうか。

True

戻り値:

タイプ デスクリプション
dict[str, Any]

辞書。

ソースコード位置: pytilpack/sqlalchemy/flask.py
def to_dict(
    self,
    includes: list[str] | None = None,
    excludes: list[str] | None = None,
    exclude_none: bool = False,
    value_converter: typing.Callable[[typing.Any], typing.Any] | None = None,
    datetime_to_iso: bool = True,
) -> dict[str, typing.Any]:
    """インスタンスを辞書化する。

    Args:
        includes: 辞書化するフィールド名のリスト。excludesと同時指定不可。
        excludes: 辞書化しないフィールド名のリスト。includesと同時指定不可。
        exclude_none: Noneのフィールドを除外するかどうか。
        value_converter: 各フィールドの値を変換する関数。引数は値、戻り値は変換後の値。
        datetime_to_iso: datetime型の値をISOフォーマットの文字列に変換するかどうか。

    Returns:
        辞書。

    """
    assert (includes is None) or (excludes is None)
    mapper = sqlalchemy.inspect(self.__class__, raiseerr=True)
    assert mapper is not None
    all_columns = [
        mapper.get_property_by_column(column).key
        for column in self.__table__.columns  # type: ignore[attr-defined]
    ]
    if includes is None:
        includes = all_columns
        if excludes is None:
            pass
        else:
            assert (set(all_columns) & set(excludes)) == set(excludes)
            includes = list(filter(lambda x: x not in excludes, includes))
    else:
        assert excludes is None
        assert (set(all_columns) & set(includes)) == set(includes)

    def convert_value(value: typing.Any) -> typing.Any:
        """値を変換する関数。"""
        if datetime_to_iso and isinstance(value, datetime.datetime | datetime.date):
            return value.isoformat()
        if value_converter is not None:
            return value_converter(value)
        return value

    return {
        column_name: convert_value(getattr(self, column_name))
        for column_name in includes
        if not exclude_none or getattr(self, column_name) is not None
    }

UniqueIDMixin

self.unique_idを持つテーブルクラスに便利メソッドを生やすmixin。

generate_unique_id() classmethod

ユニークIDを生成する。

ソースコード位置: pytilpack/sqlalchemy/flask.py
@classmethod
def generate_unique_id(cls) -> str:
    """ユニークIDを生成する。"""
    return secrets.token_urlsafe(32)

get_by_unique_id(unique_id, allow_id=False, for_update=False) classmethod

ユニークIDを元にインスタンスを取得。

引数:

名前 タイプ デスクリプション デフォルト
unique_id str | int

ユニークID。

必須
allow_id bool

ユニークIDだけでなくID(int)も許可するかどうか。

False
for_update bool

更新ロックを取得するか否か。

False

戻り値:

タイプ デスクリプション
Self | None

インスタンス。

ソースコード位置: pytilpack/sqlalchemy/flask.py
@classmethod
def get_by_unique_id(
    cls: type[typing.Self],
    unique_id: str | int,
    allow_id: bool = False,
    for_update: bool = False,
) -> typing.Self | None:
    """ユニークIDを元にインスタンスを取得。

    Args:
        unique_id: ユニークID。
        allow_id: ユニークIDだけでなくID(int)も許可するかどうか。
        for_update: 更新ロックを取得するか否か。

    Returns:
        インスタンス。

    """
    if allow_id and isinstance(unique_id, int):
        q = cls.query.filter(cls.id == unique_id)  # type: ignore
    else:
        q = cls.query.filter(cls.unique_id == unique_id)  # type: ignore
    if for_update:
        q = q.with_for_update()
    return q.one_or_none()

SyncMixin

Bases: _ReprMixin

モデルのベースクラス。SQLAlchemy 2.0スタイル・同期前提。

例:

モデル定義例::

class Base(sqlalchemy.orm.DeclarativeBase, pytilpack.sqlalchemy_.sync_.SyncMixin):
    pass

class User(Base):
    __tablename__ = "users"
    ...

Quart例::

@app.before_request
async def _before_request() -> None:
    quart.g.db_session_token = models.Base.start_session()

@app.teardown_request
async def _teardown_request(_: BaseException | None) -> None:
    if hasattr(quart.g, "db_session_token"):
        models.Base.close_session(quart.g.db_session_token)
        del quart.g.db_session_token

engine = None class-attribute instance-attribute

DB接続。

sessionmaker = None class-attribute instance-attribute

セッションファクトリ。

session_var = contextvars.ContextVar('session_var') class-attribute instance-attribute

セッション。

init(url, pool_size=None, max_overflow=None, pool_recycle=280, pool_pre_ping=True, autoflush=True, expire_on_commit=False, **kwargs) classmethod

DB接続を初期化する。(推奨される既定設定を適用する。)

引数:

名前 タイプ デスクリプション デフォルト
url str | URL

DB接続URL。

必須
pool_size int | None

コネクションプールのサイズ。スレッド数に応じて調整要。

None
max_overflow int | None

コネクションプールの最大オーバーフロー数。Noneの場合はデフォルト値を使用。

None
pool_recycle int | None

コネクションプールのリサイクル時間。Noneの場合はデフォルト値を使用。

280
pool_pre_ping bool

コネクションプールのプレピン。Noneの場合はデフォルト値を使用。

True
autoflush bool

セッションのautoflushフラグ。デフォルトはTrue。

True
expire_on_commit bool

セッションのexpire_on_commitフラグ。デフォルトはFalse。

False
**kwargs Any

sqlalchemy.create_engineに渡す追加のキーワード引数。

{}
ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def init(
    cls,
    url: str | sqlalchemy.engine.URL,
    pool_size: int | None = None,
    max_overflow: int | None = None,
    pool_recycle: int | None = 280,
    pool_pre_ping: bool = True,
    autoflush: bool = True,
    expire_on_commit: bool = False,
    **kwargs: typing.Any,
) -> None:
    """DB接続を初期化する。(推奨される既定設定を適用する。)

    Args:
        url: DB接続URL。
        pool_size: コネクションプールのサイズ。スレッド数に応じて調整要。
        max_overflow: コネクションプールの最大オーバーフロー数。Noneの場合はデフォルト値を使用。
        pool_recycle: コネクションプールのリサイクル時間。Noneの場合はデフォルト値を使用。
        pool_pre_ping: コネクションプールのプレピン。Noneの場合はデフォルト値を使用。
        autoflush: セッションのautoflushフラグ。デフォルトはTrue。
        expire_on_commit: セッションのexpire_on_commitフラグ。デフォルトはFalse。
        **kwargs: sqlalchemy.create_engineに渡す追加のキーワード引数。

    """
    assert cls.engine is None, "DB接続はすでに初期化されています。"

    if pool_size is not None and max_overflow is None:
        max_overflow = pool_size * 1  # デフォルトで倍まで許可
    kwargs = kwargs.copy()
    if pool_size is not None:
        kwargs["pool_size"] = pool_size
    if max_overflow is not None:
        kwargs["max_overflow"] = max_overflow
    if pool_recycle is not None:
        kwargs["pool_recycle"] = pool_recycle
    if pool_pre_ping is not None:
        kwargs["pool_pre_ping"] = pool_pre_ping

    cls.engine = sqlalchemy.create_engine(url, **kwargs)
    atexit.register(cls.engine.dispose)

    cls.sessionmaker = sqlalchemy.orm.sessionmaker(cls.engine, autoflush=autoflush, expire_on_commit=expire_on_commit)

connect() classmethod

DBに接続する。

使用例:: with Base.connect() as conn: Base.metadata.create_all(conn)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def connect(cls) -> sqlalchemy.Connection:
    """DBに接続する。

    使用例::
        with Base.connect() as conn:
            Base.metadata.create_all(conn)

    """
    assert cls.engine is not None
    return cls.engine.connect()

session_scope(name=None, log_level=logging.DEBUG) classmethod

セッションを開始するコンテキストマネージャ。

使用例:: with Base.session_scope() as session: ...

引数:

名前 タイプ デスクリプション デフォルト
name str | None

セッション名。指定時のみログ出力する。

None
log_level int

ログレベル。

DEBUG
ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
@contextlib.contextmanager
def session_scope(
    cls,
    name: str | None = None,
    log_level: int = logging.DEBUG,
) -> typing.Generator[sqlalchemy.orm.Session, None, None]:
    """セッションを開始するコンテキストマネージャ。

    使用例::
        with Base.session_scope() as session:
            ...

    Args:
        name: セッション名。指定時のみログ出力する。
        log_level: ログレベル。

    """
    assert cls.sessionmaker is not None
    token = cls.start_session(name=name, log_level=log_level)
    try:
        yield cls.session()
    finally:
        cls.close_session(token, name=name, log_level=log_level)

asession_scope(name=None, log_level=logging.DEBUG) async classmethod

セッションを開始するコンテキストマネージャ。

使用例:: async with Base.asession_scope() as session: ...

引数:

名前 タイプ デスクリプション デフォルト
name str | None

セッション名。指定時のみログ出力する。

None
log_level int

ログレベル。

DEBUG
ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
@contextlib.asynccontextmanager
async def asession_scope(
    cls,
    name: str | None = None,
    log_level: int = logging.DEBUG,
) -> typing.AsyncGenerator[sqlalchemy.orm.Session, None]:
    """セッションを開始するコンテキストマネージャ。

    使用例::
        async with Base.asession_scope() as session:
            ...

    Args:
        name: セッション名。指定時のみログ出力する。
        log_level: ログレベル。

    """
    assert cls.sessionmaker is not None
    token = cls.start_session(name=name, log_level=log_level)
    try:
        yield cls.session()
    finally:
        cls.close_session(token, name=name, log_level=log_level)

start_session(name=None, log_level=logging.DEBUG) classmethod

セッションを開始する。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def start_session(
    cls, name: str | None = None, log_level: int = logging.DEBUG
) -> contextvars.Token[sqlalchemy.orm.Session]:
    """セッションを開始する。"""
    assert cls.sessionmaker is not None
    session = cls.sessionmaker()  # pylint: disable=not-callable
    token = cls.session_var.set(session)
    if name is not None:
        logger.log(
            log_level,
            f"セッション開始: {name} session={id(session):x},"
            f" thread={threading.get_ident():x},"
            f" task={pytilpack.asyncio.get_task_id_hex()}",
        )
    return token

close_session(token, name=None, log_level=logging.DEBUG) classmethod

セッションを終了する。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def close_session(
    cls, token: contextvars.Token[sqlalchemy.orm.Session], name: str | None = None, log_level: int = logging.DEBUG
) -> None:
    """セッションを終了する。"""
    session = cls.session()
    if name is not None:
        logger.log(
            log_level,
            f"セッション終了: {name} session={id(session):x},"
            f" thread={threading.get_ident():x},"
            f" task={pytilpack.asyncio.get_task_id_hex()}",
        )
    safe_close(session)
    cls.session_var.reset(token)

session() classmethod

セッションを取得する。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def session(cls) -> sqlalchemy.orm.Session:
    """セッションを取得する。"""
    sess = cls.session_var.get(None)
    if sess is None:
        raise RuntimeError(f"セッションが開始されていません。{cls.__qualname__}.start_session()を呼び出してください。")
    return sess

select() classmethod

sqlalchemy.Selectを返す。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def select(cls) -> sqlalchemy.Select[tuple[typing.Self]]:
    """sqlalchemy.Selectを返す。"""
    # cls.count()などでfrom句が消えないように明示的にfrom句を指定して返す。
    return sqlalchemy.select(cls).select_from(cls)

insert() classmethod

sqlalchemy.Insertを返す。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def insert(cls) -> sqlalchemy.Insert:
    """sqlalchemy.Insertを返す。"""
    return sqlalchemy.insert(cls)

update() classmethod

sqlalchemy.Updateを返す。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def update(cls) -> sqlalchemy.Update:
    """sqlalchemy.Updateを返す。"""
    return sqlalchemy.update(cls)

delete() classmethod

sqlalchemy.Deleteを返す。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def delete(cls) -> sqlalchemy.Delete:
    """sqlalchemy.Deleteを返す。"""
    return sqlalchemy.delete(cls)

count(query) classmethod

queryのレコード数を取得する。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def count(cls, query: sqlalchemy.Select | sqlalchemy.CompoundSelect) -> int:
    """queryのレコード数を取得する。"""
    # pylint: disable=not-callable
    return (
        cls.scalar_one_or_none(sqlalchemy.select(sqlalchemy.func.count()).select_from(query.order_by(None).subquery())) or 0
    )

scalar_one(query) classmethod

queryの結果を1件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[tuple[T]] | CompoundSelect[tuple[T]]

クエリ。

必須

戻り値:

タイプ デスクリプション
T

1件のインスタンス。

発生:

タイプ デスクリプション
NoResultFound

結果が0件の場合。

MultipleResultsFound

結果が複数件の場合。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def scalar_one[T](cls, query: sqlalchemy.Select[tuple[T]] | sqlalchemy.CompoundSelect[tuple[T]]) -> T:
    """queryの結果を1件取得する。

    Args:
        query: クエリ。

    Returns:
        1件のインスタンス。

    Raises:
        sqlalchemy.exc.NoResultFound: 結果が0件の場合。
        sqlalchemy.exc.MultipleResultsFound: 結果が複数件の場合。

    """
    return cls.session().execute(query).scalar_one()

scalar_one_or_none(query) classmethod

queryの結果を0件または1件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[tuple[T]] | CompoundSelect[tuple[T]]

クエリ。

必須

戻り値:

タイプ デスクリプション
T | None

0件の場合はNone、1件の場合はインスタンス。

発生:

タイプ デスクリプション
MultipleResultsFound

結果が複数件の場合。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def scalar_one_or_none[T](cls, query: sqlalchemy.Select[tuple[T]] | sqlalchemy.CompoundSelect[tuple[T]]) -> T | None:
    """queryの結果を0件または1件取得する。

    Args:
        query: クエリ。

    Returns:
        0件の場合はNone、1件の場合はインスタンス。

    Raises:
        sqlalchemy.exc.MultipleResultsFound: 結果が複数件の場合。

    """
    return cls.session().execute(query).scalar_one_or_none()

scalars(query) classmethod

queryの結果を全件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[tuple[T]] | CompoundSelect[tuple[T]]

クエリ。

必須

戻り値:

タイプ デスクリプション
list[T]

全件のインスタンスのリスト。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def scalars[T](cls, query: sqlalchemy.Select[tuple[T]] | sqlalchemy.CompoundSelect[tuple[T]]) -> list[T]:
    """queryの結果を全件取得する。

    Args:
        query: クエリ。

    Returns:
        全件のインスタンスのリスト。

    """
    return list(cls.session().execute(query).scalars().all())

one(query) classmethod

queryの結果を1件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[TT] | CompoundSelect[TT]

クエリ。

必須

戻り値:

タイプ デスクリプション
Row[TT]

1件のインスタンス。

発生:

タイプ デスクリプション
NoResultFound

結果が0件の場合。

MultipleResultsFound

結果が複数件の場合。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def one[TT: tuple](cls, query: sqlalchemy.Select[TT] | sqlalchemy.CompoundSelect[TT]) -> sqlalchemy.Row[TT]:
    """queryの結果を1件取得する。

    Args:
        query: クエリ。

    Returns:
        1件のインスタンス。

    Raises:
        sqlalchemy.exc.NoResultFound: 結果が0件の場合。
        sqlalchemy.exc.MultipleResultsFound: 結果が複数件の場合。

    """
    return cls.session().execute(query).one()

one_or_none(query) classmethod

queryの結果を0件または1件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[TT] | CompoundSelect[TT]

クエリ。

必須

戻り値:

タイプ デスクリプション
Row[TT] | None

0件の場合はNone、1件の場合はインスタンス。

発生:

タイプ デスクリプション
MultipleResultsFound

結果が複数件の場合。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def one_or_none[TT: tuple](cls, query: sqlalchemy.Select[TT] | sqlalchemy.CompoundSelect[TT]) -> sqlalchemy.Row[TT] | None:
    """queryの結果を0件または1件取得する。

    Args:
        query: クエリ。

    Returns:
        0件の場合はNone、1件の場合はインスタンス。

    Raises:
        sqlalchemy.exc.MultipleResultsFound: 結果が複数件の場合。

    """
    return cls.session().execute(query).one_or_none()

all(query) classmethod

queryの結果を全件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[TT] | CompoundSelect[TT]

クエリ。

必須

戻り値:

タイプ デスクリプション
list[Row[TT]]

全件のインスタンスのリスト。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def all[TT: tuple](cls, query: sqlalchemy.Select[TT] | sqlalchemy.CompoundSelect[TT]) -> list[sqlalchemy.Row[TT]]:
    """queryの結果を全件取得する。

    Args:
        query: クエリ。

    Returns:
        全件のインスタンスのリスト。

    """
    return list(cls.session().execute(query).all())

get_by_id_not_null(id_, for_update=False, options=None) async classmethod

IDを元にインスタンスを取得。見つからない場合は例外を出す。

引数:

名前 タイプ デスクリプション デフォルト
id_ int

ID。

必須
for_update bool

更新ロックを取得するか否か。

False
options ExecutableOption | None

クエリオプション。eager loadingなどに使用する。

None

戻り値:

タイプ デスクリプション
Self

インスタンス。

Raises: ValueError: 見つからない場合。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def get_by_id_not_null(
    cls, id_: int, for_update: bool = False, options: sqlalchemy.sql.base.ExecutableOption | None = None
) -> typing.Self:
    """IDを元にインスタンスを取得。見つからない場合は例外を出す。

    Args:
        id_: ID。
        for_update: 更新ロックを取得するか否か。
        options: クエリオプション。eager loadingなどに使用する。

    Returns:
        インスタンス。
    Raises:
        ValueError: 見つからない場合。
    """
    instance = cls.get_by_id(id_, for_update=for_update, options=options)
    if instance is None:
        raise ValueError(f"{cls.__qualname__}が見つかりませんでした。id={id_}")
    return instance

get_by_id(id_, for_update=False, options=None) classmethod

IDを元にインスタンスを取得。

引数:

名前 タイプ デスクリプション デフォルト
id_ int

ID。

必須
for_update bool

更新ロックを取得するか否か。

False
options ExecutableOption | None

クエリオプション。eager loadingなどに使用。

None

戻り値:

タイプ デスクリプション
Self | None

インスタンス。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def get_by_id(
    cls, id_: int, for_update: bool = False, options: sqlalchemy.sql.base.ExecutableOption | None = None
) -> typing.Self | None:
    """IDを元にインスタンスを取得。

    Args:
        id_: ID。
        for_update: 更新ロックを取得するか否か。
        options: クエリオプション。eager loadingなどに使用。

    Returns:
        インスタンス。

    """
    q = cls.select().where(cls.id == id_)  # type: ignore  # pylint: disable=no-member
    if options is not None:
        q = q.options(options)
    if for_update:
        q = q.with_for_update()
    return cls.scalar_one_or_none(q)

paginate(query, page, per_page, scalar=True) classmethod

Flask-SQLAlchemy風ページネーション。

引数:

名前 タイプ デスクリプション デフォルト
query Select | CompoundSelect

ページネーションするクエリ。

必須
page int

ページ番号。

必須
per_page int

1ページあたりのアイテム数。

必須
scalar bool

Trueの場合、スカラー値を返す。Falseの場合、全件のインスタンスを返す。

True

戻り値:

タイプ デスクリプション
Paginator

ページネーションされた結果を返すpytilpack.paginator.Paginatorインスタンス。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def paginate(
    cls,
    query: sqlalchemy.Select | sqlalchemy.CompoundSelect,
    page: int,
    per_page: int,
    scalar: bool = True,
) -> pytilpack.paginator.Paginator:
    """Flask-SQLAlchemy風ページネーション。

    Args:
        query: ページネーションするクエリ。
        page: ページ番号。
        per_page: 1ページあたりのアイテム数。
        scalar: Trueの場合、スカラー値を返す。Falseの場合、全件のインスタンスを返す。

    Returns:
        ページネーションされた結果を返すpytilpack.paginator.Paginatorインスタンス。
    """
    assert page > 0, "ページ番号は1以上でなければなりません。"
    assert per_page > 0, "1ページあたりのアイテム数は1以上でなければなりません。"
    total = cls.count(query)
    page_query = query.offset((page - 1) * per_page).limit(per_page)
    items = cls.scalars(page_query) if scalar else cls.all(page_query)
    # pylint: disable=protected-access
    return pytilpack.paginator.Paginator(page=page, per_page=per_page, items=items, total=total)

commit() classmethod

セッションをコミットする。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def commit(cls) -> None:
    """セッションをコミットする。"""
    cls.session().commit()

acount(query) async classmethod

queryのレコード数を取得する。(非同期版)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def acount(cls, query: sqlalchemy.Select | sqlalchemy.CompoundSelect) -> int:
    """queryのレコード数を取得する。(非同期版)"""
    return await run_sync_with_session(cls.count.__func__)(cls, query)  # type: ignore[attr-defined]

ascalar_one(query) async classmethod

queryの結果を1件取得する。(非同期版)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def ascalar_one[T](cls, query: sqlalchemy.Select[tuple[T]] | sqlalchemy.CompoundSelect[tuple[T]]) -> T:
    """queryの結果を1件取得する。(非同期版)"""
    return await run_sync_with_session(cls.scalar_one.__func__)(cls, query)  # type: ignore[attr-defined]

ascalar_one_or_none(query) async classmethod

queryの結果を0件または1件取得する。(非同期版)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def ascalar_one_or_none[T](cls, query: sqlalchemy.Select[tuple[T]] | sqlalchemy.CompoundSelect[tuple[T]]) -> T | None:
    """queryの結果を0件または1件取得する。(非同期版)"""
    return await run_sync_with_session(cls.scalar_one_or_none.__func__)(cls, query)  # type: ignore[attr-defined]

ascalars(query) async classmethod

queryの結果を全件取得する。(非同期版)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def ascalars[T](cls, query: sqlalchemy.Select[tuple[T]] | sqlalchemy.CompoundSelect[tuple[T]]) -> list[T]:
    """queryの結果を全件取得する。(非同期版)"""
    return await run_sync_with_session(cls.scalars.__func__)(cls, query)  # type: ignore[attr-defined]

aone(query) async classmethod

queryの結果を1件取得する。(非同期版)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def aone[TT: tuple](cls, query: sqlalchemy.Select[TT] | sqlalchemy.CompoundSelect[TT]) -> sqlalchemy.Row[TT]:
    """queryの結果を1件取得する。(非同期版)"""
    return await run_sync_with_session(cls.one.__func__)(cls, query)  # type: ignore[attr-defined]

aone_or_none(query) async classmethod

queryの結果を0件または1件取得する。(非同期版)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def aone_or_none[TT: tuple](
    cls, query: sqlalchemy.Select[TT] | sqlalchemy.CompoundSelect[TT]
) -> sqlalchemy.Row[TT] | None:
    """queryの結果を0件または1件取得する。(非同期版)"""
    return await run_sync_with_session(cls.one_or_none.__func__)(cls, query)  # type: ignore[attr-defined]

aall(query) async classmethod

queryの結果を全件取得する。(非同期版)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def aall[TT: tuple](cls, query: sqlalchemy.Select[TT] | sqlalchemy.CompoundSelect[TT]) -> list[sqlalchemy.Row[TT]]:
    """queryの結果を全件取得する。(非同期版)"""
    return await run_sync_with_session(cls.all.__func__)(cls, query)  # type: ignore[attr-defined]

aget_by_id(id_, for_update=False) async classmethod

IDを元にインスタンスを取得。(非同期版)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def aget_by_id(cls, id_: int, for_update: bool = False) -> typing.Self | None:
    """IDを元にインスタンスを取得。(非同期版)"""
    return await run_sync_with_session(cls.get_by_id.__func__)(cls, id_, for_update)  # type: ignore[attr-defined]

apaginate(query, page, per_page, scalar=True) async classmethod

Flask-SQLAlchemy風ページネーション。(非同期版)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def apaginate(
    cls,
    query: sqlalchemy.Select | sqlalchemy.CompoundSelect,
    page: int,
    per_page: int,
    scalar: bool = True,
) -> pytilpack.paginator.Paginator:
    """Flask-SQLAlchemy風ページネーション。(非同期版)"""
    return await run_sync_with_session(cls.paginate.__func__)(cls, query, page, per_page, scalar)  # type: ignore[attr-defined]

to_dict(includes=None, excludes=None, exclude_none=False, value_converter=None, datetime_to_iso=True)

インスタンスを辞書化する。

引数:

名前 タイプ デスクリプション デフォルト
includes list[str] | None

辞書化するフィールド名のリスト。excludesと同時指定不可。

None
excludes list[str] | None

辞書化しないフィールド名のリスト。includesと同時指定不可。

None
exclude_none bool

Noneのフィールドを除外するかどうか。

False
value_converter Callable[[Any], Any] | None

各フィールドの値を変換する関数。引数は値、戻り値は変換後の値。

None
datetime_to_iso bool

datetime型の値をISOフォーマットの文字列に変換するかどうか。

True

戻り値:

タイプ デスクリプション
dict[str, Any]

辞書。

ソースコード位置: pytilpack/sqlalchemy/sync.py
def to_dict(
    self,
    includes: list[str] | None = None,
    excludes: list[str] | None = None,
    exclude_none: bool = False,
    value_converter: typing.Callable[[typing.Any], typing.Any] | None = None,
    datetime_to_iso: bool = True,
) -> dict[str, typing.Any]:
    """インスタンスを辞書化する。

    Args:
        includes: 辞書化するフィールド名のリスト。excludesと同時指定不可。
        excludes: 辞書化しないフィールド名のリスト。includesと同時指定不可。
        exclude_none: Noneのフィールドを除外するかどうか。
        value_converter: 各フィールドの値を変換する関数。引数は値、戻り値は変換後の値。
        datetime_to_iso: datetime型の値をISOフォーマットの文字列に変換するかどうか。

    Returns:
        辞書。

    """
    assert (includes is None) or (excludes is None)
    mapper = sqlalchemy.inspect(self.__class__, raiseerr=True)
    assert mapper is not None
    all_columns = [
        mapper.get_property_by_column(column).key
        for column in self.__table__.columns  # type: ignore[attr-defined]
    ]
    if includes is None:
        includes = all_columns
        if excludes is None:
            pass
        else:
            assert (set(all_columns) & set(excludes)) == set(excludes)
            includes = list(filter(lambda x: x not in excludes, includes))
    else:
        assert excludes is None
        assert (set(all_columns) & set(includes)) == set(includes)

    def convert_value(value: typing.Any) -> typing.Any:
        """値を変換する関数。"""
        if datetime_to_iso and isinstance(value, datetime.datetime | datetime.date):
            return value.isoformat()
        if value_converter is not None:
            return value_converter(value)
        return value

    return {
        column_name: convert_value(getattr(self, column_name))
        for column_name in includes
        if not exclude_none or getattr(self, column_name) is not None
    }

SyncUniqueIDMixin

self.unique_idを持つテーブルクラスに便利メソッドを生やすmixin。

generate_unique_id() classmethod

ユニークIDを生成する。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def generate_unique_id(cls) -> str:
    """ユニークIDを生成する。"""
    return secrets.token_urlsafe(32)

get_by_unique_id(unique_id, allow_id=False, for_update=False) classmethod

ユニークIDを元にインスタンスを取得。

引数:

名前 タイプ デスクリプション デフォルト
unique_id str | int

ユニークID。

必須
allow_id bool

ユニークIDだけでなくID(int)も許可するかどうか。

False
for_update bool

更新ロックを取得するか否か。

False

戻り値:

タイプ デスクリプション
Self | None

インスタンス。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def get_by_unique_id(
    cls: type[typing.Self],
    unique_id: str | int,
    allow_id: bool = False,
    for_update: bool = False,
) -> typing.Self | None:
    """ユニークIDを元にインスタンスを取得。

    Args:
        unique_id: ユニークID。
        allow_id: ユニークIDだけでなくID(int)も許可するかどうか。
        for_update: 更新ロックを取得するか否か。

    Returns:
        インスタンス。

    """
    assert issubclass(cls, SyncMixin)
    if allow_id and isinstance(unique_id, int):
        q = cls.select().where(cls.id == unique_id)  # type: ignore
    else:
        q = cls.select().where(cls.unique_id == unique_id)  # type: ignore
    if for_update:
        q = q.with_for_update()
    return cls.scalar_one_or_none(q)

asafe_close(session, log_level=logging.DEBUG) async

例外を出さずにセッションをクローズ。

ソースコード位置: pytilpack/sqlalchemy/async_.py
async def asafe_close(session: sqlalchemy.ext.asyncio.AsyncSession, log_level: int | None = logging.DEBUG):
    """例外を出さずにセッションをクローズ。"""
    try:
        await asyncio.shield(session.close())
    except Exception:
        if log_level is not None:
            logger.log(log_level, "セッションクローズ失敗", exc_info=True)

await_for_connection(url, timeout=180.0) async

DBに接続可能になるまで待機する。

ソースコード位置: pytilpack/sqlalchemy/async_.py
async def await_for_connection(url: str, timeout: float = 180.0) -> None:
    """DBに接続可能になるまで待機する。"""
    failed = False
    start_time = time.time()
    while True:
        try:
            engine = sqlalchemy.ext.asyncio.create_async_engine(url)
            try:
                async with engine.connect() as connection:
                    await connection.execute(sqlalchemy.text("SELECT 1"))
            finally:
                await engine.dispose()
            # 接続成功
            if failed:  # 過去に接続失敗していた場合だけログを出す
                logger.info("DB接続成功")
            break
        except Exception as e:
            # 接続失敗
            if not failed:
                failed = True
                logger.info(f"DB接続待機中 . . . (URL: {url})")
            remain_time = timeout - (time.time() - start_time)
            if remain_time <= 0:
                raise RuntimeError(f"DB接続タイムアウト (URL: {url})") from e
            await asyncio.sleep(min(1, remain_time))

describe_table(table, orm_class, tablefmt='grid')

テーブル構造を文字列化する。

ソースコード位置: pytilpack/sqlalchemy/describe.py
def describe_table(
    table: sqlalchemy.sql.schema.Table,
    orm_class: type[sqlalchemy.orm.DeclarativeBase],
    tablefmt: "str | tabulate.TableFormat" = "grid",
) -> str:
    """テーブル構造を文字列化する。"""
    try:
        class_field_comments = pytilpack.python.class_field_comments(orm_class)
    except Exception as e:
        logger.warning(f"クラスフィールドコメント取得失敗: {e}")
        class_field_comments = {}

    headers = ["Field", "Type", "Null", "Key", "Default", "Extra", "Comment"]
    rows = []
    for column in table.columns:
        key = ""
        if column.primary_key:
            key = "PRI"
        elif column.unique:
            key = "UNI"
        elif column.index:
            key = "MUL"

        extra = ""
        if column.autoincrement and column.primary_key:
            extra = "auto_increment"

        default_value = (
            column.default.arg
            if column.default is not None and isinstance(column.default, sqlalchemy.sql.schema.ColumnDefault)
            else column.default
        )
        default: str
        if default_value is None:
            default = "NULL"
        elif callable(default_value):
            default = "(function)"
        elif isinstance(default_value, sqlalchemy.sql.elements.CompilerElement):  # type: ignore[attr-defined]
            default = str(default_value.compile(compile_kwargs={"literal_binds": True}))
        else:
            default = str(default_value)

        # コメントは以下の優先順位で拾う。
        # doc(DBに反映されないもの) > comment(DBに反映されるもの)
        #  > class_field_comments(ソースコード上のコメント)
        comment: str = ""
        if column.doc:
            comment = column.doc
        elif column.comment:
            comment = column.comment
        elif column.name in class_field_comments:
            comment = class_field_comments[column.name] or ""

        rows.append(
            [
                column.name,
                str(column.type),
                "YES" if column.nullable else "NO",
                key,
                default,
                extra,
                comment,
            ]
        )
    table_description = tabulate.tabulate(rows, headers=headers, tablefmt=tablefmt)

    return f"Table: {table.name}\n{table_description}\n"

get_class_by_table(base, table)

テーブルからクラスを取得する。

ソースコード位置: pytilpack/sqlalchemy/describe.py
def get_class_by_table(
    base: type[sqlalchemy.orm.DeclarativeBase], table: sqlalchemy.sql.schema.Table
) -> type[sqlalchemy.orm.DeclarativeBase]:
    """テーブルからクラスを取得する。"""
    for mapper in base.registry.mappers:
        if mapper.local_table is table:
            return typing.cast(type[sqlalchemy.orm.DeclarativeBase], mapper.class_)
    raise ValueError(f"テーブル {table.name} に対応するクラスが見つかりませんでした。")

register_ping()

コネクションプールの切断対策。

ソースコード位置: pytilpack/sqlalchemy/flask.py
def register_ping():
    """コネクションプールの切断対策。"""

    @sqlalchemy.event.listens_for(sqlalchemy.pool.Pool, "checkout")
    def _ping_connection(dbapi_connection, connection_record, connection_proxy):
        """コネクションプールの切断対策。"""
        _ = connection_record, connection_proxy  # noqa
        cursor = dbapi_connection.cursor()
        try:
            cursor.execute("SELECT 1")
        except Exception as e:
            raise sqlalchemy.exc.DisconnectionError() from e
        finally:
            cursor.close()

run_sync_with_session(func)

同期関数を非同期に実行し、スレッド内でセッションを管理するデコレーター。

別スレッドで実行される関数内で自動的にセッションスコープを作成する。 各呼び出しは独立したセッション・トランザクションを持つ。

引数:

名前 タイプ デスクリプション デフォルト
func Callable[Concatenate[type[SyncMixin], P], R]

デコレート対象の同期関数

必須

戻り値:

タイプ デスクリプション
Callable[Concatenate[type[SyncMixin], P], Awaitable[R]]

非同期版の関数

ソースコード位置: pytilpack/sqlalchemy/sync.py
def run_sync_with_session[**P, R](
    func: "typing.Callable[typing.Concatenate[type[SyncMixin], P], R]",
) -> "typing.Callable[typing.Concatenate[type[SyncMixin], P], typing.Awaitable[R]]":
    """同期関数を非同期に実行し、スレッド内でセッションを管理するデコレーター。

    別スレッドで実行される関数内で自動的にセッションスコープを作成する。
    各呼び出しは独立したセッション・トランザクションを持つ。

    Args:
        func: デコレート対象の同期関数

    Returns:
        非同期版の関数
    """

    @functools.wraps(func)
    async def wrapper(cls: type[SyncMixin], *args: P.args, **kwargs: P.kwargs) -> R:
        def _impl() -> R:
            with cls.session_scope() as session:
                result = func(cls, *args, **kwargs)
                session.commit()
                return result

        return await asyncio.to_thread(_impl)

    return wrapper

safe_close(session, log_level=logging.DEBUG)

例外を出さずにセッションをクローズ。

ソースコード位置: pytilpack/sqlalchemy/sync.py
def safe_close(
    session: sqlalchemy.orm.Session | sqlalchemy.orm.scoped_session,
    log_level: int | None = logging.DEBUG,
):
    """例外を出さずにセッションをクローズ。"""
    try:
        session.close()
    except Exception:
        if log_level is not None:
            logger.log(log_level, "セッションクローズ失敗", exc_info=True)

wait_for_connection(url, timeout=180.0)

DBに接続可能になるまで待機する。

ソースコード位置: pytilpack/sqlalchemy/sync.py
def wait_for_connection(url: str, timeout: float = 180.0) -> None:
    """DBに接続可能になるまで待機する。"""
    failed = False
    start_time = time.time()
    while True:
        try:
            engine = sqlalchemy.create_engine(url)
            try:
                with engine.connect() as connection:
                    result = connection.execute(sqlalchemy.text("SELECT 1"))
                    result.close()
            finally:
                engine.dispose()
            # 接続成功
            if failed:  # 過去に接続失敗していた場合だけログを出す
                logger.info("DB接続成功")
            break
        except Exception as e:
            # 接続失敗
            if not failed:
                failed = True
                logger.info(f"DB接続待機中 . . . (URL: {url})")
            remain_time = timeout - (time.time() - start_time)
            if remain_time <= 0:
                raise RuntimeError(f"DB接続タイムアウト (URL: {url})") from e
            time.sleep(min(1, remain_time))

async_

SQLAlchemy用のユーティリティ集(async版)。

AsyncMixin

Bases: AsyncAttrs, _ReprMixin

モデルのベースクラス。SQLAlchemy 2.0スタイル・async前提。

例:

モデル定義例::

class Base(sqlalchemy.orm.DeclarativeBase, pytilpack.sqlalchemy_.AsyncMixin):
    pass

class User(Base):
    __tablename__ = "users"
    ...

Quart例::

@app.before_request
async def _before_request() -> None:
    quart.g.db_session_token = await models.Base.start_session()

@app.teardown_request
async def _teardown_request(_: BaseException | None) -> None:
    if hasattr(quart.g, "db_session_token"):
        await models.Base.close_session(quart.g.db_session_token)
        del quart.g.db_session_token
session_var = contextvars.ContextVar('session_var') class-attribute instance-attribute

セッション。

init(url, pool_size=None, max_overflow=None, pool_recycle=280, pool_pre_ping=True, autoflush=True, expire_on_commit=False, eagerly_init=True, **kwargs) classmethod

DB接続を初期化する。(推奨される既定設定を適用する。)

engineとsessionmakerはスレッドローカルで遅延初期化される。 各スレッドが初めてDB操作を行う際にそのスレッド専用のengineとsessionmakerが生成される。

引数:

名前 タイプ デスクリプション デフォルト
url str | URL

DB接続URL。

必須
pool_size int | None

コネクションプールのサイズ。スレッド数に応じて調整要。負数の場合はプーリングを無効化する。

None
max_overflow int | None

コネクションプールの最大オーバーフロー数。Noneの場合はデフォルト値を使用。

None
pool_recycle int | None

コネクションプールのリサイクル時間。Noneの場合はデフォルト値を使用。

280
pool_pre_ping bool

コネクションプールのプレピン。Noneの場合はデフォルト値を使用。

True
autoflush bool

セッションのautoflushフラグ。デフォルトはTrue。

True
expire_on_commit bool

セッションのexpire_on_commitフラグ。デフォルトはFalse。

False
eagerly_init bool

Trueの場合(デフォルト)、呼び出し元スレッドのengineとsessionmakerをすぐに生成する。

True
**kwargs Any

その他のsqlalchemy.create_async_engine()へのキーワード引数。

{}
ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def init(
    cls,
    url: str | sqlalchemy.engine.URL,
    pool_size: int | None = None,
    max_overflow: int | None = None,
    pool_recycle: int | None = 280,
    pool_pre_ping: bool = True,
    autoflush: bool = True,
    expire_on_commit: bool = False,
    eagerly_init: bool = True,
    **kwargs: typing.Any,
) -> None:
    """DB接続を初期化する。(推奨される既定設定を適用する。)

    engineとsessionmakerはスレッドローカルで遅延初期化される。
    各スレッドが初めてDB操作を行う際にそのスレッド専用のengineとsessionmakerが生成される。

    Args:
        url: DB接続URL。
        pool_size: コネクションプールのサイズ。スレッド数に応じて調整要。負数の場合はプーリングを無効化する。
        max_overflow: コネクションプールの最大オーバーフロー数。Noneの場合はデフォルト値を使用。
        pool_recycle: コネクションプールのリサイクル時間。Noneの場合はデフォルト値を使用。
        pool_pre_ping: コネクションプールのプレピン。Noneの場合はデフォルト値を使用。
        autoflush: セッションのautoflushフラグ。デフォルトはTrue。
        expire_on_commit: セッションのexpire_on_commitフラグ。デフォルトはFalse。
        eagerly_init: Trueの場合(デフォルト)、呼び出し元スレッドのengineとsessionmakerをすぐに生成する。
        **kwargs: その他のsqlalchemy.create_async_engine()へのキーワード引数。

    """
    assert cls._init_args is None, "DB接続はすでに初期化されています。"

    if pool_size is not None and pool_size < 0:
        engine_kwargs = kwargs.copy()
        engine_kwargs["poolclass"] = sqlalchemy.pool.NullPool
    else:
        if pool_size is not None and max_overflow is None:
            max_overflow = pool_size * 1  # デフォルトで倍まで許可
        engine_kwargs = kwargs.copy()
        if pool_size is not None:
            engine_kwargs["pool_size"] = pool_size
        if max_overflow is not None:
            engine_kwargs["max_overflow"] = max_overflow
    if pool_recycle is not None:
        engine_kwargs["pool_recycle"] = pool_recycle
    if pool_pre_ping is not None:
        engine_kwargs["pool_pre_ping"] = pool_pre_ping

    cls._init_args = _InitArgs(
        url=url,
        autoflush=autoflush,
        expire_on_commit=expire_on_commit,
        engine_kwargs=engine_kwargs,
    )
    cls._thread_local = threading.local()

    if eagerly_init:
        cls._get_engine_and_sessionmaker()
term() async classmethod

現在のスレッドのDB接続を終了する。

スレッドローカルなengineをdisposeして解放する。 スレッド終了時やテスト後の後始末に使用する。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def term(cls) -> None:
    """現在のスレッドのDB接続を終了する。

    スレッドローカルなengineをdisposeして解放する。
    スレッド終了時やテスト後の後始末に使用する。

    """
    if cls._init_args is None:
        logger.warning("未初期化時のterm()呼び出し")
        return
    if cls._thread_local is None:
        return
    if hasattr(cls._thread_local, "engine") and cls._thread_local.engine is not None:
        await cls._thread_local.engine.dispose()
        cls._thread_local.engine = None
        cls._thread_local.sessionmaker = None
engine() classmethod

現在のスレッドのDB接続を返す。init()後に初めて呼ばれた時点で遅延生成される。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def engine(cls) -> sqlalchemy.ext.asyncio.AsyncEngine:
    """現在のスレッドのDB接続を返す。init()後に初めて呼ばれた時点で遅延生成される。"""
    return cls._get_engine_and_sessionmaker()[0]
sessionmaker() classmethod

現在のスレッドのセッションファクトリを返す。init()後に初めて呼ばれた時点で遅延生成される。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def sessionmaker(
    cls,
) -> sqlalchemy.ext.asyncio.async_sessionmaker[sqlalchemy.ext.asyncio.AsyncSession]:
    """現在のスレッドのセッションファクトリを返す。init()後に初めて呼ばれた時点で遅延生成される。"""
    return cls._get_engine_and_sessionmaker()[1]
connect() classmethod

DBに接続する。

使用例:: async with Base.connect() as conn: await conn.run_sync(Base.metadata.create_all)

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def connect(cls) -> sqlalchemy.ext.asyncio.AsyncConnection:
    """DBに接続する。

    使用例::
        async with Base.connect() as conn:
            await conn.run_sync(Base.metadata.create_all)

    """
    return cls.engine().connect()
session_scope(name=None, log_level=logging.DEBUG) async classmethod

セッションを開始するコンテキストマネージャ。

使用例:: async with Base.session_scope() as session: ...

引数:

名前 タイプ デスクリプション デフォルト
name str | None

セッション名。指定時のみログ出力する。

None
log_level int

ログレベル。

DEBUG
ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
@contextlib.asynccontextmanager
async def session_scope(
    cls,
    name: str | None = None,
    log_level: int = logging.DEBUG,
):
    """セッションを開始するコンテキストマネージャ。

    使用例::
        async with Base.session_scope() as session:
            ...

    Args:
        name: セッション名。指定時のみログ出力する。
        log_level: ログレベル。

    """
    token = await cls.start_session(name=name, log_level=log_level)
    try:
        yield cls.session()
    finally:
        await cls.close_session(token, name=name, log_level=log_level)
start_session(name=None, log_level=logging.DEBUG) async classmethod

セッションを開始する。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def start_session(
    cls, name: str | None = None, log_level: int = logging.DEBUG
) -> contextvars.Token[sqlalchemy.ext.asyncio.AsyncSession]:
    """セッションを開始する。"""
    session = cls.sessionmaker()()  # pylint: disable=not-callable
    token = cls.session_var.set(session)
    if name is not None:
        logger.log(
            log_level,
            f"セッション開始: {name} session={id(session):x},"
            f" thread={threading.get_ident():x},"
            f" task={pytilpack.asyncio.get_task_id_hex()}",
        )
    return token
close_session(token, name=None, log_level=logging.DEBUG) async classmethod

セッションを終了する。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def close_session(
    cls,
    token: contextvars.Token[sqlalchemy.ext.asyncio.AsyncSession],
    name: str | None = None,
    log_level: int = logging.DEBUG,
) -> None:
    """セッションを終了する。"""
    session = cls.session()
    if name is not None:
        logger.log(
            log_level,
            f"セッション終了: {name} session={id(session):x},"
            f" thread={threading.get_ident():x},"
            f" task={pytilpack.asyncio.get_task_id_hex()}",
        )
    await asafe_close(session)
    cls.session_var.reset(token)
session() classmethod

セッションを取得する。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def session(cls) -> sqlalchemy.ext.asyncio.AsyncSession:
    """セッションを取得する。"""
    sess = cls.session_var.get(None)
    if sess is None:
        raise RuntimeError(f"セッションが開始されていません。{cls.__qualname__}.start_session()を呼び出してください。")
    return sess
select() classmethod

sqlalchemy.Selectを返す。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def select(cls) -> sqlalchemy.Select[tuple[typing.Self]]:
    """sqlalchemy.Selectを返す。"""
    # cls.count()などでfrom句が消えないように明示的にfrom句を指定して返す。
    return sqlalchemy.select(cls).select_from(cls)
insert() classmethod

sqlalchemy.Insertを返す。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def insert(cls) -> sqlalchemy.Insert:
    """sqlalchemy.Insertを返す。"""
    return sqlalchemy.insert(cls)
update() classmethod

sqlalchemy.Updateを返す。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def update(cls) -> sqlalchemy.Update:
    """sqlalchemy.Updateを返す。"""
    return sqlalchemy.update(cls)
delete() classmethod

sqlalchemy.Deleteを返す。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def delete(cls) -> sqlalchemy.Delete:
    """sqlalchemy.Deleteを返す。"""
    return sqlalchemy.delete(cls)
count(query) async classmethod

queryのレコード数を取得する。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def count(cls, query: sqlalchemy.Select | sqlalchemy.CompoundSelect) -> int:
    """queryのレコード数を取得する。"""
    # pylint: disable=not-callable
    return (
        await cls.scalar_one_or_none(
            sqlalchemy.select(sqlalchemy.func.count()).select_from(query.order_by(None).subquery())
        )
        or 0
    )
scalar_one(query) async classmethod

queryの結果を1件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[tuple[T]] | CompoundSelect[tuple[T]]

クエリ。

必須

戻り値:

タイプ デスクリプション
T

1件のインスタンス。

発生:

タイプ デスクリプション
NoResultFound

結果が0件の場合。

MultipleResultsFound

結果が複数件の場合。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def scalar_one[T](cls, query: sqlalchemy.Select[tuple[T]] | sqlalchemy.CompoundSelect[tuple[T]]) -> T:
    """queryの結果を1件取得する。

    Args:
        query: クエリ。

    Returns:
        1件のインスタンス。

    Raises:
        sqlalchemy.exc.NoResultFound: 結果が0件の場合。
        sqlalchemy.exc.MultipleResultsFound: 結果が複数件の場合。

    """
    return (await cls.session().execute(query)).scalar_one()
scalar_one_or_none(query) async classmethod

queryの結果を0件または1件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[tuple[T]] | CompoundSelect[tuple[T]]

クエリ。

必須

戻り値:

タイプ デスクリプション
T | None

0件の場合はNone、1件の場合はインスタンス。

発生:

タイプ デスクリプション
MultipleResultsFound

結果が複数件の場合。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def scalar_one_or_none[T](cls, query: sqlalchemy.Select[tuple[T]] | sqlalchemy.CompoundSelect[tuple[T]]) -> T | None:
    """queryの結果を0件または1件取得する。

    Args:
        query: クエリ。

    Returns:
        0件の場合はNone、1件の場合はインスタンス。

    Raises:
        sqlalchemy.exc.MultipleResultsFound: 結果が複数件の場合。

    """
    return (await cls.session().execute(query)).scalar_one_or_none()
scalars(query) async classmethod

queryの結果を全件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[tuple[T]] | CompoundSelect[tuple[T]]

クエリ。

必須

戻り値:

タイプ デスクリプション
list[T]

全件のインスタンスのリスト。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def scalars[T](cls, query: sqlalchemy.Select[tuple[T]] | sqlalchemy.CompoundSelect[tuple[T]]) -> list[T]:
    """queryの結果を全件取得する。

    Args:
        query: クエリ。

    Returns:
        全件のインスタンスのリスト。

    """
    return list((await cls.session().execute(query)).scalars().all())
one(query) async classmethod

queryの結果を1件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[TT] | CompoundSelect[TT]

クエリ。

必須

戻り値:

タイプ デスクリプション
Row[TT]

1件のインスタンス。

発生:

タイプ デスクリプション
NoResultFound

結果が0件の場合。

MultipleResultsFound

結果が複数件の場合。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def one[TT: tuple](cls, query: sqlalchemy.Select[TT] | sqlalchemy.CompoundSelect[TT]) -> sqlalchemy.Row[TT]:
    """queryの結果を1件取得する。

    Args:
        query: クエリ。

    Returns:
        1件のインスタンス。

    Raises:
        sqlalchemy.exc.NoResultFound: 結果が0件の場合。
        sqlalchemy.exc.MultipleResultsFound: 結果が複数件の場合。

    """
    return (await cls.session().execute(query)).one()
one_or_none(query) async classmethod

queryの結果を0件または1件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[TT] | CompoundSelect[TT]

クエリ。

必須

戻り値:

タイプ デスクリプション
Row[TT] | None

0件の場合はNone、1件の場合はインスタンス。

発生:

タイプ デスクリプション
MultipleResultsFound

結果が複数件の場合。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def one_or_none[TT: tuple](
    cls, query: sqlalchemy.Select[TT] | sqlalchemy.CompoundSelect[TT]
) -> sqlalchemy.Row[TT] | None:
    """queryの結果を0件または1件取得する。

    Args:
        query: クエリ。

    Returns:
        0件の場合はNone、1件の場合はインスタンス。

    Raises:
        sqlalchemy.exc.MultipleResultsFound: 結果が複数件の場合。

    """
    return (await cls.session().execute(query)).one_or_none()
all(query) async classmethod

queryの結果を全件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[TT] | CompoundSelect[TT]

クエリ。

必須

戻り値:

タイプ デスクリプション
list[Row[TT]]

全件のインスタンスのリスト。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def all[TT: tuple](cls, query: sqlalchemy.Select[TT] | sqlalchemy.CompoundSelect[TT]) -> list[sqlalchemy.Row[TT]]:
    """queryの結果を全件取得する。

    Args:
        query: クエリ。

    Returns:
        全件のインスタンスのリスト。

    """
    return list((await cls.session().execute(query)).all())
get_by_id_not_null(id_, for_update=False, options=None) async classmethod

IDを元にインスタンスを取得。見つからない場合は例外を出す。

引数:

名前 タイプ デスクリプション デフォルト
id_ int

ID。

必須
for_update bool

更新ロックを取得するか否か。

False
options ExecutableOption | None

クエリオプション。eager loadingなどに使用する。

None

戻り値:

タイプ デスクリプション
Self

インスタンス。

Raises: ValueError: 見つからない場合。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def get_by_id_not_null(
    cls, id_: int, for_update: bool = False, options: sqlalchemy.sql.base.ExecutableOption | None = None
) -> typing.Self:
    """IDを元にインスタンスを取得。見つからない場合は例外を出す。

    Args:
        id_: ID。
        for_update: 更新ロックを取得するか否か。
        options: クエリオプション。eager loadingなどに使用する。

    Returns:
        インスタンス。
    Raises:
        ValueError: 見つからない場合。
    """
    instance = await cls.get_by_id(id_, for_update=for_update, options=options)
    if instance is None:
        raise ValueError(f"{cls.__qualname__}が見つかりませんでした。id={id_}")
    return instance
get_by_id(id_, for_update=False, options=None) async classmethod

IDを元にインスタンスを取得。

引数:

名前 タイプ デスクリプション デフォルト
id_ int

ID。

必須
for_update bool

更新ロックを取得するか否か。

False
options ExecutableOption | None

クエリオプション。eager loadingなどに使用する。

None

戻り値:

タイプ デスクリプション
Self | None

インスタンス。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def get_by_id(
    cls, id_: int, for_update: bool = False, options: sqlalchemy.sql.base.ExecutableOption | None = None
) -> typing.Self | None:
    """IDを元にインスタンスを取得。

    Args:
        id_: ID。
        for_update: 更新ロックを取得するか否か。
        options: クエリオプション。eager loadingなどに使用する。

    Returns:
        インスタンス。

    """
    q = cls.select().where(cls.id == id_)  # type: ignore  # pylint: disable=no-member
    if options is not None:
        q = q.options(options)
    if for_update:
        q = q.with_for_update()
    return await cls.scalar_one_or_none(q)  # type: ignore[arg-type]
paginate(query, page, per_page, scalar=True) async classmethod

Flask-SQLAlchemy風ページネーション。

引数:

名前 タイプ デスクリプション デフォルト
query Select | CompoundSelect

ページネーションするクエリ。

必須
page int

ページ番号。

必須
per_page int

1ページあたりのアイテム数。

必須
scalar bool

Trueの場合、スカラー値を返す。Falseの場合、全件のインスタンスを返す。

True

戻り値:

タイプ デスクリプション
Paginator

ページネーションされた結果を返すpytilpack.paginator.Paginatorインスタンス。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def paginate(
    cls,
    query: sqlalchemy.Select | sqlalchemy.CompoundSelect,
    page: int,
    per_page: int,
    scalar: bool = True,
) -> pytilpack.paginator.Paginator:
    """Flask-SQLAlchemy風ページネーション。

    Args:
        query: ページネーションするクエリ。
        page: ページ番号。
        per_page: 1ページあたりのアイテム数。
        scalar: Trueの場合、スカラー値を返す。Falseの場合、全件のインスタンスを返す。

    Returns:
        ページネーションされた結果を返すpytilpack.paginator.Paginatorインスタンス。
    """
    assert page > 0, "ページ番号は1以上でなければなりません。"
    assert per_page > 0, "1ページあたりのアイテム数は1以上でなければなりません。"
    total = await cls.count(query)
    page_query = query.offset((page - 1) * per_page).limit(per_page)
    items = await (cls.scalars(page_query) if scalar else cls.all(page_query))
    # pylint: disable=protected-access
    return pytilpack.paginator.Paginator(page=page, per_page=per_page, items=items, total=total)
to_dict(includes=None, excludes=None, exclude_none=False, value_converter=None, datetime_to_iso=True)

インスタンスを辞書化する。

引数:

名前 タイプ デスクリプション デフォルト
includes list[str] | None

辞書化するフィールド名のリスト。excludesと同時指定不可。

None
excludes list[str] | None

辞書化しないフィールド名のリスト。includesと同時指定不可。

None
exclude_none bool

Noneのフィールドを除外するかどうか。

False
value_converter Callable[[Any], Any] | None

各フィールドの値を変換する関数。引数は値、戻り値は変換後の値。

None
datetime_to_iso bool

datetime型の値をISOフォーマットの文字列に変換するかどうか。

True

戻り値:

タイプ デスクリプション
dict[str, Any]

辞書。

ソースコード位置: pytilpack/sqlalchemy/async_.py
def to_dict(
    self,
    includes: list[str] | None = None,
    excludes: list[str] | None = None,
    exclude_none: bool = False,
    value_converter: typing.Callable[[typing.Any], typing.Any] | None = None,
    datetime_to_iso: bool = True,
) -> dict[str, typing.Any]:
    """インスタンスを辞書化する。

    Args:
        includes: 辞書化するフィールド名のリスト。excludesと同時指定不可。
        excludes: 辞書化しないフィールド名のリスト。includesと同時指定不可。
        exclude_none: Noneのフィールドを除外するかどうか。
        value_converter: 各フィールドの値を変換する関数。引数は値、戻り値は変換後の値。
        datetime_to_iso: datetime型の値をISOフォーマットの文字列に変換するかどうか。

    Returns:
        辞書。

    """
    assert (includes is None) or (excludes is None)
    mapper = sqlalchemy.inspect(self.__class__, raiseerr=True)
    assert mapper is not None
    all_columns = [
        mapper.get_property_by_column(column).key
        for column in self.__table__.columns  # type: ignore[attr-defined]
    ]
    if includes is None:
        includes = all_columns
        if excludes is None:
            pass
        else:
            assert (set(all_columns) & set(excludes)) == set(excludes)
            includes = list(filter(lambda x: x not in excludes, includes))
    else:
        assert excludes is None
        assert (set(all_columns) & set(includes)) == set(includes)

    def convert_value(value: typing.Any) -> typing.Any:
        """値を変換する関数。"""
        if datetime_to_iso and isinstance(value, datetime.datetime | datetime.date):
            return value.isoformat()
        if value_converter is not None:
            return value_converter(value)
        return value

    return {
        column_name: convert_value(getattr(self, column_name))
        for column_name in includes
        if not exclude_none or getattr(self, column_name) is not None
    }
run_with_session(func, *args, **kwargs) classmethod

非同期関数をセッション付きで同期実行する関数。

引数:

名前 タイプ デスクリプション デフォルト
func Callable[P, Awaitable[R]]

デコレート対象の非同期関数

必須
*args P.args

非同期関数への引数

()
**kwargs P.kwargs

非同期関数へのキーワード引数

{}

戻り値:

タイプ デスクリプション
R

非同期関数の戻り値

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def run_with_session[**P, R](cls, func: typing.Callable[P, typing.Awaitable[R]], *args: P.args, **kwargs: P.kwargs) -> R:
    """非同期関数をセッション付きで同期実行する関数。

    Args:
        func: デコレート対象の非同期関数
        *args: 非同期関数への引数
        **kwargs: 非同期関数へのキーワード引数

    Returns:
        非同期関数の戻り値
    """

    async def wrapper() -> R:
        async with cls.session_scope():
            return await func(*args, **kwargs)

    return pytilpack.asyncio.run(wrapper())

AsyncUniqueIDMixin

self.unique_idを持つテーブルクラスに便利メソッドを生やすmixin。

generate_unique_id() classmethod

ユニークIDを生成する。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
def generate_unique_id(cls) -> str:
    """ユニークIDを生成する。"""
    return secrets.token_urlsafe(32)
get_by_unique_id(unique_id, allow_id=False, for_update=False) async classmethod

ユニークIDを元にインスタンスを取得。

引数:

名前 タイプ デスクリプション デフォルト
unique_id str | int

ユニークID。

必須
allow_id bool

ユニークIDだけでなくID(int)も許可するかどうか。

False
for_update bool

更新ロックを取得するか否か。

False

戻り値:

タイプ デスクリプション
Self | None

インスタンス。

ソースコード位置: pytilpack/sqlalchemy/async_.py
@classmethod
async def get_by_unique_id(
    cls: type[typing.Self],
    unique_id: str | int,
    allow_id: bool = False,
    for_update: bool = False,
) -> typing.Self | None:
    """ユニークIDを元にインスタンスを取得。

    Args:
        unique_id: ユニークID。
        allow_id: ユニークIDだけでなくID(int)も許可するかどうか。
        for_update: 更新ロックを取得するか否か。

    Returns:
        インスタンス。

    """
    assert issubclass(cls, AsyncMixin)
    if allow_id and isinstance(unique_id, int):
        q = cls.select().where(cls.id == unique_id)  # type: ignore
    else:
        q = cls.select().where(cls.unique_id == unique_id)  # type: ignore
    if for_update:
        q = q.with_for_update()
    return await cls.scalar_one_or_none(q)

await_for_connection(url, timeout=180.0) async

DBに接続可能になるまで待機する。

ソースコード位置: pytilpack/sqlalchemy/async_.py
async def await_for_connection(url: str, timeout: float = 180.0) -> None:
    """DBに接続可能になるまで待機する。"""
    failed = False
    start_time = time.time()
    while True:
        try:
            engine = sqlalchemy.ext.asyncio.create_async_engine(url)
            try:
                async with engine.connect() as connection:
                    await connection.execute(sqlalchemy.text("SELECT 1"))
            finally:
                await engine.dispose()
            # 接続成功
            if failed:  # 過去に接続失敗していた場合だけログを出す
                logger.info("DB接続成功")
            break
        except Exception as e:
            # 接続失敗
            if not failed:
                failed = True
                logger.info(f"DB接続待機中 . . . (URL: {url})")
            remain_time = timeout - (time.time() - start_time)
            if remain_time <= 0:
                raise RuntimeError(f"DB接続タイムアウト (URL: {url})") from e
            await asyncio.sleep(min(1, remain_time))

asafe_close(session, log_level=logging.DEBUG) async

例外を出さずにセッションをクローズ。

ソースコード位置: pytilpack/sqlalchemy/async_.py
async def asafe_close(session: sqlalchemy.ext.asyncio.AsyncSession, log_level: int | None = logging.DEBUG):
    """例外を出さずにセッションをクローズ。"""
    try:
        await asyncio.shield(session.close())
    except Exception:
        if log_level is not None:
            logger.log(log_level, "セッションクローズ失敗", exc_info=True)

describe

SQLAlchemy用のユーティリティ集。

describe(Base, tablefmt='grid')

DBのテーブル構造を文字列化する。

ソースコード位置: pytilpack/sqlalchemy/describe.py
def describe(
    Base: type[sqlalchemy.orm.DeclarativeBase],
    tablefmt: "str | tabulate.TableFormat" = "grid",
) -> str:
    """DBのテーブル構造を文字列化する。"""
    return "\n".join(
        [describe_table(table, get_class_by_table(Base, table), tablefmt=tablefmt) for table in Base.metadata.tables.values()]
    )

get_class_by_table(base, table)

テーブルからクラスを取得する。

ソースコード位置: pytilpack/sqlalchemy/describe.py
def get_class_by_table(
    base: type[sqlalchemy.orm.DeclarativeBase], table: sqlalchemy.sql.schema.Table
) -> type[sqlalchemy.orm.DeclarativeBase]:
    """テーブルからクラスを取得する。"""
    for mapper in base.registry.mappers:
        if mapper.local_table is table:
            return typing.cast(type[sqlalchemy.orm.DeclarativeBase], mapper.class_)
    raise ValueError(f"テーブル {table.name} に対応するクラスが見つかりませんでした。")

describe_table(table, orm_class, tablefmt='grid')

テーブル構造を文字列化する。

ソースコード位置: pytilpack/sqlalchemy/describe.py
def describe_table(
    table: sqlalchemy.sql.schema.Table,
    orm_class: type[sqlalchemy.orm.DeclarativeBase],
    tablefmt: "str | tabulate.TableFormat" = "grid",
) -> str:
    """テーブル構造を文字列化する。"""
    try:
        class_field_comments = pytilpack.python.class_field_comments(orm_class)
    except Exception as e:
        logger.warning(f"クラスフィールドコメント取得失敗: {e}")
        class_field_comments = {}

    headers = ["Field", "Type", "Null", "Key", "Default", "Extra", "Comment"]
    rows = []
    for column in table.columns:
        key = ""
        if column.primary_key:
            key = "PRI"
        elif column.unique:
            key = "UNI"
        elif column.index:
            key = "MUL"

        extra = ""
        if column.autoincrement and column.primary_key:
            extra = "auto_increment"

        default_value = (
            column.default.arg
            if column.default is not None and isinstance(column.default, sqlalchemy.sql.schema.ColumnDefault)
            else column.default
        )
        default: str
        if default_value is None:
            default = "NULL"
        elif callable(default_value):
            default = "(function)"
        elif isinstance(default_value, sqlalchemy.sql.elements.CompilerElement):  # type: ignore[attr-defined]
            default = str(default_value.compile(compile_kwargs={"literal_binds": True}))
        else:
            default = str(default_value)

        # コメントは以下の優先順位で拾う。
        # doc(DBに反映されないもの) > comment(DBに反映されるもの)
        #  > class_field_comments(ソースコード上のコメント)
        comment: str = ""
        if column.doc:
            comment = column.doc
        elif column.comment:
            comment = column.comment
        elif column.name in class_field_comments:
            comment = class_field_comments[column.name] or ""

        rows.append(
            [
                column.name,
                str(column.type),
                "YES" if column.nullable else "NO",
                key,
                default,
                extra,
                comment,
            ]
        )
    table_description = tabulate.tabulate(rows, headers=headers, tablefmt=tablefmt)

    return f"Table: {table.name}\n{table_description}\n"

flask

SQLAlchemy用のユーティリティ集(Flask-SQLAlchemy版)。

Mixin

Bases: _ReprMixin

テーブルクラスに色々便利機能を生やすMixin。

get_by_id_not_null(id_, for_update=False, options=None) async classmethod

IDを元にインスタンスを取得。見つからない場合は例外を出す。

引数:

名前 タイプ デスクリプション デフォルト
id_ int

ID。

必須
for_update bool

更新ロックを取得するか否か。

False
options ExecutableOption | None

クエリオプション。eager loadingなどに使用する。

None

戻り値:

タイプ デスクリプション
Self

インスタンス。

Raises: ValueError: 見つからない場合。

ソースコード位置: pytilpack/sqlalchemy/flask.py
@classmethod
async def get_by_id_not_null(
    cls, id_: int, for_update: bool = False, options: sqlalchemy.sql.base.ExecutableOption | None = None
) -> typing.Self:
    """IDを元にインスタンスを取得。見つからない場合は例外を出す。

    Args:
        id_: ID。
        for_update: 更新ロックを取得するか否か。
        options: クエリオプション。eager loadingなどに使用する。

    Returns:
        インスタンス。
    Raises:
        ValueError: 見つからない場合。
    """
    instance = cls.get_by_id(id_, for_update=for_update, options=options)
    if instance is None:
        raise ValueError(f"{cls.__qualname__}が見つかりませんでした。id={id_}")
    return instance
get_by_id(id_, for_update=False, options=None) classmethod

IDを元にインスタンスを取得。

引数:

名前 タイプ デスクリプション デフォルト
id_ int

ID。

必須
for_update bool

更新ロックを取得するか否か。

False
options ExecutableOption | None

クエリオプション。eager loadingなどに使用。

None

戻り値:

タイプ デスクリプション
Self | None

インスタンス。

ソースコード位置: pytilpack/sqlalchemy/flask.py
@classmethod
def get_by_id(
    cls: type[typing.Self], id_: int, for_update: bool = False, options: sqlalchemy.sql.base.ExecutableOption | None = None
) -> typing.Self | None:
    """IDを元にインスタンスを取得。

    Args:
        id_: ID。
        for_update: 更新ロックを取得するか否か。
        options: クエリオプション。eager loadingなどに使用。

    Returns:
        インスタンス。

    """
    q = cls.query.filter(cls.id == id_)  # type: ignore
    if options is not None:
        q = q.options(options)
    if for_update:
        q = q.with_for_update()
    return q.one_or_none()
to_dict(includes=None, excludes=None, exclude_none=False, value_converter=None, datetime_to_iso=True)

インスタンスを辞書化する。

引数:

名前 タイプ デスクリプション デフォルト
includes list[str] | None

辞書化するフィールド名のリスト。excludesと同時指定不可。

None
excludes list[str] | None

辞書化しないフィールド名のリスト。includesと同時指定不可。

None
exclude_none bool

Noneのフィールドを除外するかどうか。

False
value_converter Callable[[Any], Any] | None

各フィールドの値を変換する関数。引数は値、戻り値は変換後の値。

None
datetime_to_iso bool

datetime型の値をISOフォーマットの文字列に変換するかどうか。

True

戻り値:

タイプ デスクリプション
dict[str, Any]

辞書。

ソースコード位置: pytilpack/sqlalchemy/flask.py
def to_dict(
    self,
    includes: list[str] | None = None,
    excludes: list[str] | None = None,
    exclude_none: bool = False,
    value_converter: typing.Callable[[typing.Any], typing.Any] | None = None,
    datetime_to_iso: bool = True,
) -> dict[str, typing.Any]:
    """インスタンスを辞書化する。

    Args:
        includes: 辞書化するフィールド名のリスト。excludesと同時指定不可。
        excludes: 辞書化しないフィールド名のリスト。includesと同時指定不可。
        exclude_none: Noneのフィールドを除外するかどうか。
        value_converter: 各フィールドの値を変換する関数。引数は値、戻り値は変換後の値。
        datetime_to_iso: datetime型の値をISOフォーマットの文字列に変換するかどうか。

    Returns:
        辞書。

    """
    assert (includes is None) or (excludes is None)
    mapper = sqlalchemy.inspect(self.__class__, raiseerr=True)
    assert mapper is not None
    all_columns = [
        mapper.get_property_by_column(column).key
        for column in self.__table__.columns  # type: ignore[attr-defined]
    ]
    if includes is None:
        includes = all_columns
        if excludes is None:
            pass
        else:
            assert (set(all_columns) & set(excludes)) == set(excludes)
            includes = list(filter(lambda x: x not in excludes, includes))
    else:
        assert excludes is None
        assert (set(all_columns) & set(includes)) == set(includes)

    def convert_value(value: typing.Any) -> typing.Any:
        """値を変換する関数。"""
        if datetime_to_iso and isinstance(value, datetime.datetime | datetime.date):
            return value.isoformat()
        if value_converter is not None:
            return value_converter(value)
        return value

    return {
        column_name: convert_value(getattr(self, column_name))
        for column_name in includes
        if not exclude_none or getattr(self, column_name) is not None
    }

UniqueIDMixin

self.unique_idを持つテーブルクラスに便利メソッドを生やすmixin。

generate_unique_id() classmethod

ユニークIDを生成する。

ソースコード位置: pytilpack/sqlalchemy/flask.py
@classmethod
def generate_unique_id(cls) -> str:
    """ユニークIDを生成する。"""
    return secrets.token_urlsafe(32)
get_by_unique_id(unique_id, allow_id=False, for_update=False) classmethod

ユニークIDを元にインスタンスを取得。

引数:

名前 タイプ デスクリプション デフォルト
unique_id str | int

ユニークID。

必須
allow_id bool

ユニークIDだけでなくID(int)も許可するかどうか。

False
for_update bool

更新ロックを取得するか否か。

False

戻り値:

タイプ デスクリプション
Self | None

インスタンス。

ソースコード位置: pytilpack/sqlalchemy/flask.py
@classmethod
def get_by_unique_id(
    cls: type[typing.Self],
    unique_id: str | int,
    allow_id: bool = False,
    for_update: bool = False,
) -> typing.Self | None:
    """ユニークIDを元にインスタンスを取得。

    Args:
        unique_id: ユニークID。
        allow_id: ユニークIDだけでなくID(int)も許可するかどうか。
        for_update: 更新ロックを取得するか否か。

    Returns:
        インスタンス。

    """
    if allow_id and isinstance(unique_id, int):
        q = cls.query.filter(cls.id == unique_id)  # type: ignore
    else:
        q = cls.query.filter(cls.unique_id == unique_id)  # type: ignore
    if for_update:
        q = q.with_for_update()
    return q.one_or_none()

register_ping()

コネクションプールの切断対策。

ソースコード位置: pytilpack/sqlalchemy/flask.py
def register_ping():
    """コネクションプールの切断対策。"""

    @sqlalchemy.event.listens_for(sqlalchemy.pool.Pool, "checkout")
    def _ping_connection(dbapi_connection, connection_record, connection_proxy):
        """コネクションプールの切断対策。"""
        _ = connection_record, connection_proxy  # noqa
        cursor = dbapi_connection.cursor()
        try:
            cursor.execute("SELECT 1")
        except Exception as e:
            raise sqlalchemy.exc.DisconnectionError() from e
        finally:
            cursor.close()

sync

SQLAlchemy用のユーティリティ集(同期版)。

SyncMixin

Bases: _ReprMixin

モデルのベースクラス。SQLAlchemy 2.0スタイル・同期前提。

例:

モデル定義例::

class Base(sqlalchemy.orm.DeclarativeBase, pytilpack.sqlalchemy_.sync_.SyncMixin):
    pass

class User(Base):
    __tablename__ = "users"
    ...

Quart例::

@app.before_request
async def _before_request() -> None:
    quart.g.db_session_token = models.Base.start_session()

@app.teardown_request
async def _teardown_request(_: BaseException | None) -> None:
    if hasattr(quart.g, "db_session_token"):
        models.Base.close_session(quart.g.db_session_token)
        del quart.g.db_session_token
engine = None class-attribute instance-attribute

DB接続。

sessionmaker = None class-attribute instance-attribute

セッションファクトリ。

session_var = contextvars.ContextVar('session_var') class-attribute instance-attribute

セッション。

init(url, pool_size=None, max_overflow=None, pool_recycle=280, pool_pre_ping=True, autoflush=True, expire_on_commit=False, **kwargs) classmethod

DB接続を初期化する。(推奨される既定設定を適用する。)

引数:

名前 タイプ デスクリプション デフォルト
url str | URL

DB接続URL。

必須
pool_size int | None

コネクションプールのサイズ。スレッド数に応じて調整要。

None
max_overflow int | None

コネクションプールの最大オーバーフロー数。Noneの場合はデフォルト値を使用。

None
pool_recycle int | None

コネクションプールのリサイクル時間。Noneの場合はデフォルト値を使用。

280
pool_pre_ping bool

コネクションプールのプレピン。Noneの場合はデフォルト値を使用。

True
autoflush bool

セッションのautoflushフラグ。デフォルトはTrue。

True
expire_on_commit bool

セッションのexpire_on_commitフラグ。デフォルトはFalse。

False
**kwargs Any

sqlalchemy.create_engineに渡す追加のキーワード引数。

{}
ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def init(
    cls,
    url: str | sqlalchemy.engine.URL,
    pool_size: int | None = None,
    max_overflow: int | None = None,
    pool_recycle: int | None = 280,
    pool_pre_ping: bool = True,
    autoflush: bool = True,
    expire_on_commit: bool = False,
    **kwargs: typing.Any,
) -> None:
    """DB接続を初期化する。(推奨される既定設定を適用する。)

    Args:
        url: DB接続URL。
        pool_size: コネクションプールのサイズ。スレッド数に応じて調整要。
        max_overflow: コネクションプールの最大オーバーフロー数。Noneの場合はデフォルト値を使用。
        pool_recycle: コネクションプールのリサイクル時間。Noneの場合はデフォルト値を使用。
        pool_pre_ping: コネクションプールのプレピン。Noneの場合はデフォルト値を使用。
        autoflush: セッションのautoflushフラグ。デフォルトはTrue。
        expire_on_commit: セッションのexpire_on_commitフラグ。デフォルトはFalse。
        **kwargs: sqlalchemy.create_engineに渡す追加のキーワード引数。

    """
    assert cls.engine is None, "DB接続はすでに初期化されています。"

    if pool_size is not None and max_overflow is None:
        max_overflow = pool_size * 1  # デフォルトで倍まで許可
    kwargs = kwargs.copy()
    if pool_size is not None:
        kwargs["pool_size"] = pool_size
    if max_overflow is not None:
        kwargs["max_overflow"] = max_overflow
    if pool_recycle is not None:
        kwargs["pool_recycle"] = pool_recycle
    if pool_pre_ping is not None:
        kwargs["pool_pre_ping"] = pool_pre_ping

    cls.engine = sqlalchemy.create_engine(url, **kwargs)
    atexit.register(cls.engine.dispose)

    cls.sessionmaker = sqlalchemy.orm.sessionmaker(cls.engine, autoflush=autoflush, expire_on_commit=expire_on_commit)
connect() classmethod

DBに接続する。

使用例:: with Base.connect() as conn: Base.metadata.create_all(conn)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def connect(cls) -> sqlalchemy.Connection:
    """DBに接続する。

    使用例::
        with Base.connect() as conn:
            Base.metadata.create_all(conn)

    """
    assert cls.engine is not None
    return cls.engine.connect()
session_scope(name=None, log_level=logging.DEBUG) classmethod

セッションを開始するコンテキストマネージャ。

使用例:: with Base.session_scope() as session: ...

引数:

名前 タイプ デスクリプション デフォルト
name str | None

セッション名。指定時のみログ出力する。

None
log_level int

ログレベル。

DEBUG
ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
@contextlib.contextmanager
def session_scope(
    cls,
    name: str | None = None,
    log_level: int = logging.DEBUG,
) -> typing.Generator[sqlalchemy.orm.Session, None, None]:
    """セッションを開始するコンテキストマネージャ。

    使用例::
        with Base.session_scope() as session:
            ...

    Args:
        name: セッション名。指定時のみログ出力する。
        log_level: ログレベル。

    """
    assert cls.sessionmaker is not None
    token = cls.start_session(name=name, log_level=log_level)
    try:
        yield cls.session()
    finally:
        cls.close_session(token, name=name, log_level=log_level)
asession_scope(name=None, log_level=logging.DEBUG) async classmethod

セッションを開始するコンテキストマネージャ。

使用例:: async with Base.asession_scope() as session: ...

引数:

名前 タイプ デスクリプション デフォルト
name str | None

セッション名。指定時のみログ出力する。

None
log_level int

ログレベル。

DEBUG
ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
@contextlib.asynccontextmanager
async def asession_scope(
    cls,
    name: str | None = None,
    log_level: int = logging.DEBUG,
) -> typing.AsyncGenerator[sqlalchemy.orm.Session, None]:
    """セッションを開始するコンテキストマネージャ。

    使用例::
        async with Base.asession_scope() as session:
            ...

    Args:
        name: セッション名。指定時のみログ出力する。
        log_level: ログレベル。

    """
    assert cls.sessionmaker is not None
    token = cls.start_session(name=name, log_level=log_level)
    try:
        yield cls.session()
    finally:
        cls.close_session(token, name=name, log_level=log_level)
start_session(name=None, log_level=logging.DEBUG) classmethod

セッションを開始する。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def start_session(
    cls, name: str | None = None, log_level: int = logging.DEBUG
) -> contextvars.Token[sqlalchemy.orm.Session]:
    """セッションを開始する。"""
    assert cls.sessionmaker is not None
    session = cls.sessionmaker()  # pylint: disable=not-callable
    token = cls.session_var.set(session)
    if name is not None:
        logger.log(
            log_level,
            f"セッション開始: {name} session={id(session):x},"
            f" thread={threading.get_ident():x},"
            f" task={pytilpack.asyncio.get_task_id_hex()}",
        )
    return token
close_session(token, name=None, log_level=logging.DEBUG) classmethod

セッションを終了する。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def close_session(
    cls, token: contextvars.Token[sqlalchemy.orm.Session], name: str | None = None, log_level: int = logging.DEBUG
) -> None:
    """セッションを終了する。"""
    session = cls.session()
    if name is not None:
        logger.log(
            log_level,
            f"セッション終了: {name} session={id(session):x},"
            f" thread={threading.get_ident():x},"
            f" task={pytilpack.asyncio.get_task_id_hex()}",
        )
    safe_close(session)
    cls.session_var.reset(token)
session() classmethod

セッションを取得する。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def session(cls) -> sqlalchemy.orm.Session:
    """セッションを取得する。"""
    sess = cls.session_var.get(None)
    if sess is None:
        raise RuntimeError(f"セッションが開始されていません。{cls.__qualname__}.start_session()を呼び出してください。")
    return sess
select() classmethod

sqlalchemy.Selectを返す。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def select(cls) -> sqlalchemy.Select[tuple[typing.Self]]:
    """sqlalchemy.Selectを返す。"""
    # cls.count()などでfrom句が消えないように明示的にfrom句を指定して返す。
    return sqlalchemy.select(cls).select_from(cls)
insert() classmethod

sqlalchemy.Insertを返す。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def insert(cls) -> sqlalchemy.Insert:
    """sqlalchemy.Insertを返す。"""
    return sqlalchemy.insert(cls)
update() classmethod

sqlalchemy.Updateを返す。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def update(cls) -> sqlalchemy.Update:
    """sqlalchemy.Updateを返す。"""
    return sqlalchemy.update(cls)
delete() classmethod

sqlalchemy.Deleteを返す。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def delete(cls) -> sqlalchemy.Delete:
    """sqlalchemy.Deleteを返す。"""
    return sqlalchemy.delete(cls)
count(query) classmethod

queryのレコード数を取得する。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def count(cls, query: sqlalchemy.Select | sqlalchemy.CompoundSelect) -> int:
    """queryのレコード数を取得する。"""
    # pylint: disable=not-callable
    return (
        cls.scalar_one_or_none(sqlalchemy.select(sqlalchemy.func.count()).select_from(query.order_by(None).subquery())) or 0
    )
scalar_one(query) classmethod

queryの結果を1件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[tuple[T]] | CompoundSelect[tuple[T]]

クエリ。

必須

戻り値:

タイプ デスクリプション
T

1件のインスタンス。

発生:

タイプ デスクリプション
NoResultFound

結果が0件の場合。

MultipleResultsFound

結果が複数件の場合。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def scalar_one[T](cls, query: sqlalchemy.Select[tuple[T]] | sqlalchemy.CompoundSelect[tuple[T]]) -> T:
    """queryの結果を1件取得する。

    Args:
        query: クエリ。

    Returns:
        1件のインスタンス。

    Raises:
        sqlalchemy.exc.NoResultFound: 結果が0件の場合。
        sqlalchemy.exc.MultipleResultsFound: 結果が複数件の場合。

    """
    return cls.session().execute(query).scalar_one()
scalar_one_or_none(query) classmethod

queryの結果を0件または1件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[tuple[T]] | CompoundSelect[tuple[T]]

クエリ。

必須

戻り値:

タイプ デスクリプション
T | None

0件の場合はNone、1件の場合はインスタンス。

発生:

タイプ デスクリプション
MultipleResultsFound

結果が複数件の場合。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def scalar_one_or_none[T](cls, query: sqlalchemy.Select[tuple[T]] | sqlalchemy.CompoundSelect[tuple[T]]) -> T | None:
    """queryの結果を0件または1件取得する。

    Args:
        query: クエリ。

    Returns:
        0件の場合はNone、1件の場合はインスタンス。

    Raises:
        sqlalchemy.exc.MultipleResultsFound: 結果が複数件の場合。

    """
    return cls.session().execute(query).scalar_one_or_none()
scalars(query) classmethod

queryの結果を全件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[tuple[T]] | CompoundSelect[tuple[T]]

クエリ。

必須

戻り値:

タイプ デスクリプション
list[T]

全件のインスタンスのリスト。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def scalars[T](cls, query: sqlalchemy.Select[tuple[T]] | sqlalchemy.CompoundSelect[tuple[T]]) -> list[T]:
    """queryの結果を全件取得する。

    Args:
        query: クエリ。

    Returns:
        全件のインスタンスのリスト。

    """
    return list(cls.session().execute(query).scalars().all())
one(query) classmethod

queryの結果を1件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[TT] | CompoundSelect[TT]

クエリ。

必須

戻り値:

タイプ デスクリプション
Row[TT]

1件のインスタンス。

発生:

タイプ デスクリプション
NoResultFound

結果が0件の場合。

MultipleResultsFound

結果が複数件の場合。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def one[TT: tuple](cls, query: sqlalchemy.Select[TT] | sqlalchemy.CompoundSelect[TT]) -> sqlalchemy.Row[TT]:
    """queryの結果を1件取得する。

    Args:
        query: クエリ。

    Returns:
        1件のインスタンス。

    Raises:
        sqlalchemy.exc.NoResultFound: 結果が0件の場合。
        sqlalchemy.exc.MultipleResultsFound: 結果が複数件の場合。

    """
    return cls.session().execute(query).one()
one_or_none(query) classmethod

queryの結果を0件または1件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[TT] | CompoundSelect[TT]

クエリ。

必須

戻り値:

タイプ デスクリプション
Row[TT] | None

0件の場合はNone、1件の場合はインスタンス。

発生:

タイプ デスクリプション
MultipleResultsFound

結果が複数件の場合。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def one_or_none[TT: tuple](cls, query: sqlalchemy.Select[TT] | sqlalchemy.CompoundSelect[TT]) -> sqlalchemy.Row[TT] | None:
    """queryの結果を0件または1件取得する。

    Args:
        query: クエリ。

    Returns:
        0件の場合はNone、1件の場合はインスタンス。

    Raises:
        sqlalchemy.exc.MultipleResultsFound: 結果が複数件の場合。

    """
    return cls.session().execute(query).one_or_none()
all(query) classmethod

queryの結果を全件取得する。

引数:

名前 タイプ デスクリプション デフォルト
query Select[TT] | CompoundSelect[TT]

クエリ。

必須

戻り値:

タイプ デスクリプション
list[Row[TT]]

全件のインスタンスのリスト。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def all[TT: tuple](cls, query: sqlalchemy.Select[TT] | sqlalchemy.CompoundSelect[TT]) -> list[sqlalchemy.Row[TT]]:
    """queryの結果を全件取得する。

    Args:
        query: クエリ。

    Returns:
        全件のインスタンスのリスト。

    """
    return list(cls.session().execute(query).all())
get_by_id_not_null(id_, for_update=False, options=None) async classmethod

IDを元にインスタンスを取得。見つからない場合は例外を出す。

引数:

名前 タイプ デスクリプション デフォルト
id_ int

ID。

必須
for_update bool

更新ロックを取得するか否か。

False
options ExecutableOption | None

クエリオプション。eager loadingなどに使用する。

None

戻り値:

タイプ デスクリプション
Self

インスタンス。

Raises: ValueError: 見つからない場合。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def get_by_id_not_null(
    cls, id_: int, for_update: bool = False, options: sqlalchemy.sql.base.ExecutableOption | None = None
) -> typing.Self:
    """IDを元にインスタンスを取得。見つからない場合は例外を出す。

    Args:
        id_: ID。
        for_update: 更新ロックを取得するか否か。
        options: クエリオプション。eager loadingなどに使用する。

    Returns:
        インスタンス。
    Raises:
        ValueError: 見つからない場合。
    """
    instance = cls.get_by_id(id_, for_update=for_update, options=options)
    if instance is None:
        raise ValueError(f"{cls.__qualname__}が見つかりませんでした。id={id_}")
    return instance
get_by_id(id_, for_update=False, options=None) classmethod

IDを元にインスタンスを取得。

引数:

名前 タイプ デスクリプション デフォルト
id_ int

ID。

必須
for_update bool

更新ロックを取得するか否か。

False
options ExecutableOption | None

クエリオプション。eager loadingなどに使用。

None

戻り値:

タイプ デスクリプション
Self | None

インスタンス。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def get_by_id(
    cls, id_: int, for_update: bool = False, options: sqlalchemy.sql.base.ExecutableOption | None = None
) -> typing.Self | None:
    """IDを元にインスタンスを取得。

    Args:
        id_: ID。
        for_update: 更新ロックを取得するか否か。
        options: クエリオプション。eager loadingなどに使用。

    Returns:
        インスタンス。

    """
    q = cls.select().where(cls.id == id_)  # type: ignore  # pylint: disable=no-member
    if options is not None:
        q = q.options(options)
    if for_update:
        q = q.with_for_update()
    return cls.scalar_one_or_none(q)
paginate(query, page, per_page, scalar=True) classmethod

Flask-SQLAlchemy風ページネーション。

引数:

名前 タイプ デスクリプション デフォルト
query Select | CompoundSelect

ページネーションするクエリ。

必須
page int

ページ番号。

必須
per_page int

1ページあたりのアイテム数。

必須
scalar bool

Trueの場合、スカラー値を返す。Falseの場合、全件のインスタンスを返す。

True

戻り値:

タイプ デスクリプション
Paginator

ページネーションされた結果を返すpytilpack.paginator.Paginatorインスタンス。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def paginate(
    cls,
    query: sqlalchemy.Select | sqlalchemy.CompoundSelect,
    page: int,
    per_page: int,
    scalar: bool = True,
) -> pytilpack.paginator.Paginator:
    """Flask-SQLAlchemy風ページネーション。

    Args:
        query: ページネーションするクエリ。
        page: ページ番号。
        per_page: 1ページあたりのアイテム数。
        scalar: Trueの場合、スカラー値を返す。Falseの場合、全件のインスタンスを返す。

    Returns:
        ページネーションされた結果を返すpytilpack.paginator.Paginatorインスタンス。
    """
    assert page > 0, "ページ番号は1以上でなければなりません。"
    assert per_page > 0, "1ページあたりのアイテム数は1以上でなければなりません。"
    total = cls.count(query)
    page_query = query.offset((page - 1) * per_page).limit(per_page)
    items = cls.scalars(page_query) if scalar else cls.all(page_query)
    # pylint: disable=protected-access
    return pytilpack.paginator.Paginator(page=page, per_page=per_page, items=items, total=total)
commit() classmethod

セッションをコミットする。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def commit(cls) -> None:
    """セッションをコミットする。"""
    cls.session().commit()
acount(query) async classmethod

queryのレコード数を取得する。(非同期版)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def acount(cls, query: sqlalchemy.Select | sqlalchemy.CompoundSelect) -> int:
    """queryのレコード数を取得する。(非同期版)"""
    return await run_sync_with_session(cls.count.__func__)(cls, query)  # type: ignore[attr-defined]
ascalar_one(query) async classmethod

queryの結果を1件取得する。(非同期版)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def ascalar_one[T](cls, query: sqlalchemy.Select[tuple[T]] | sqlalchemy.CompoundSelect[tuple[T]]) -> T:
    """queryの結果を1件取得する。(非同期版)"""
    return await run_sync_with_session(cls.scalar_one.__func__)(cls, query)  # type: ignore[attr-defined]
ascalar_one_or_none(query) async classmethod

queryの結果を0件または1件取得する。(非同期版)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def ascalar_one_or_none[T](cls, query: sqlalchemy.Select[tuple[T]] | sqlalchemy.CompoundSelect[tuple[T]]) -> T | None:
    """queryの結果を0件または1件取得する。(非同期版)"""
    return await run_sync_with_session(cls.scalar_one_or_none.__func__)(cls, query)  # type: ignore[attr-defined]
ascalars(query) async classmethod

queryの結果を全件取得する。(非同期版)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def ascalars[T](cls, query: sqlalchemy.Select[tuple[T]] | sqlalchemy.CompoundSelect[tuple[T]]) -> list[T]:
    """queryの結果を全件取得する。(非同期版)"""
    return await run_sync_with_session(cls.scalars.__func__)(cls, query)  # type: ignore[attr-defined]
aone(query) async classmethod

queryの結果を1件取得する。(非同期版)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def aone[TT: tuple](cls, query: sqlalchemy.Select[TT] | sqlalchemy.CompoundSelect[TT]) -> sqlalchemy.Row[TT]:
    """queryの結果を1件取得する。(非同期版)"""
    return await run_sync_with_session(cls.one.__func__)(cls, query)  # type: ignore[attr-defined]
aone_or_none(query) async classmethod

queryの結果を0件または1件取得する。(非同期版)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def aone_or_none[TT: tuple](
    cls, query: sqlalchemy.Select[TT] | sqlalchemy.CompoundSelect[TT]
) -> sqlalchemy.Row[TT] | None:
    """queryの結果を0件または1件取得する。(非同期版)"""
    return await run_sync_with_session(cls.one_or_none.__func__)(cls, query)  # type: ignore[attr-defined]
aall(query) async classmethod

queryの結果を全件取得する。(非同期版)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def aall[TT: tuple](cls, query: sqlalchemy.Select[TT] | sqlalchemy.CompoundSelect[TT]) -> list[sqlalchemy.Row[TT]]:
    """queryの結果を全件取得する。(非同期版)"""
    return await run_sync_with_session(cls.all.__func__)(cls, query)  # type: ignore[attr-defined]
aget_by_id(id_, for_update=False) async classmethod

IDを元にインスタンスを取得。(非同期版)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def aget_by_id(cls, id_: int, for_update: bool = False) -> typing.Self | None:
    """IDを元にインスタンスを取得。(非同期版)"""
    return await run_sync_with_session(cls.get_by_id.__func__)(cls, id_, for_update)  # type: ignore[attr-defined]
apaginate(query, page, per_page, scalar=True) async classmethod

Flask-SQLAlchemy風ページネーション。(非同期版)

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
async def apaginate(
    cls,
    query: sqlalchemy.Select | sqlalchemy.CompoundSelect,
    page: int,
    per_page: int,
    scalar: bool = True,
) -> pytilpack.paginator.Paginator:
    """Flask-SQLAlchemy風ページネーション。(非同期版)"""
    return await run_sync_with_session(cls.paginate.__func__)(cls, query, page, per_page, scalar)  # type: ignore[attr-defined]
to_dict(includes=None, excludes=None, exclude_none=False, value_converter=None, datetime_to_iso=True)

インスタンスを辞書化する。

引数:

名前 タイプ デスクリプション デフォルト
includes list[str] | None

辞書化するフィールド名のリスト。excludesと同時指定不可。

None
excludes list[str] | None

辞書化しないフィールド名のリスト。includesと同時指定不可。

None
exclude_none bool

Noneのフィールドを除外するかどうか。

False
value_converter Callable[[Any], Any] | None

各フィールドの値を変換する関数。引数は値、戻り値は変換後の値。

None
datetime_to_iso bool

datetime型の値をISOフォーマットの文字列に変換するかどうか。

True

戻り値:

タイプ デスクリプション
dict[str, Any]

辞書。

ソースコード位置: pytilpack/sqlalchemy/sync.py
def to_dict(
    self,
    includes: list[str] | None = None,
    excludes: list[str] | None = None,
    exclude_none: bool = False,
    value_converter: typing.Callable[[typing.Any], typing.Any] | None = None,
    datetime_to_iso: bool = True,
) -> dict[str, typing.Any]:
    """インスタンスを辞書化する。

    Args:
        includes: 辞書化するフィールド名のリスト。excludesと同時指定不可。
        excludes: 辞書化しないフィールド名のリスト。includesと同時指定不可。
        exclude_none: Noneのフィールドを除外するかどうか。
        value_converter: 各フィールドの値を変換する関数。引数は値、戻り値は変換後の値。
        datetime_to_iso: datetime型の値をISOフォーマットの文字列に変換するかどうか。

    Returns:
        辞書。

    """
    assert (includes is None) or (excludes is None)
    mapper = sqlalchemy.inspect(self.__class__, raiseerr=True)
    assert mapper is not None
    all_columns = [
        mapper.get_property_by_column(column).key
        for column in self.__table__.columns  # type: ignore[attr-defined]
    ]
    if includes is None:
        includes = all_columns
        if excludes is None:
            pass
        else:
            assert (set(all_columns) & set(excludes)) == set(excludes)
            includes = list(filter(lambda x: x not in excludes, includes))
    else:
        assert excludes is None
        assert (set(all_columns) & set(includes)) == set(includes)

    def convert_value(value: typing.Any) -> typing.Any:
        """値を変換する関数。"""
        if datetime_to_iso and isinstance(value, datetime.datetime | datetime.date):
            return value.isoformat()
        if value_converter is not None:
            return value_converter(value)
        return value

    return {
        column_name: convert_value(getattr(self, column_name))
        for column_name in includes
        if not exclude_none or getattr(self, column_name) is not None
    }

SyncUniqueIDMixin

self.unique_idを持つテーブルクラスに便利メソッドを生やすmixin。

generate_unique_id() classmethod

ユニークIDを生成する。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def generate_unique_id(cls) -> str:
    """ユニークIDを生成する。"""
    return secrets.token_urlsafe(32)
get_by_unique_id(unique_id, allow_id=False, for_update=False) classmethod

ユニークIDを元にインスタンスを取得。

引数:

名前 タイプ デスクリプション デフォルト
unique_id str | int

ユニークID。

必須
allow_id bool

ユニークIDだけでなくID(int)も許可するかどうか。

False
for_update bool

更新ロックを取得するか否か。

False

戻り値:

タイプ デスクリプション
Self | None

インスタンス。

ソースコード位置: pytilpack/sqlalchemy/sync.py
@classmethod
def get_by_unique_id(
    cls: type[typing.Self],
    unique_id: str | int,
    allow_id: bool = False,
    for_update: bool = False,
) -> typing.Self | None:
    """ユニークIDを元にインスタンスを取得。

    Args:
        unique_id: ユニークID。
        allow_id: ユニークIDだけでなくID(int)も許可するかどうか。
        for_update: 更新ロックを取得するか否か。

    Returns:
        インスタンス。

    """
    assert issubclass(cls, SyncMixin)
    if allow_id and isinstance(unique_id, int):
        q = cls.select().where(cls.id == unique_id)  # type: ignore
    else:
        q = cls.select().where(cls.unique_id == unique_id)  # type: ignore
    if for_update:
        q = q.with_for_update()
    return cls.scalar_one_or_none(q)

wait_for_connection(url, timeout=180.0)

DBに接続可能になるまで待機する。

ソースコード位置: pytilpack/sqlalchemy/sync.py
def wait_for_connection(url: str, timeout: float = 180.0) -> None:
    """DBに接続可能になるまで待機する。"""
    failed = False
    start_time = time.time()
    while True:
        try:
            engine = sqlalchemy.create_engine(url)
            try:
                with engine.connect() as connection:
                    result = connection.execute(sqlalchemy.text("SELECT 1"))
                    result.close()
            finally:
                engine.dispose()
            # 接続成功
            if failed:  # 過去に接続失敗していた場合だけログを出す
                logger.info("DB接続成功")
            break
        except Exception as e:
            # 接続失敗
            if not failed:
                failed = True
                logger.info(f"DB接続待機中 . . . (URL: {url})")
            remain_time = timeout - (time.time() - start_time)
            if remain_time <= 0:
                raise RuntimeError(f"DB接続タイムアウト (URL: {url})") from e
            time.sleep(min(1, remain_time))

safe_close(session, log_level=logging.DEBUG)

例外を出さずにセッションをクローズ。

ソースコード位置: pytilpack/sqlalchemy/sync.py
def safe_close(
    session: sqlalchemy.orm.Session | sqlalchemy.orm.scoped_session,
    log_level: int | None = logging.DEBUG,
):
    """例外を出さずにセッションをクローズ。"""
    try:
        session.close()
    except Exception:
        if log_level is not None:
            logger.log(log_level, "セッションクローズ失敗", exc_info=True)

run_sync_with_session(func)

同期関数を非同期に実行し、スレッド内でセッションを管理するデコレーター。

別スレッドで実行される関数内で自動的にセッションスコープを作成する。 各呼び出しは独立したセッション・トランザクションを持つ。

引数:

名前 タイプ デスクリプション デフォルト
func Callable[Concatenate[type[SyncMixin], P], R]

デコレート対象の同期関数

必須

戻り値:

タイプ デスクリプション
Callable[Concatenate[type[SyncMixin], P], Awaitable[R]]

非同期版の関数

ソースコード位置: pytilpack/sqlalchemy/sync.py
def run_sync_with_session[**P, R](
    func: "typing.Callable[typing.Concatenate[type[SyncMixin], P], R]",
) -> "typing.Callable[typing.Concatenate[type[SyncMixin], P], typing.Awaitable[R]]":
    """同期関数を非同期に実行し、スレッド内でセッションを管理するデコレーター。

    別スレッドで実行される関数内で自動的にセッションスコープを作成する。
    各呼び出しは独立したセッション・トランザクションを持つ。

    Args:
        func: デコレート対象の同期関数

    Returns:
        非同期版の関数
    """

    @functools.wraps(func)
    async def wrapper(cls: type[SyncMixin], *args: P.args, **kwargs: P.kwargs) -> R:
        def _impl() -> R:
            with cls.session_scope() as session:
                result = func(cls, *args, **kwargs)
                session.commit()
                return result

        return await asyncio.to_thread(_impl)

    return wrapper