コンテンツにスキップ

pytilpack.asyncio

pytilpack.asyncio

非同期I/O関連。

Job()

非同期ジョブ。

ソースコード位置: pytilpack/asyncio/jobrunner.py
def __init__(self) -> None:
    self.status: JobStatus = "waiting"

run() abstractmethod async

ジョブの処理。内部でブロッキング処理がある場合は適宜 asyncio.to_thread 等を利用する。

ソースコード位置: pytilpack/asyncio/jobrunner.py
@abc.abstractmethod
async def run(self) -> None:
    """ジョブの処理。内部でブロッキング処理がある場合は適宜 asyncio.to_thread 等を利用する。"""

on_finished() async

ジョブが完了した場合に呼ばれる処理。必要に応じてサブクラスで追加の処理を行う。

ソースコード位置: pytilpack/asyncio/jobrunner.py
async def on_finished(self) -> None:
    """ジョブが完了した場合に呼ばれる処理。必要に応じてサブクラスで追加の処理を行う。"""
    self.status = "finished"

on_canceled() async

ジョブが完了する前にキャンセルされた場合に呼ばれる処理。必要に応じてサブクラスで追加の処理を行う。

ソースコード位置: pytilpack/asyncio/jobrunner.py
async def on_canceled(self) -> None:
    """ジョブが完了する前にキャンセルされた場合に呼ばれる処理。必要に応じてサブクラスで追加の処理を行う。"""
    self.status = "canceled"

on_errored() async

ジョブがエラー終了した場合に呼ばれる処理。必要に応じてサブクラスで追加の処理を行う。

ソースコード位置: pytilpack/asyncio/jobrunner.py
async def on_errored(self) -> None:
    """ジョブがエラー終了した場合に呼ばれる処理。必要に応じてサブクラスで追加の処理を行う。"""
    self.status = "errored"

on_finally() async

ジョブの終了時に必ず呼ばれる処理。必要に応じてサブクラスで追加の処理を行う。

ソースコード位置: pytilpack/asyncio/jobrunner.py
async def on_finally(self) -> None:
    """ジョブの終了時に必ず呼ばれる処理。必要に応じてサブクラスで追加の処理を行う。"""
    del self  # noqa

JobRunner(max_job_concurrency=8, poll_interval=1.0)

非同期ジョブを最大 max_job_concurrency 並列で実行するクラス。

引数:

名前 タイプ デスクリプション デフォルト
max_job_concurrency int

ジョブの最大同時実行数

8
poll_interval float

ジョブ取得のポーリング間隔(秒)

1.0
ソースコード位置: pytilpack/asyncio/jobrunner.py
def __init__(self, max_job_concurrency: int = 8, poll_interval: float = 1.0) -> None:
    self.poll_interval = poll_interval
    self.max_job_concurrency = max_job_concurrency
    self.running = True
    self.semaphore = asyncio.Semaphore(max_job_concurrency)
    self.tasks: set[asyncio.Task] = set()  # 実行中ジョブのタスクを管理

run() async

poll()でジョブを取得し、並列実行上限内でジョブを実行する。

ソースコード位置: pytilpack/asyncio/jobrunner.py
async def run(self) -> None:
    """poll()でジョブを取得し、並列実行上限内でジョブを実行する。"""
    while self.running:
        # セマフォを取得して実行可能なジョブがあるか確認
        await self.semaphore.acquire()
        # 再度self.runningをチェック (graceful_shutdown()対策)
        if not self.running:
            self.semaphore.release()
            break
        job = await self._poll()
        if job is None:
            # ジョブがなければセマフォを解放して一定時間待機
            self.semaphore.release()
            await asyncio.sleep(self.poll_interval)
        else:
            # ジョブがあれば実行
            task = asyncio.create_task(self._run_job(job))
            task.add_done_callback(self.tasks.discard)
            self.tasks.add(task)

shutdown()

停止処理。

ソースコード位置: pytilpack/asyncio/jobrunner.py
def shutdown(self) -> None:
    """停止処理。"""
    self.running = False
    # 現在実行中のタスクにキャンセルを通知
    for task in list(self.tasks):
        task.cancel()

graceful_shutdown() async

新規ジョブ取得を停止し、実行中のジョブ完了を待ってから戻る

ソースコード位置: pytilpack/asyncio/jobrunner.py
async def graceful_shutdown(self) -> None:
    """新規ジョブ取得を停止し、実行中のジョブ完了を待ってから戻る"""
    self.running = False
    await asyncio.sleep(0)
    if len(self.tasks) > 0:
        await asyncio.gather(*list(self.tasks), return_exceptions=True)

poll() abstractmethod async

次のジョブを返す。ジョブがなければ None を返す。

ソースコード位置: pytilpack/asyncio/jobrunner.py
@abc.abstractmethod
async def poll(self) -> Job | None:
    """次のジョブを返す。ジョブがなければ None を返す。"""

read_json(path, encoding='utf-8', errors='replace', strict=False, **kwargs) async

JSONファイルから非同期で読み取る。

ソースコード位置: pytilpack/asyncio/io_.py
async def read_json(
    path: pytilpack.io.PathOrIO,
    encoding: str = "utf-8",
    errors: str = "replace",
    strict: bool = False,
    **kwargs,
) -> typing.Any:
    """JSONファイルから非同期で読み取る。"""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.json.load,
            path,
            encoding,
            errors,
            strict,
            **kwargs,
        ),
    )

write_json(path, data, ensure_ascii=False, indent=None, separators=None, sort_keys=False, default=pytilpack.json.converter, encoding='utf-8', **kwargs) async

JSONファイルに非同期で書き込む。

ソースコード位置: pytilpack/asyncio/io_.py
async def write_json(
    path: pytilpack.io.PathOrIO,
    data: typing.Any,
    ensure_ascii: bool = False,
    indent: int | str | None = None,
    separators: tuple[str, str] | None = None,
    sort_keys: bool = False,
    default: typing.Callable[[typing.Any], typing.Any] = pytilpack.json.converter,
    encoding: str = "utf-8",
    **kwargs,
) -> None:
    """JSONファイルに非同期で書き込む。"""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.json.save,
            path,
            data,
            ensure_ascii,
            indent,
            separators,
            sort_keys,
            default,
            encoding,
            **kwargs,
        ),
    )

read_jsonc(path, encoding='utf-8', errors='replace', strict=False, **kwargs) async

JSONCファイルから非同期で読み取る。

ソースコード位置: pytilpack/asyncio/io_.py
async def read_jsonc(
    path: pytilpack.io.PathOrIO,
    encoding: str = "utf-8",
    errors: str = "replace",
    strict: bool = False,
    **kwargs,
) -> typing.Any:
    """JSONCファイルから非同期で読み取る。"""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.jsonc.load,
            path,
            encoding,
            errors,
            strict,
            **kwargs,
        ),
    )

