multiprocessing template with live log of available results
I use multiprocessing to test newly developed code against massive amounts of test data. I want results as fast as possible: if the new code fails for one of the test cases, I can start developing a fix. While I do so, I want to see how the code performs on the rest of the test data. Afterwards, I can change the order in which the test data is processed in the next run (to see failures fast).
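For context, the "see each result as soon as it is ready" behaviour can also be sketched with the standard library's concurrent.futures. This is only a minimal illustration with a placeholder run_test; the actual template below uses multiprocessing directly:

from concurrent.futures import ProcessPoolExecutor, as_completed

def run_test(case: str) -> str:
    return f"ok: {case}"  # stand-in for the real, slow test

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=3) as pool:
        futures = {pool.submit(run_test, case): case for case in "abcdef"}
        for future in as_completed(futures):  # yields each future as soon as it finishes
            print(futures[future], future.result())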
The following template

- executes a maximum number of processes in parallel (using a semaphore; see the standalone sketch after this list)
- collects all results in a pd.DataFrame as soon as they are available
- prints a summary as soon as a new result is available
- offers a non-parallel mode for debugging
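To make the semaphore mechanism concrete before the full listing, here is a standalone toy version (worker and the numbers are placeholders, not part of the template):

import multiprocessing as mp
import time

def worker(sema) -> None:
    with sema:  # blocks until one of the slots is free
        time.sleep(1)  # stand-in for real work

if __name__ == "__main__":
    sema = mp.Semaphore(3)  # at most 3 workers run simultaneously
    procs = [mp.Process(target=worker, args=(sema,)) for _ in range(10)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()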
code
import sys
import time
import random
from typing import List, Callable, Dict, Any
import multiprocessing as mp
from multiprocessing.managers import DictProxy
from multiprocessing.synchronize import Semaphore
import logging
import pandas as pd
N_PROC = mp.cpu_count() - 1  # number of processes to run in parallel (the others wait on the semaphore)
MULTIPROCESSING_UPDATE_CYCLE = .1  # seconds to wait between checks for finished jobs
# logging
DEFAULT_FORMAT = "\n%(levelname)s - %(asctime)s.%(msecs)03d - %(filename)s, l %(lineno)d:\n%(message)s"
DEFAULT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
default_stream_handler = logging.StreamHandler(sys.stdout)
default_stream_handler.setFormatter(logging.Formatter(fmt=DEFAULT_FORMAT, datefmt=DEFAULT_TIME_FORMAT))
logger = logging.getLogger("mp_template")
logger.setLevel(logging.DEBUG)
logger.addHandler(default_stream_handler)
# fix seed
random.seed(42)  # a 'not so' arbitrary number
def process_single_task(task_name: str) -> Dict:
    """
    This is the slow function you want to parallelize.
    Parameters
    ----------
    task_name : str
        some input
    Returns
    -------
    Dict :
        Dictionary of the different values produced during execution.
        This is over-engineered for this example, but pretty handy for more complex functions.
    """
    result = {}
    n_sec = random.randint(1, 4)
    logger.debug(f"start {task_name=}, {n_sec=}")
    time.sleep(n_sec)
    logger.debug(f"end {task_name=}, {n_sec=}")
    result['n_sec'] = n_sec
    result['log'] = f"executed {task_name=}"
    return result
def fct_to_multiprocessing(
        fct: Callable, fct_kwargs: Dict[str, Any], job_id: str, results: DictProxy, semaphore: Semaphore):
    """
    Function that limits the number of active processes and manages each process's return value.
    Parameters
    ----------
    fct : Callable
        Function to execute in separate process
    fct_kwargs : Dict[str, Any]
        kwargs for fct
    job_id : str
        id to manage results. The result is stored in results[job_id]
    results: DictProxy
        special mp dict to manage the return values of fct
    semaphore: Semaphore
        semaphore object that prevents more than N_PROC processes from running in parallel
    Example
    -------
    Use as following:
        manager = mp.Manager()
        results = manager.dict()
        sema = mp.Semaphore(N_PROC)
        jobs = {}
        for job_id in ...:
            jobs[job_id] = mp.Process(
                target=fct_to_multiprocessing,
                kwargs={
                    "fct": ..., "fct_kwargs": {...},
                    "job_id": job_id, "results": results, "semaphore": sema
                }
            )
            jobs[job_id].start()
    """
    if semaphore is not None:
        semaphore.acquire()
    try:
        results[job_id] = fct(**fct_kwargs)
    finally:  # release the slot even if fct raises, so waiting jobs do not block forever
        if semaphore is not None:
            semaphore.release()
def manage_results(df_results: pd.DataFrame, job_id: str, result: Dict) -> pd.DataFrame:
    """Add a finished job's result to the summary DataFrame and log the current state."""
    df_results.loc[job_id, list(result.keys())] = list(result.values())
    logger.info(df_results)
    return df_results
def process_all_tasks(tasks: List[str]):
    logger.info(f"\n\n{''.center(80, '=')}\n{' started '.center(80, '=')}\n{''.center(80, '=')}\n")
    logger.info(f"executing code on {N_PROC} / {mp.cpu_count()} simultaneous processes")
    job_ids = [f"job_id={task}" for task in tasks]
    df_results = pd.DataFrame(index=job_ids)
    # run jobs
    if N_PROC == 1:  # no parallelization, good for debugging
        for job_id, task in zip(job_ids, tasks):
            result = process_single_task(task_name=task)
            df_results = manage_results(df_results=df_results, job_id=job_id, result=result)
    else:  # parallelization on
        manager = mp.Manager()
        results = manager.dict()
        sema = mp.Semaphore(N_PROC)
        jobs = {}
        for job_id, task in zip(job_ids, tasks):
            jobs[job_id] = mp.Process(
                target=fct_to_multiprocessing,
                kwargs={
                    "fct": process_single_task, "fct_kwargs": {"task_name": task},
                    "job_id": job_id, "results": results, "semaphore": sema
                }
            )
            jobs[job_id].start()
        while jobs:  # as soon as a job is completed, add its result to df_results
            for job_id, job in list(jobs.items()):
                if job.exitcode is not None:  # this job has finished
                    job.join()
                    # a crashed child never writes to results, so fall back to an error entry
                    result = results.get(job_id, {'log': f"process failed, {job.exitcode=}"})
                    job.close()
                    del jobs[job_id]
                    df_results = manage_results(df_results=df_results, job_id=job_id, result=result)
                    break
            time.sleep(MULTIPROCESSING_UPDATE_CYCLE)
    logger.info(f"\n\n{''.center(80, '=')}\n{' finished '.center(80, '=')}\n{''.center(80, '=')}\n")
    logger.info(df_results)
if __name__ == "__main__":
    tasks = list("abcdef")
    process_all_tasks(tasks)
output
$ python 230315_multiprocessing_template.py 
INFO - 2023-03-15T10:51:09.786 - 230315_multiprocessing_template.py, l 98:
================================================================================
=================================== started ====================================
================================================================================
INFO - 2023-03-15T10:51:09.786 - 230315_multiprocessing_template.py, l 99:
executing code on 3 / 4 simultaneous processes
DEBUG - 2023-03-15T10:51:09.794 - 230315_multiprocessing_template.py, l 43:
start task_name='a', n_sec=2
DEBUG - 2023-03-15T10:51:09.794 - 230315_multiprocessing_template.py, l 43:
start task_name='b', n_sec=2
DEBUG - 2023-03-15T10:51:09.796 - 230315_multiprocessing_template.py, l 43:
start task_name='c', n_sec=1
DEBUG - 2023-03-15T10:51:10.797 - 230315_multiprocessing_template.py, l 45:
end task_name='c', n_sec=1
DEBUG - 2023-03-15T10:51:10.798 - 230315_multiprocessing_template.py, l 43:
start task_name='d', n_sec=1
INFO - 2023-03-15T10:51:10.901 - 230315_multiprocessing_template.py, l 94:
          n_sec                     log
job_id=a    NaN                     NaN
job_id=b    NaN                     NaN
job_id=c    1.0  executed task_name='c'
job_id=d    NaN                     NaN
job_id=e    NaN                     NaN
job_id=f    NaN                     NaN
DEBUG - 2023-03-15T10:51:11.796 - 230315_multiprocessing_template.py, l 45:
end task_name='a', n_sec=2
DEBUG - 2023-03-15T10:51:11.796 - 230315_multiprocessing_template.py, l 45:
end task_name='b', n_sec=2
DEBUG - 2023-03-15T10:51:11.797 - 230315_multiprocessing_template.py, l 43:
start task_name='f', n_sec=2
DEBUG - 2023-03-15T10:51:11.798 - 230315_multiprocessing_template.py, l 43:
start task_name='e', n_sec=1
DEBUG - 2023-03-15T10:51:11.798 - 230315_multiprocessing_template.py, l 45:
end task_name='d', n_sec=1
INFO - 2023-03-15T10:51:11.807 - 230315_multiprocessing_template.py, l 94:
          n_sec                     log
job_id=a    2.0  executed task_name='a'
job_id=b    NaN                     NaN
job_id=c    1.0  executed task_name='c'
job_id=d    NaN                     NaN
job_id=e    NaN                     NaN
job_id=f    NaN                     NaN
INFO - 2023-03-15T10:51:11.910 - 230315_multiprocessing_template.py, l 94:
          n_sec                     log
job_id=a    2.0  executed task_name='a'
job_id=b    2.0  executed task_name='b'
job_id=c    1.0  executed task_name='c'
job_id=d    NaN                     NaN
job_id=e    NaN                     NaN
job_id=f    NaN                     NaN
INFO - 2023-03-15T10:51:12.014 - 230315_multiprocessing_template.py, l 94:
          n_sec                     log
job_id=a    2.0  executed task_name='a'
job_id=b    2.0  executed task_name='b'
job_id=c    1.0  executed task_name='c'
job_id=d    1.0  executed task_name='d'
job_id=e    NaN                     NaN
job_id=f    NaN                     NaN
DEBUG - 2023-03-15T10:51:12.799 - 230315_multiprocessing_template.py, l 45:
end task_name='e', n_sec=1
INFO - 2023-03-15T10:51:12.819 - 230315_multiprocessing_template.py, l 94:
          n_sec                     log
job_id=a    2.0  executed task_name='a'
job_id=b    2.0  executed task_name='b'
job_id=c    1.0  executed task_name='c'
job_id=d    1.0  executed task_name='d'
job_id=e    1.0  executed task_name='e'
job_id=f    NaN                     NaN
DEBUG - 2023-03-15T10:51:13.800 - 230315_multiprocessing_template.py, l 45:
end task_name='f', n_sec=2
INFO - 2023-03-15T10:51:13.824 - 230315_multiprocessing_template.py, l 94:
          n_sec                     log
job_id=a    2.0  executed task_name='a'
job_id=b    2.0  executed task_name='b'
job_id=c    1.0  executed task_name='c'
job_id=d    1.0  executed task_name='d'
job_id=e    1.0  executed task_name='e'
job_id=f    2.0  executed task_name='f'
INFO - 2023-03-15T10:51:13.927 - 230315_multiprocessing_template.py, l 140:
================================================================================
=================================== finished ===================================
================================================================================
INFO - 2023-03-15T10:51:13.927 - 230315_multiprocessing_template.py, l 141:
          n_sec                     log
job_id=a    2.0  executed task_name='a'
job_id=b    2.0  executed task_name='b'
job_id=c    1.0  executed task_name='c'
job_id=d    1.0  executed task_name='d'
job_id=e    1.0  executed task_name='e'
job_id=f    2.0  executed task_name='f'
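A closing note on the fail-fast use case: if the code under test raises, the child process dies without writing to results, so only an exit-code marker can end up in the summary. One way to capture the failure itself would be a wrapper like the following sketch (the try/except and the 'error' key are my additions, not part of the template run above):

import traceback

def process_single_task_safe(task_name: str) -> Dict:
    """Turn an exception into a result row instead of a dead process."""
    try:
        return process_single_task(task_name=task_name)
    except Exception:
        # the full traceback lands in the DataFrame, ready for debugging
        return {'log': f"FAILED {task_name=}", 'error': traceback.format_exc()}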