I have the following files:
binreader/
├─ packet/
│  ├─ __init__.py
│  ├─ aggregator.py
│  ├─ parser.py
│  ├─ uploader.py
├─ __init__.py
├─ __main__.py
├─ upload_concurrent.py
Code that reproduces the error:
/packet/__init__.py
<empty> 
/packet/aggergator.py
from multiprocessing import Queue, Process
import logging
log = logging.getLogger()
class AggregatorProcess(Process):
    def __init__(self, q_in: Queue, q_out: Queue):
        super(AggregatorProcess, self).__init__()
        self.q_in = q_in
        self.q_out = q_out
    def run(self):
        while x := self.q_in.get():
            log.debug(f"Aggregator: {x}")
            self.q_out.put(x)
        log.debug("Aggregator: Done")
        self.q_out.put(None)
/packet/parser.py
from multiprocessing import Queue, Process
import logging
from typing import List
log = logging.getLogger()
class ParserProcess(Process):
    """Threaded version of parser class"""
    def __init__(self, data: List, q_out: Queue):
        super(ParserProcess, self).__init__()
        self.q_out = q_out
        self.data = data
    def run(self):
        for x in self.data:
            log.debug(f"Parser: {x}")
            self.q_out.put(x)
        log.debug("Parser: Done")
        self.q_out.put(None)
/packet/uploader.py
from multiprocessing import Queue, Process
import logging
log = logging.getLogger()
class UploaderProcess(Process):
    def __init__(self, q_in: Queue) -> None:
        super(UploaderProcess, self).__init__()
        self.q_in = q_in
    def run(self):
        while x := self.q_in.get():
            log.debug(f"Uploader: {x}")
        log.debug("Uploader: Done")
/__init__.py
import sys
import click
import logging
from binreader import upload_concurrent
@click.group()
def cli():
    logging.basicConfig(
        format="%(asctime)s [%(processName)-16s]@%(lineno)4d %(levelname)s: %(message)s",
        level=logging.DEBUG,
        handlers=[
            logging.StreamHandler(sys.stdout),
        ],
    )
cli.add_command(upload_concurrent.upload_data_concurrent)
cli()
/__main__.py
<empty>
/upload_concurrent.py
from multiprocessing import Queue
import click
from .packet.aggregator import AggregatorProcess
from .packet.parser import ParserProcess
from .packet.uploader import UploaderProcess
log = logging.getLogger()
@click.command(name="upload-concurrent")
def upload_data_concurrent():
    parser_agg_wq = Queue()
    agg_upl_wq = Queue()
    parser = ParserProcess([1, 2, 3, 4, 5], parser_agg_wq)
    parser.name = type(parser).__name__
    aggregator = AggregatorProcess(parser_agg_wq, agg_upl_wq)
    aggregator.name = type(aggregator).__name__
    uploader = UploaderProcess(agg_upl_wq)
    uploader.name = type(uploader).__name__
    parser.start()
    aggregator.start()
    uploader.start()
    parser.join()
    aggregator.join()
    uploader.join()
I have synchronous code that completes the processing, however it is way too slow at ~1 hr/GB. There is about 1.5TB of data that needs processed every two weeks.
When introducing multiprocessing I am getting the following error once per call to Process.start:
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.
        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:
            if __name__ == '__main__':
                freeze_support()
                ...
        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
This program is run as a module: python -m binreader upload-concurrent
I have read this question, however I am not sure where to add the if __name__ == '__main__': guard. This may not be a viable solution given this is using the click module and I'm unsure what effect that has on how the module starts/runs.
Any guidance is greatly appreciated