コンテンツにスキップ

pytilpack.threadinga

pytilpack.threadinga

スレッド関連のasync版。

parallel(funcs, max_workers=None, timeout=None) async

複数の関数を並列実行する。

引数:

名前 タイプ デスクリプション デフォルト
funcs list[Callable[[], Coroutine[Any, Any, T]]]

実行する関数のリスト。

必須
max_workers int | None

同時実行するスレッド数。Noneの場合はCPUのコア数。

None
timeout float | None

タイムアウト時間。

None

戻り値:

タイプ デスクリプション
list[T]

各関数の戻り値のリスト。

ソースコード位置: pytilpack/threadinga.py
async def parallel[T](
    funcs: list[typing.Callable[[], typing.Coroutine[typing.Any, typing.Any, T]]],
    max_workers: int | None = None,
    timeout: float | None = None,
) -> list[T]:
    """複数の関数を並列実行する。

    Args:
        funcs: 実行する関数のリスト。
        max_workers: 同時実行するスレッド数。Noneの場合はCPUのコア数。
        timeout: タイムアウト時間。

    Returns:
        各関数の戻り値のリスト。

    """
    semaphore = threading.Semaphore(max_workers if max_workers is not None else multiprocessing.cpu_count())

    def _thread(
        func: typing.Callable[[], typing.Coroutine[typing.Any, typing.Any, T]],
    ) -> T:
        with semaphore:
            return asyncio.run(func())

    tasks = [asyncio.to_thread(_thread, func) for func in funcs]
    if timeout is None:
        results = await asyncio.gather(*tasks)
    else:
        results = await asyncio.wait_for(asyncio.gather(*tasks), timeout=timeout)
    return list(results)

parallel_for(func, n) async

複数の関数を並列実行する。

引数:

名前 タイプ デスクリプション デフォルト
func Callable[[int], Awaitable[T]]

実行する関数。

必須
n int

ループ回数。

必須

戻り値:

タイプ デスクリプション
list[T]

各関数の戻り値のリスト。

ソースコード位置: pytilpack/threadinga.py
async def parallel_for[T](func: typing.Callable[[int], typing.Awaitable[T]], n: int) -> list[T]:
    """複数の関数を並列実行する。

    Args:
        func: 実行する関数。
        n: ループ回数。

    Returns:
        各関数の戻り値のリスト。

    """
    return await parallel([lambda i=i: func(i) for i in range(n)])  # type: ignore[misc]

parallel_foreach(func, items) async

複数の関数を並列実行する。

引数:

名前 タイプ デスクリプション デフォルト
func Callable[[U], Awaitable[T]]

実行する関数。

必須
items Iterable[U]

引数のリスト。

必須

戻り値:

タイプ デスクリプション
list[T]

各関数の戻り値のリスト。

ソースコード位置: pytilpack/threadinga.py
async def parallel_foreach[T, U](func: typing.Callable[[U], typing.Awaitable[T]], items: typing.Iterable[U]) -> list[T]:
    """複数の関数を並列実行する。

    Args:
        func: 実行する関数。
        items: 引数のリスト。

    Returns:
        各関数の戻り値のリスト。

    """
    return await parallel([lambda item=item: func(item) for item in items])  # type: ignore[misc]