Source code for sctoolbox.utils.multiprocessing

"""Functions related to multiprocessing."""

import time
import sctoolbox.utils as utils

# type hint imports
from beartype.typing import TYPE_CHECKING, Any, Tuple
from beartype import beartype

if TYPE_CHECKING:
    import tqdm


[docs] @beartype def get_pbar(total: int, description: str, **kwargs: Any) -> "tqdm.tqdm": """ Get a progress bar depending on whether the user is using a notebook or not. Parameters ---------- total : int Total number elements to be shown in the progress bar. description : str Description to be shown in the progress bar. **kwargs : Any Keyword arguments to be passed to tqdm. Returns ------- tqdm.tqdm A progress bar object. """ if utils._is_notebook() is True: from tqdm import tqdm_notebook as tqdm else: from tqdm import tqdm pbar = tqdm(total=total, desc=description, **kwargs) return pbar
[docs] @beartype def monitor_jobs(jobs: dict[Tuple[int, int | str], Any] | list[Any], description: str = "Progress") -> None: """ Monitor the status of jobs submitted to a pool. Parameters ---------- jobs : dict[Tuple[int, int | str], Any] | list[Any] List or dict of job objects, e.g. as returned by pool.map_async(). description : str, default "Progress" Description to be shown in the progress bar. """ if isinstance(jobs, dict): jobs = list(jobs.values()) # Wait for all jobs to finish n_ready = sum([job.ready() for job in jobs]) pbar = get_pbar(len(jobs), description) while n_ready != len(jobs): if n_ready != pbar.n: pbar.n = n_ready pbar.refresh() time.sleep(1) n_ready = sum([job.ready() for job in jobs]) pbar.n = n_ready # update progress bar to 100% pbar.refresh() pbar.close()