read_yaml(path, encoding='utf-8', errors='replace', strict=False, Loader=None) async

YAMLファイルから非同期で読み取る。

ソースコード位置: pytilpack/asyncio/io_.py
async def read_yaml(
    path: pytilpack.io.PathOrIO,
    encoding: str = "utf-8",
    errors: str = "replace",
    strict: bool = False,
    Loader: typing.Any = None,
) -> typing.Any:
    """YAMLファイルから非同期で読み取る。"""
    import yaml  # pylint: disable=import-outside-toplevel

    import pytilpack.yaml  # pylint: disable=import-outside-toplevel,redefined-outer-name

    if Loader is None:
        Loader = yaml.SafeLoader
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.yaml.load,
            path,
            encoding,
            errors,
            strict,
            Loader,
        ),
    )

read_yaml_all(path, encoding='utf-8', errors='replace', strict=False, Loader=None) async

YAMLファイルから非同期で読み取る。

ソースコード位置: pytilpack/asyncio/io_.py
async def read_yaml_all(
    path: pytilpack.io.PathOrIO,
    encoding: str = "utf-8",
    errors: str = "replace",
    strict: bool = False,
    Loader: typing.Any = None,
) -> list[typing.Any]:
    """YAMLファイルから非同期で読み取る。"""
    import yaml  # pylint: disable=import-outside-toplevel

    import pytilpack.yaml  # pylint: disable=import-outside-toplevel,redefined-outer-name

    if Loader is None:
        Loader = yaml.SafeLoader
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.yaml.load_all,
            path,
            encoding,
            errors,
            strict,
            Loader,
        ),
    )

write_yaml(path, data, allow_unicode=True, width=99, default_style=None, default_flow_style=False, sort_keys=False, Dumper=None, encoding='utf-8', **kwargs) async

YAMLファイルに非同期で書き込む。

ソースコード位置: pytilpack/asyncio/io_.py
async def write_yaml(
    path: pytilpack.io.PathOrIO,
    data: typing.Any,
    allow_unicode: bool | None = True,
    width: int = 99,
    default_style: str | None = None,
    default_flow_style: bool | None = False,
    sort_keys: bool = False,
    Dumper: typing.Any = None,
    encoding: str = "utf-8",
    **kwargs,
) -> None:
    """YAMLファイルに非同期で書き込む。"""
    import pytilpack.yaml  # pylint: disable=import-outside-toplevel,redefined-outer-name

    if Dumper is None:
        Dumper = pytilpack.yaml.CustomDumper
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.yaml.save,
            path,
            data,
            allow_unicode,
            width,
            default_style,
            default_flow_style,
            sort_keys,
            Dumper,
            encoding,
            **kwargs,
        ),
    )

write_yaml_all(path, data, allow_unicode=True, width=99, default_style=None, default_flow_style=False, sort_keys=False, Dumper=None, encoding='utf-8', **kwargs) async

YAMLファイルに非同期で書き込む。

ソースコード位置: pytilpack/asyncio/io_.py
async def write_yaml_all(
    path: pytilpack.io.PathOrIO,
    data: list[typing.Any],
    allow_unicode: bool | None = True,
    width: int = 99,
    default_style: str | None = None,
    default_flow_style: bool | None = False,
    sort_keys: bool = False,
    Dumper: typing.Any = None,
    encoding: str = "utf-8",
    **kwargs,
) -> None:
    """YAMLファイルに非同期で書き込む。"""
    import pytilpack.yaml  # pylint: disable=import-outside-toplevel,redefined-outer-name

    if Dumper is None:
        Dumper = pytilpack.yaml.CustomDumper
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.yaml.save_all,
            path,
            data,
            allow_unicode,
            width,
            default_style,
            default_flow_style,
            sort_keys,
            Dumper,
            encoding,
            **kwargs,
        ),
    )

read_text(path, encoding='utf-8', errors='strict') async

ファイルからテキストを非同期で読み取る。

ソースコード位置: pytilpack/asyncio/io_.py
async def read_text(path: pathlib.Path | str, encoding: str = "utf-8", errors: str = "strict") -> str:
    """ファイルからテキストを非同期で読み取る。"""
    path = pathlib.Path(path)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        functools.partial(
            path.read_text,
            encoding,
            errors,
        ),
    )

write_text(path, data, encoding='utf-8', errors='strict') async

ファイルにテキストを非同期で書き込む。

ソースコード位置: pytilpack/asyncio/io_.py
async def write_text(path: pathlib.Path | str, data: str, encoding: str = "utf-8", errors: str = "strict") -> None:
    """ファイルにテキストを非同期で書き込む。"""
    path = pathlib.Path(path)
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            path.write_text,
            data,
            encoding,
            errors,
        ),
    )

append_text(path, data, encoding='utf-8', errors='strict') async

ファイルにテキストを非同期で追記する。

ソースコード位置: pytilpack/asyncio/io_.py
async def append_text(path: pathlib.Path | str, data: str, encoding: str = "utf-8", errors: str = "strict") -> None:
    """ファイルにテキストを非同期で追記する。"""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.pathlib.append_text,
            path,
            data,
            encoding,
            errors,
        ),
    )

read_bytes(path) async

ファイルからバイトを非同期で読み取る。

ソースコード位置: pytilpack/asyncio/io_.py
async def read_bytes(path: pathlib.Path | str) -> bytes:
    """ファイルからバイトを非同期で読み取る。"""
    path = pathlib.Path(path)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        functools.partial(path.read_bytes),
    )

write_bytes(path, data) async

ファイルにバイトを非同期で書き込む。

ソースコード位置: pytilpack/asyncio/io_.py
async def write_bytes(path: pathlib.Path | str, data: bytes) -> None:
    """ファイルにバイトを非同期で書き込む。"""
    path = pathlib.Path(path)
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            path.write_bytes,
            data,
        ),
    )

append_bytes(path, data) async

ファイルにバイトを非同期で追記する。

ソースコード位置: pytilpack/asyncio/io_.py
async def append_bytes(path: pathlib.Path | str, data: bytes) -> None:
    """ファイルにバイトを非同期で追記する。"""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.pathlib.append_bytes,
            path,
            data,
        ),
    )

copy2(src, dst) async

ファイルを非同期でメタデータごとコピーする。

ソースコード位置: pytilpack/asyncio/io_.py
async def copy2(src: pathlib.Path | str, dst: pathlib.Path | str) -> None:
    """ファイルを非同期でメタデータごとコピーする。"""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            shutil.copy2,
            src,
            dst,
        ),
    )

copytree(src, dst, dirs_exist_ok=False) async

ディレクトリツリーを非同期で再帰的にコピーする。

