コンテンツにスキップ

pytilpack.threading

pytilpack.threading

スレッド関連。

acquire_with_timeout(lock, timeout)

ロックを取得し、タイムアウト時間内に取得できなかった場合はFalseを返す。

引数:

名前 タイプ デスクリプション デフォルト
lock Lock | RLock | Semaphore | BoundedSemaphore

取得するロック。

必須
timeout float

タイムアウト時間(秒)。

必須

戻り値:

タイプ デスクリプション
None

ロックが取得できた場合はTrue、取得できなかった場合はFalse。

ソースコード位置: pytilpack/threading.py
@contextlib.contextmanager
def acquire_with_timeout(
    lock: "threading.Lock | threading.RLock | threading.Semaphore | threading.BoundedSemaphore",
    timeout: float,
) -> typing.Generator[bool, None, None]:
    """ロックを取得し、タイムアウト時間内に取得できなかった場合はFalseを返す。

    Args:
        lock: 取得するロック。
        timeout: タイムアウト時間(秒)。

    Returns:
        ロックが取得できた場合はTrue、取得できなかった場合はFalse。

    """
    acquired = lock.acquire(timeout=timeout)
    try:
        yield acquired
    finally:
        if acquired:
            lock.release()

parallel(funcs, max_workers=None, thread_name_prefix='', timeout=None, chunksize=1)

複数の関数を並列実行する。

引数:

名前 タイプ デスクリプション デフォルト
funcs Iterable[Callable[[], T]]

実行する関数のリスト。

必須
max_workers int | None

同時実行するスレッド数。Noneの場合はCPUのコア数。

None
thread_name_prefix str

スレッド名のプレフィックス。

''
timeout float | None

タイムアウト時間。

None
chunksize int

一度に実行する関数の数。

1

戻り値:

タイプ デスクリプション
list[T]

各関数の戻り値のリスト。

ソースコード位置: pytilpack/threading.py
def parallel[T](
    funcs: typing.Iterable[typing.Callable[[], T]],
    max_workers: int | None = None,
    thread_name_prefix: str = "",
    timeout: float | None = None,
    chunksize: int = 1,
) -> list[T]:
    """複数の関数を並列実行する。

    Args:
        funcs: 実行する関数のリスト。
        max_workers: 同時実行するスレッド数。Noneの場合はCPUのコア数。
        thread_name_prefix: スレッド名のプレフィックス。
        timeout: タイムアウト時間。
        chunksize: 一度に実行する関数の数。

    Returns:
        各関数の戻り値のリスト。

    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix=thread_name_prefix) as executor:
        return list(executor.map(lambda f: f(), funcs, timeout=timeout, chunksize=chunksize))

parallel_for(func, n)

複数の関数を並列実行する。

引数:

名前 タイプ デスクリプション デフォルト
func Callable[[int], T]

実行する関数。

必須
n int

ループ回数。

必須

戻り値:

タイプ デスクリプション
list[T]

各関数の戻り値のリスト。

ソースコード位置: pytilpack/threading.py
def parallel_for[T](func: typing.Callable[[int], T], n: int) -> list[T]:
    """複数の関数を並列実行する。

    Args:
        func: 実行する関数。
        n: ループ回数。

    Returns:
        各関数の戻り値のリスト。

    """
    return parallel([lambda i=i: func(i) for i in range(n)])  # type: ignore[misc]

parallel_foreach(func, items)

複数の関数を並列実行する。

引数:

名前 タイプ デスクリプション デフォルト
func Callable[[U], T]

実行する関数。

必須
items Iterable[U]

引数のリスト。

必須

戻り値:

タイプ デスクリプション
list[T]

各関数の戻り値のリスト。

ソースコード位置: pytilpack/threading.py
def parallel_foreach[T, U](func: typing.Callable[[U], T], items: typing.Iterable[U]) -> list[T]:
    """複数の関数を並列実行する。

    Args:
        func: 実行する関数。
        items: 引数のリスト。

    Returns:
        各関数の戻り値のリスト。

    """
    return parallel([lambda item=item: func(item) for item in items])  # type: ignore[misc]