ソースコード位置: pytilpack/asyncio/io_.py
async def copytree(
    src: pathlib.Path | str,
    dst: pathlib.Path | str,
    dirs_exist_ok: bool = False,
) -> None:
    """ディレクトリツリーを非同期で再帰的にコピーする。"""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            shutil.copytree,
            src,
            dst,
            dirs_exist_ok=dirs_exist_ok,
        ),
    )

move(src, dst) async

ファイルまたはディレクトリを非同期で移動する。

ソースコード位置: pytilpack/asyncio/io_.py
async def move(src: pathlib.Path | str, dst: pathlib.Path | str) -> None:
    """ファイルまたはディレクトリを非同期で移動する。"""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            shutil.move,
            src,
            dst,
        ),
    )

delete_file(path) async

ファイルを非同期で削除する。

ソースコード位置: pytilpack/asyncio/io_.py
async def delete_file(path: pathlib.Path | str) -> None:
    """ファイルを非同期で削除する。"""
    path = pathlib.Path(path)
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.pathlib.delete_file,
            path,
        ),
    )

rmtree(path, ignore_errors=False) async

ディレクトリを非同期で再帰的に削除する。読み取り専用ファイルも削除する。

パスが存在しない場合は何もしない。

ソースコード位置: pytilpack/asyncio/io_.py
async def rmtree(path: pathlib.Path | str, ignore_errors: bool = False) -> None:
    """ディレクトリを非同期で再帰的に削除する。読み取り専用ファイルも削除する。

    パスが存在しない場合は何もしない。
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.pathlib.rmtree,
            path,
            ignore_errors,
        ),
    )

disk_usage(path) async

ディスク使用量を非同期で取得する。

ソースコード位置: pytilpack/asyncio/io_.py
async def disk_usage(path: pathlib.Path | str) -> shutil._ntuple_diskusage:
    """ディスク使用量を非同期で取得する。"""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        functools.partial(
            shutil.disk_usage,
            path,
        ),
    )

get_size(path) async

ファイル・ディレクトリのサイズを非同期で取得する。

ソースコード位置: pytilpack/asyncio/io_.py
async def get_size(path: pathlib.Path | str) -> int:
    """ファイル・ディレクトリのサイズを非同期で取得する。"""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.pathlib.get_size,
            path,
        ),
    )

delete_empty_dirs(path, keep_root=True) async

指定したパス以下の空ディレクトリを削除する。

引数:

名前 タイプ デスクリプション デフォルト
path str | Path

対象のパス

必須
keep_root bool

Trueの場合、指定したディレクトリ自体は削除しない

True
ソースコード位置: pytilpack/asyncio/io_.py
async def delete_empty_dirs(path: str | pathlib.Path, keep_root: bool = True) -> None:
    """指定したパス以下の空ディレクトリを削除する。

    Args:
        path: 対象のパス
        keep_root: Trueの場合、指定したディレクトリ自体は削除しない
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.pathlib.delete_empty_dirs,
            path,
            keep_root,
        ),
    )

sync(src, dst, delete=False) async

コピー元からコピー先へ同期する。

引数:

名前 タイプ デスクリプション デフォルト
src str | Path

コピー元のパス

必須
dst str | Path

コピー先のパス

必須
delete bool

Trueの場合、コピー元に存在しないファイル・ディレクトリをコピー先から削除する

False
ソースコード位置: pytilpack/asyncio/io_.py
async def sync(src: str | pathlib.Path, dst: str | pathlib.Path, delete: bool = False) -> None:
    """コピー元からコピー先へ同期する。

    Args:
        src: コピー元のパス
        dst: コピー先のパス
        delete: Trueの場合、コピー元に存在しないファイル・ディレクトリをコピー先から削除する
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.pathlib.sync,
            src,
            dst,
            delete,
        ),
    )

delete_old_files(path, before, delete_empty_dirs=True, keep_root_empty_dir=True) async

指定した日時より古いファイルを削除し、空になったディレクトリも削除する。

引数:

名前 タイプ デスクリプション デフォルト
path str | Path

対象のパス

必須
before datetime

この日時より前に更新されたファイルを削除

必須
delete_empty_dirs bool

Trueの場合、空になったディレクトリを削除

True
keep_root_empty_dir bool

Trueの場合、指定したディレクトリ自体は削除しない

True
ソースコード位置: pytilpack/asyncio/io_.py
async def delete_old_files(
    path: str | pathlib.Path,
    before: datetime.datetime,
    delete_empty_dirs: bool = True,  # pylint: disable=redefined-outer-name
    keep_root_empty_dir: bool = True,
) -> None:
    """指定した日時より古いファイルを削除し、空になったディレクトリも削除する。

    Args:
        path: 対象のパス
        before: この日時より前に更新されたファイルを削除
        delete_empty_dirs: Trueの場合、空になったディレクトリを削除
        keep_root_empty_dir: Trueの場合、指定したディレクトリ自体は削除しない
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.pathlib.delete_old_files,
            path,
            before,
            delete_empty_dirs,
            keep_root_empty_dir,
        ),
    )

ensure_async(func)

関数が非同期関数でない場合、非同期関数に変換するデコレーター。

ソースコード位置: pytilpack/asyncio/functools_.py
def ensure_async[**P, R](
    func: typing.Callable[P, typing.Awaitable[R] | R],
) -> typing.Callable[P, typing.Awaitable[R]]:
    """関数が非同期関数でない場合、非同期関数に変換するデコレーター。"""
    if inspect.iscoroutinefunction(func):
        return typing.cast(typing.Callable[P, typing.Awaitable[R]], func)
    else:
        return run_sync(typing.cast(typing.Callable[P, R], func))

run_sync(func)

同期関数を非同期に実行するデコレーター。

quart.utils.run_syncのquart関係ない版。

ソースコード位置: pytilpack/asyncio/functools_.py
def run_sync[**P, R](
    func: typing.Callable[P, R],
) -> typing.Callable[P, typing.Awaitable[R]]:
    """同期関数を非同期に実行するデコレーター。

    quart.utils.run_syncのquart関係ない版。
    """

    @functools.wraps(func)
    async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper

run_in_thread(func)

非同期関数を非同期に実行するデコレーター。

awaitも使うけどブロッキング処理も含まれるような関数を雑に何とかするためのもの。 安全重視でスレッドを新たに作成するのでオーバーヘッドは大きめ。

ソースコード位置: pytilpack/asyncio/functools_.py
def run_in_thread[**P, R](
    func: typing.Callable[P, typing.Coroutine[typing.Any, typing.Any, R]],
) -> typing.Callable[P, typing.Awaitable[R]]:
    """非同期関数を非同期に実行するデコレーター。

    awaitも使うけどブロッキング処理も含まれるような関数を雑に何とかするためのもの。
    安全重視でスレッドを新たに作成するのでオーバーヘッドは大きめ。
    """

    @functools.wraps(func)
    async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        ctx = contextvars.copy_context()
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            future = pool.submit(ctx.run, lambda: asyncio.run(func(*args, **kwargs)))
            return await asyncio.wrap_future(future)

    return wrapper

acquire_with_timeout(lock, timeout) async

ロックを取得し、タイムアウト時間内に取得できなかった場合はFalseを返す。

引数:

名前 タイプ デスクリプション デフォルト
lock Lock | Semaphore

取得するロック。

必須
timeout float

タイムアウト時間(秒)。

必須

戻り値:

タイプ デスクリプション
AsyncGenerator[bool, None]

ロックが取得できた場合はTrue、取得できなかった場合はFalse。

ソースコード位置: pytilpack/asyncio/functools_.py
@contextlib.asynccontextmanager
async def acquire_with_timeout(lock: asyncio.Lock | asyncio.Semaphore, timeout: float) -> typing.AsyncGenerator[bool, None]:
    """ロックを取得し、タイムアウト時間内に取得できなかった場合はFalseを返す。

    Args:
        lock: 取得するロック。
        timeout: タイムアウト時間(秒)。

    Returns:
        ロックが取得できた場合はTrue、取得できなかった場合はFalse。

    """
    try:
        await asyncio.wait_for(lock.acquire(), timeout=timeout)
        acquired = True
    except TimeoutError:
        acquired = False

    try:
        yield acquired
    finally:
        if acquired:
            lock.release()

run(coro)

非同期関数を実行する。

ソースコード位置: pytilpack/asyncio/functools_.py
def run[T](coro: typing.Coroutine[typing.Any, typing.Any, T]) -> T:
    """非同期関数を実行する。"""
    # https://github.com/microsoftgraph/msgraph-sdk-python/issues/366#issuecomment-1830756182
    loop: asyncio.AbstractEventLoop | None
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    # 非同期環境でない場合
    # (スタックトレースをシンプルにするためexceptの外で実行)
    if loop is None:
        return asyncio.run(coro)

    # 何らかの理由でイベントループは存在するが動いてない場合 (謎)
    if not loop.is_running():
        return loop.run_until_complete(coro)

    # 現在のスレッドでイベントループが実行されている場合
    # 別スレッド・別イベントループで実行する
    ctx = contextvars.copy_context()
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(ctx.run, lambda: asyncio.run(coro))
        return future.result()

get_task_id()

現在のタスクIDを取得する。

戻り値:

タイプ デスクリプション
int | None

タスクID。タスクが存在しない場合はNone。

ソースコード位置: pytilpack/asyncio/misc.py
def get_task_id() -> int | None:
    """現在のタスクIDを取得する。

    Returns:
        タスクID。タスクが存在しない場合はNone。

    """
    task = asyncio.current_task()
    return id(task) if task is not None else None

get_task_id_hex()

現在のタスクIDを16進数文字列で取得する。

戻り値:

タイプ デスクリプション
str

タスクIDの16進数文字列。タスクが存在しない場合は"None"。

ソースコード位置: pytilpack/asyncio/misc.py
def get_task_id_hex() -> str:
    """現在のタスクIDを16進数文字列で取得する。

    Returns:
        タスクIDの16進数文字列。タスクが存在しない場合は"None"。

    """
    task_id = get_task_id()
    return f"{task_id:x}" if task_id is not None else "None"

functools_

非同期I/O関連。

ensure_async(func)

関数が非同期関数でない場合、非同期関数に変換するデコレーター。

ソースコード位置: pytilpack/asyncio/functools_.py
def ensure_async[**P, R](
    func: typing.Callable[P, typing.Awaitable[R] | R],
) -> typing.Callable[P, typing.Awaitable[R]]:
    """関数が非同期関数でない場合、非同期関数に変換するデコレーター。"""
    if inspect.iscoroutinefunction(func):
        return typing.cast(typing.Callable[P, typing.Awaitable[R]], func)
    else:
        return run_sync(typing.cast(typing.Callable[P, R], func))

run_sync(func)

同期関数を非同期に実行するデコレーター。

quart.utils.run_syncのquart関係ない版。

ソースコード位置: pytilpack/asyncio/functools_.py
def run_sync[**P, R](
    func: typing.Callable[P, R],
) -> typing.Callable[P, typing.Awaitable[R]]:
    """同期関数を非同期に実行するデコレーター。

    quart.utils.run_syncのquart関係ない版。
    """

    @functools.wraps(func)
    async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper

run_in_thread(func)

非同期関数を非同期に実行するデコレーター。

awaitも使うけどブロッキング処理も含まれるような関数を雑に何とかするためのもの。 安全重視でスレッドを新たに作成するのでオーバーヘッドは大きめ。

ソースコード位置: pytilpack/asyncio/functools_.py
def run_in_thread[**P, R](
    func: typing.Callable[P, typing.Coroutine[typing.Any, typing.Any, R]],
) -> typing.Callable[P, typing.Awaitable[R]]:
    """非同期関数を非同期に実行するデコレーター。

    awaitも使うけどブロッキング処理も含まれるような関数を雑に何とかするためのもの。
    安全重視でスレッドを新たに作成するのでオーバーヘッドは大きめ。
    """

    @functools.wraps(func)
    async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        ctx = contextvars.copy_context()
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            future = pool.submit(ctx.run, lambda: asyncio.run(func(*args, **kwargs)))
            return await asyncio.wrap_future(future)

    return wrapper

acquire_with_timeout(lock, timeout) async

ロックを取得し、タイムアウト時間内に取得できなかった場合はFalseを返す。

引数:

名前 タイプ デスクリプション デフォルト
lock Lock | Semaphore

取得するロック。

必須
timeout float

タイムアウト時間(秒)。

必須

戻り値:

タイプ デスクリプション
AsyncGenerator[bool, None]

ロックが取得できた場合はTrue、取得できなかった場合はFalse。

ソースコード位置: pytilpack/asyncio/functools_.py
@contextlib.asynccontextmanager
async def acquire_with_timeout(lock: asyncio.Lock | asyncio.Semaphore, timeout: float) -> typing.AsyncGenerator[bool, None]:
    """ロックを取得し、タイムアウト時間内に取得できなかった場合はFalseを返す。

    Args:
        lock: 取得するロック。
        timeout: タイムアウト時間(秒)。

    Returns:
        ロックが取得できた場合はTrue、取得できなかった場合はFalse。

    """
    try:
        await asyncio.wait_for(lock.acquire(), timeout=timeout)
        acquired = True
    except TimeoutError:
        acquired = False

    try:
        yield acquired
    finally:
        if acquired:
            lock.release()

run(coro)

非同期関数を実行する。

ソースコード位置: pytilpack/asyncio/functools_.py
def run[T](coro: typing.Coroutine[typing.Any, typing.Any, T]) -> T:
    """非同期関数を実行する。"""
    # https://github.com/microsoftgraph/msgraph-sdk-python/issues/366#issuecomment-1830756182
    loop: asyncio.AbstractEventLoop | None
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    # 非同期環境でない場合
    # (スタックトレースをシンプルにするためexceptの外で実行)
    if loop is None:
        return asyncio.run(coro)

    # 何らかの理由でイベントループは存在するが動いてない場合 (謎)
    if not loop.is_running():
        return loop.run_until_complete(coro)

    # 現在のスレッドでイベントループが実行されている場合
    # 別スレッド・別イベントループで実行する
    ctx = contextvars.copy_context()
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(ctx.run, lambda: asyncio.run(coro))
        return future.result()

io_

非同期I/O関連。

read_json(path, encoding='utf-8', errors='replace', strict=False, **kwargs) async

JSONファイルから非同期で読み取る。

ソースコード位置: pytilpack/asyncio/io_.py
async def read_json(
    path: pytilpack.io.PathOrIO,
    encoding: str = "utf-8",
    errors: str = "replace",
    strict: bool = False,
    **kwargs,
) -> typing.Any:
    """JSONファイルから非同期で読み取る。"""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.json.load,
            path,
            encoding,
            errors,
            strict,
            **kwargs,
        ),
    )

write_json(path, data, ensure_ascii=False, indent=None, separators=None, sort_keys=False, default=pytilpack.json.converter, encoding='utf-8', **kwargs) async

JSONファイルに非同期で書き込む。

ソースコード位置: pytilpack/asyncio/io_.py
async def write_json(
    path: pytilpack.io.PathOrIO,
    data: typing.Any,
    ensure_ascii: bool = False,
    indent: int | str | None = None,
    separators: tuple[str, str] | None = None,
    sort_keys: bool = False,
    default: typing.Callable[[typing.Any], typing.Any] = pytilpack.json.converter,
    encoding: str = "utf-8",
    **kwargs,
) -> None:
    """JSONファイルに非同期で書き込む。"""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.json.save,
            path,
            data,
            ensure_ascii,
            indent,
            separators,
            sort_keys,
            default,
            encoding,
            **kwargs,
        ),
    )

read_jsonc(path, encoding='utf-8', errors='replace', strict=False, **kwargs) async

JSONCファイルから非同期で読み取る。

ソースコード位置: pytilpack/asyncio/io_.py
async def read_jsonc(
    path: pytilpack.io.PathOrIO,
    encoding: str = "utf-8",
    errors: str = "replace",
    strict: bool = False,
    **kwargs,
) -> typing.Any:
    """JSONCファイルから非同期で読み取る。"""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.jsonc.load,
            path,
            encoding,
            errors,
            strict,
            **kwargs,
        ),
    )

read_yaml(path, encoding='utf-8', errors='replace', strict=False, Loader=None) async

YAMLファイルから非同期で読み取る。

ソースコード位置: pytilpack/asyncio/io_.py
async def read_yaml(
    path: pytilpack.io.PathOrIO,
    encoding: str = "utf-8",
    errors: str = "replace",
    strict: bool = False,
    Loader: typing.Any = None,
) -> typing.Any:
    """YAMLファイルから非同期で読み取る。"""
    import yaml  # pylint: disable=import-outside-toplevel

    import pytilpack.yaml  # pylint: disable=import-outside-toplevel,redefined-outer-name

    if Loader is None:
        Loader = yaml.SafeLoader
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.yaml.load,
            path,
            encoding,
            errors,
            strict,
            Loader,
        ),
    )

read_yaml_all(path, encoding='utf-8', errors='replace', strict=False, Loader=None) async

YAMLファイルから非同期で読み取る。

ソースコード位置: pytilpack/asyncio/io_.py
async def read_yaml_all(
    path: pytilpack.io.PathOrIO,
    encoding: str = "utf-8",
    errors: str = "replace",
    strict: bool = False,
    Loader: typing.Any = None,
) -> list[typing.Any]:
    """YAMLファイルから非同期で読み取る。"""
    import yaml  # pylint: disable=import-outside-toplevel

    import pytilpack.yaml  # pylint: disable=import-outside-toplevel,redefined-outer-name

    if Loader is None:
        Loader = yaml.SafeLoader
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.yaml.load_all,
            path,
            encoding,
            errors,
            strict,
            Loader,
        ),
    )

write_yaml(path, data, allow_unicode=True, width=99, default_style=None, default_flow_style=False, sort_keys=False, Dumper=None, encoding='utf-8', **kwargs) async

YAMLファイルに非同期で書き込む。

ソースコード位置: pytilpack/asyncio/io_.py
async def write_yaml(
    path: pytilpack.io.PathOrIO,
    data: typing.Any,
    allow_unicode: bool | None = True,
    width: int = 99,
    default_style: str | None = None,
    default_flow_style: bool | None = False,
    sort_keys: bool = False,
    Dumper: typing.Any = None,
    encoding: str = "utf-8",
    **kwargs,
) -> None:
    """YAMLファイルに非同期で書き込む。"""
    import pytilpack.yaml  # pylint: disable=import-outside-toplevel,redefined-outer-name

    if Dumper is None:
        Dumper = pytilpack.yaml.CustomDumper
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.yaml.save,
            path,
            data,
            allow_unicode,
            width,
            default_style,
            default_flow_style,
            sort_keys,
            Dumper,
            encoding,
            **kwargs,
        ),
    )

write_yaml_all(path, data, allow_unicode=True, width=99, default_style=None, default_flow_style=False, sort_keys=False, Dumper=None, encoding='utf-8', **kwargs) async

YAMLファイルに非同期で書き込む。

ソースコード位置: pytilpack/asyncio/io_.py
async def write_yaml_all(
    path: pytilpack.io.PathOrIO,
    data: list[typing.Any],
    allow_unicode: bool | None = True,
    width: int = 99,
    default_style: str | None = None,
    default_flow_style: bool | None = False,
    sort_keys: bool = False,
    Dumper: typing.Any = None,
    encoding: str = "utf-8",
    **kwargs,
) -> None:
    """YAMLファイルに非同期で書き込む。"""
    import pytilpack.yaml  # pylint: disable=import-outside-toplevel,redefined-outer-name

    if Dumper is None:
        Dumper = pytilpack.yaml.CustomDumper
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.yaml.save_all,
            path,
            data,
            allow_unicode,
            width,
            default_style,
            default_flow_style,
            sort_keys,
            Dumper,
            encoding,
            **kwargs,
        ),
    )

read_text(path, encoding='utf-8', errors='strict') async

ファイルからテキストを非同期で読み取る。

ソースコード位置: pytilpack/asyncio/io_.py
async def read_text(path: pathlib.Path | str, encoding: str = "utf-8", errors: str = "strict") -> str:
    """ファイルからテキストを非同期で読み取る。"""
    path = pathlib.Path(path)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        functools.partial(
            path.read_text,
            encoding,
            errors,
        ),
    )

write_text(path, data, encoding='utf-8', errors='strict') async

ファイルにテキストを非同期で書き込む。

ソースコード位置: pytilpack/asyncio/io_.py
async def write_text(path: pathlib.Path | str, data: str, encoding: str = "utf-8", errors: str = "strict") -> None:
    """ファイルにテキストを非同期で書き込む。"""
    path = pathlib.Path(path)
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            path.write_text,
            data,
            encoding,
            errors,
        ),
    )

append_text(path, data, encoding='utf-8', errors='strict') async

ファイルにテキストを非同期で追記する。

ソースコード位置: pytilpack/asyncio/io_.py
async def append_text(path: pathlib.Path | str, data: str, encoding: str = "utf-8", errors: str = "strict") -> None:
    """ファイルにテキストを非同期で追記する。"""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.pathlib.append_text,
            path,
            data,
            encoding,
            errors,
        ),
    )

read_bytes(path) async

ファイルからバイトを非同期で読み取る。

ソースコード位置: pytilpack/asyncio/io_.py
async def read_bytes(path: pathlib.Path | str) -> bytes:
    """ファイルからバイトを非同期で読み取る。"""
    path = pathlib.Path(path)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        functools.partial(path.read_bytes),
    )

write_bytes(path, data) async

ファイルにバイトを非同期で書き込む。

ソースコード位置: pytilpack/asyncio/io_.py
async def write_bytes(path: pathlib.Path | str, data: bytes) -> None:
    """ファイルにバイトを非同期で書き込む。"""
    path = pathlib.Path(path)
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            path.write_bytes,
            data,
        ),
    )

append_bytes(path, data) async

ファイルにバイトを非同期で追記する。

ソースコード位置: pytilpack/asyncio/io_.py
async def append_bytes(path: pathlib.Path | str, data: bytes) -> None:
    """ファイルにバイトを非同期で追記する。"""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.pathlib.append_bytes,
            path,
            data,
        ),
    )

copy2(src, dst) async

ファイルを非同期でメタデータごとコピーする。

ソースコード位置: pytilpack/asyncio/io_.py
async def copy2(src: pathlib.Path | str, dst: pathlib.Path | str) -> None:
    """ファイルを非同期でメタデータごとコピーする。"""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            shutil.copy2,
            src,
            dst,
        ),
    )

copytree(src, dst, dirs_exist_ok=False) async

ディレクトリツリーを非同期で再帰的にコピーする。

ソースコード位置: pytilpack/asyncio/io_.py
async def copytree(
    src: pathlib.Path | str,
    dst: pathlib.Path | str,
    dirs_exist_ok: bool = False,
) -> None:
    """ディレクトリツリーを非同期で再帰的にコピーする。"""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            shutil.copytree,
            src,
            dst,
            dirs_exist_ok=dirs_exist_ok,
        ),
    )

move(src, dst) async

ファイルまたはディレクトリを非同期で移動する。

ソースコード位置: pytilpack/asyncio/io_.py
async def move(src: pathlib.Path | str, dst: pathlib.Path | str) -> None:
    """ファイルまたはディレクトリを非同期で移動する。"""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            shutil.move,
            src,
            dst,
        ),
    )

delete_file(path) async

ファイルを非同期で削除する。

ソースコード位置: pytilpack/asyncio/io_.py
async def delete_file(path: pathlib.Path | str) -> None:
    """ファイルを非同期で削除する。"""
    path = pathlib.Path(path)
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.pathlib.delete_file,
            path,
        ),
    )

rmtree(path, ignore_errors=False) async

ディレクトリを非同期で再帰的に削除する。読み取り専用ファイルも削除する。

パスが存在しない場合は何もしない。

ソースコード位置: pytilpack/asyncio/io_.py
async def rmtree(path: pathlib.Path | str, ignore_errors: bool = False) -> None:
    """ディレクトリを非同期で再帰的に削除する。読み取り専用ファイルも削除する。

    パスが存在しない場合は何もしない。
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.pathlib.rmtree,
            path,
            ignore_errors,
        ),
    )

disk_usage(path) async

ディスク使用量を非同期で取得する。

ソースコード位置: pytilpack/asyncio/io_.py
async def disk_usage(path: pathlib.Path | str) -> shutil._ntuple_diskusage:
    """ディスク使用量を非同期で取得する。"""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        functools.partial(
            shutil.disk_usage,
            path,
        ),
    )

get_size(path) async

ファイル・ディレクトリのサイズを非同期で取得する。

ソースコード位置: pytilpack/asyncio/io_.py
async def get_size(path: pathlib.Path | str) -> int:
    """ファイル・ディレクトリのサイズを非同期で取得する。"""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.pathlib.get_size,
            path,
        ),
    )

delete_empty_dirs(path, keep_root=True) async

指定したパス以下の空ディレクトリを削除する。

引数:

名前 タイプ デスクリプション デフォルト
path str | Path

対象のパス

必須
keep_root bool

Trueの場合、指定したディレクトリ自体は削除しない

True
ソースコード位置: pytilpack/asyncio/io_.py
async def delete_empty_dirs(path: str | pathlib.Path, keep_root: bool = True) -> None:
    """指定したパス以下の空ディレクトリを削除する。

    Args:
        path: 対象のパス
        keep_root: Trueの場合、指定したディレクトリ自体は削除しない
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.pathlib.delete_empty_dirs,
            path,
            keep_root,
        ),
    )

sync(src, dst, delete=False) async

コピー元からコピー先へ同期する。

引数:

名前 タイプ デスクリプション デフォルト
src str | Path

コピー元のパス

必須
dst str | Path

コピー先のパス

必須
delete bool

Trueの場合、コピー元に存在しないファイル・ディレクトリをコピー先から削除する

False
ソースコード位置: pytilpack/asyncio/io_.py
async def sync(src: str | pathlib.Path, dst: str | pathlib.Path, delete: bool = False) -> None:
    """コピー元からコピー先へ同期する。

    Args:
        src: コピー元のパス
        dst: コピー先のパス
        delete: Trueの場合、コピー元に存在しないファイル・ディレクトリをコピー先から削除する
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.pathlib.sync,
            src,
            dst,
            delete,
        ),
    )

delete_old_files(path, before, delete_empty_dirs=True, keep_root_empty_dir=True) async

指定した日時より古いファイルを削除し、空になったディレクトリも削除する。

引数:

名前 タイプ デスクリプション デフォルト
path str | Path

対象のパス

必須
before datetime

この日時より前に更新されたファイルを削除

必須
delete_empty_dirs bool

Trueの場合、空になったディレクトリを削除

True
keep_root_empty_dir bool

Trueの場合、指定したディレクトリ自体は削除しない

True
ソースコード位置: pytilpack/asyncio/io_.py
async def delete_old_files(
    path: str | pathlib.Path,
    before: datetime.datetime,
    delete_empty_dirs: bool = True,  # pylint: disable=redefined-outer-name
    keep_root_empty_dir: bool = True,
) -> None:
    """指定した日時より古いファイルを削除し、空になったディレクトリも削除する。

    Args:
        path: 対象のパス
        before: この日時より前に更新されたファイルを削除
        delete_empty_dirs: Trueの場合、空になったディレクトリを削除
        keep_root_empty_dir: Trueの場合、指定したディレクトリ自体は削除しない
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        functools.partial(
            pytilpack.pathlib.delete_old_files,
            path,
            before,
            delete_empty_dirs,
            keep_root_empty_dir,
        ),
    )

jobrunner

非同期I/O関連。

Job()

非同期ジョブ。

ソースコード位置: pytilpack/asyncio/jobrunner.py
def __init__(self) -> None:
    self.status: JobStatus = "waiting"
run() abstractmethod async

ジョブの処理。内部でブロッキング処理がある場合は適宜 asyncio.to_thread 等を利用する。

ソースコード位置: pytilpack/asyncio/jobrunner.py
@abc.abstractmethod
async def run(self) -> None:
    """ジョブの処理。内部でブロッキング処理がある場合は適宜 asyncio.to_thread 等を利用する。"""
on_finished() async

ジョブが完了した場合に呼ばれる処理。必要に応じてサブクラスで追加の処理を行う。

ソースコード位置: pytilpack/asyncio/jobrunner.py
async def on_finished(self) -> None:
    """ジョブが完了した場合に呼ばれる処理。必要に応じてサブクラスで追加の処理を行う。"""
    self.status = "finished"
on_canceled() async

ジョブが完了する前にキャンセルされた場合に呼ばれる処理。必要に応じてサブクラスで追加の処理を行う。

ソースコード位置: pytilpack/asyncio/jobrunner.py
async def on_canceled(self) -> None:
    """ジョブが完了する前にキャンセルされた場合に呼ばれる処理。必要に応じてサブクラスで追加の処理を行う。"""
    self.status = "canceled"
on_errored() async

ジョブがエラー終了した場合に呼ばれる処理。必要に応じてサブクラスで追加の処理を行う。

ソースコード位置: pytilpack/asyncio/jobrunner.py
async def on_errored(self) -> None:
    """ジョブがエラー終了した場合に呼ばれる処理。必要に応じてサブクラスで追加の処理を行う。"""
    self.status = "errored"
on_finally() async

ジョブの終了時に必ず呼ばれる処理。必要に応じてサブクラスで追加の処理を行う。

ソースコード位置: pytilpack/asyncio/jobrunner.py
async def on_finally(self) -> None:
    """ジョブの終了時に必ず呼ばれる処理。必要に応じてサブクラスで追加の処理を行う。"""
    del self  # noqa

JobRunner(max_job_concurrency=8, poll_interval=1.0)

非同期ジョブを最大 max_job_concurrency 並列で実行するクラス。

引数:

名前 タイプ デスクリプション デフォルト
max_job_concurrency int

ジョブの最大同時実行数

8
poll_interval float

ジョブ取得のポーリング間隔(秒)

1.0
ソースコード位置: pytilpack/asyncio/jobrunner.py
def __init__(self, max_job_concurrency: int = 8, poll_interval: float = 1.0) -> None:
    self.poll_interval = poll_interval
    self.max_job_concurrency = max_job_concurrency
    self.running = True
    self.semaphore = asyncio.Semaphore(max_job_concurrency)
    self.tasks: set[asyncio.Task] = set()  # 実行中ジョブのタスクを管理
run() async

poll()でジョブを取得し、並列実行上限内でジョブを実行する。

ソースコード位置: pytilpack/asyncio/jobrunner.py
async def run(self) -> None:
    """poll()でジョブを取得し、並列実行上限内でジョブを実行する。"""
    while self.running:
        # セマフォを取得して実行可能なジョブがあるか確認
        await self.semaphore.acquire()
        # 再度self.runningをチェック (graceful_shutdown()対策)
        if not self.running:
            self.semaphore.release()
            break
        job = await self._poll()
        if job is None:
            # ジョブがなければセマフォを解放して一定時間待機
            self.semaphore.release()
            await asyncio.sleep(self.poll_interval)
        else:
            # ジョブがあれば実行
            task = asyncio.create_task(self._run_job(job))
            task.add_done_callback(self.tasks.discard)
            self.tasks.add(task)
shutdown()

停止処理。

ソースコード位置: pytilpack/asyncio/jobrunner.py
def shutdown(self) -> None:
    """停止処理。"""
    self.running = False
    # 現在実行中のタスクにキャンセルを通知
    for task in list(self.tasks):
        task.cancel()
graceful_shutdown() async

新規ジョブ取得を停止し、実行中のジョブ完了を待ってから戻る

ソースコード位置: pytilpack/asyncio/jobrunner.py
async def graceful_shutdown(self) -> None:
    """新規ジョブ取得を停止し、実行中のジョブ完了を待ってから戻る"""
    self.running = False
    await asyncio.sleep(0)
    if len(self.tasks) > 0:
        await asyncio.gather(*list(self.tasks), return_exceptions=True)
poll() abstractmethod async

次のジョブを返す。ジョブがなければ None を返す。

ソースコード位置: pytilpack/asyncio/jobrunner.py
@abc.abstractmethod
async def poll(self) -> Job | None:
    """次のジョブを返す。ジョブがなければ None を返す。"""

misc

asyncio用のユーティリティ集。

get_task_id()

現在のタスクIDを取得する。

戻り値:

タイプ デスクリプション
int | None

タスクID。タスクが存在しない場合はNone。

ソースコード位置: pytilpack/asyncio/misc.py
def get_task_id() -> int | None:
    """現在のタスクIDを取得する。

    Returns:
        タスクID。タスクが存在しない場合はNone。

    """
    task = asyncio.current_task()
    return id(task) if task is not None else None

get_task_id_hex()

現在のタスクIDを16進数文字列で取得する。

戻り値:

タイプ デスクリプション
str

タスクIDの16進数文字列。タスクが存在しない場合は"None"。

ソースコード位置: pytilpack/asyncio/misc.py
def get_task_id_hex() -> str:
    """現在のタスクIDを16進数文字列で取得する。

    Returns:
        タスクIDの16進数文字列。タスクが存在しない場合は"None"。

    """
    task_id = get_task_id()
    return f"{task_id:x}" if task_id is not None else "None"

threadpool

非同期I/O関連。

ThreadPool(max_workers)

N個のスレッド上で非同期処理を実行するスレッドプール。

スレッドプールを初期化する。

引数:

名前 タイプ デスクリプション デフォルト
max_workers int

ワーカースレッド数(1以上)

必須
ソースコード位置: pytilpack/asyncio/threadpool.py
def __init__(self, max_workers: int) -> None:
    """スレッドプールを初期化する。

    Args:
        max_workers: ワーカースレッド数(1以上)
    """
    if max_workers < 1:
        raise ValueError("max_workers must be >= 1")
    self.workers = [WorkerThread(name=f"aloop-{i}") for i in range(max_workers)]
    self.next = 0
    self.lock = threading.Lock()
    for w in self.workers:
        w.start()
__enter__()

コンテキストマネージャーの開始処理。

ソースコード位置: pytilpack/asyncio/threadpool.py
def __enter__(self) -> "ThreadPool":
    """コンテキストマネージャーの開始処理。"""
    return self
__exit__(exc_type, exc_val, exc_tb)

コンテキストマネージャーの終了処理。

ソースコード位置: pytilpack/asyncio/threadpool.py
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
    """コンテキストマネージャーの終了処理。"""
    self.shutdown()
submit(coro)

コルーチンを実行するワーカーに送信する。

引数:

名前 タイプ デスクリプション デフォルト
coro Coroutine[Any, Any, T]

実行するコルーチン

必須

戻り値:

タイプ デスクリプション
Future[T]

結果を取得するためのFuture

ソースコード位置: pytilpack/asyncio/threadpool.py
def submit[T](self, coro: typing.Coroutine[typing.Any, typing.Any, T]) -> concurrent.futures.Future[T]:
    """コルーチンを実行するワーカーに送信する。

    Args:
        coro: 実行するコルーチン

    Returns:
        結果を取得するためのFuture
    """
    with self.lock:
        idx = self.next
        self.next = (self.next + 1) % len(self.workers)
    return self.workers[idx].submit(coro)
map(coros)

複数のコルーチンをワーカーに送信する。

引数:

名前 タイプ デスクリプション デフォルト
coros Iterable[Coroutine[Any, Any, T]]

実行するコルーチンのイテラブル

必須

戻り値:

タイプ デスクリプション
list[Future[T]]

各コルーチンの結果を取得するためのFutureのリスト

ソースコード位置: pytilpack/asyncio/threadpool.py
def map[T](self, coros: typing.Iterable[typing.Coroutine[typing.Any, typing.Any, T]]) -> list[concurrent.futures.Future[T]]:
    """複数のコルーチンをワーカーに送信する。

    Args:
        coros: 実行するコルーチンのイテラブル

    Returns:
        各コルーチンの結果を取得するためのFutureのリスト
    """
    return [self.submit(c) for c in coros]
shutdown()

全てのワーカースレッドを停止する。

ソースコード位置: pytilpack/asyncio/threadpool.py
def shutdown(self) -> None:
    """全てのワーカースレッドを停止する。"""
    for w in self.workers:
        w.stop()
ashutdown() async

全てのワーカースレッドを非同期的に停止する。

ソースコード位置: pytilpack/asyncio/threadpool.py
async def ashutdown(self) -> None:
    """全てのワーカースレッドを非同期的に停止する。"""
    await asyncio.to_thread(self.shutdown)
__aenter__() async

非同期コンテキストマネージャーの開始処理。

ソースコード位置: pytilpack/asyncio/threadpool.py
async def __aenter__(self) -> "ThreadPool":
    """非同期コンテキストマネージャーの開始処理。"""
    return self
__aexit__(exc_type, exc_value, traceback) async

非同期コンテキストマネージャーの終了処理。

ソースコード位置: pytilpack/asyncio/threadpool.py
async def __aexit__(self, exc_type, exc_value, traceback) -> None:
    """非同期コンテキストマネージャーの終了処理。"""
    await self.ashutdown()
__del__()

デストラクタ。停止していないワーカーがいる場合は警告して停止シグナルを送る。

ソースコード位置: pytilpack/asyncio/threadpool.py
def __del__(self) -> None:
    """デストラクタ。停止していないワーカーがいる場合は警告して停止シグナルを送る。"""
    active_workers = [w for w in self.workers if w.thread is not None]
    if active_workers:
        logger.warning(
            "ThreadPool is being destroyed with %d active worker(s). Sending stop signal.",
            len(active_workers),
        )
        # デストラクタ内では待機せず、停止シグナルのみ送出する
        for w in active_workers:
            if w.loop is not None:
                w.loop.call_soon_threadsafe(w.loop.stop)  # 一時的に無効化

WorkerThread(name)

専用のasyncioイベントループを実行するワーカースレッド。

ワーカースレッドを初期化する。

引数:

名前 タイプ デスクリプション デフォルト
name str

スレッド名

必須
ソースコード位置: pytilpack/asyncio/threadpool.py
def __init__(self, name: str) -> None:
    """ワーカースレッドを初期化する。

    Args:
        name: スレッド名
    """
    self.name = name
    self.thread: threading.Thread | None = None
    self.loop: asyncio.AbstractEventLoop | None = None
    self.stopped = threading.Event()
start()

ワーカースレッドを起動する。

ソースコード位置: pytilpack/asyncio/threadpool.py
def start(self) -> None:
    """ワーカースレッドを起動する。"""
    started = threading.Event()

    def thread() -> None:
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        started.set()
        try:
            self.loop.run_forever()
        finally:
            try:
                pending = asyncio.all_tasks(self.loop)
                for t in pending:
                    t.cancel()
                if pending:
                    self.loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
            finally:
                self.loop.close()
                self.stopped.set()

    self.thread = threading.Thread(target=thread, name=self.name, daemon=True)
    self.thread.start()
    started.wait()
submit(coro)

コルーチンをイベントループに送信する。

引数:

名前 タイプ デスクリプション デフォルト
coro Coroutine[Any, Any, T]

実行するコルーチン

必須

戻り値:

タイプ デスクリプション
Future[T]

結果を取得するためのFuture

ソースコード位置: pytilpack/asyncio/threadpool.py
def submit[T](self, coro: typing.Coroutine[typing.Any, typing.Any, T]) -> concurrent.futures.Future[T]:
    """コルーチンをイベントループに送信する。

    Args:
        coro: 実行するコルーチン

    Returns:
        結果を取得するためのFuture
    """
    if self.loop is None:
        raise RuntimeError("worker not started")
    return asyncio.run_coroutine_threadsafe(coro, self.loop)
stop()

ワーカースレッドを停止する。

ソースコード位置: pytilpack/asyncio/threadpool.py
def stop(self) -> None:
    """ワーカースレッドを停止する。"""
    if self.loop is None:
        return
    self.loop.call_soon_threadsafe(self.loop.stop)
    self.stopped.wait()
    self.loop = None
    self.thread = None