I'm working on a program that processes a huge JSON file and does some analysis before inserting the results into a database. My first prototype divided the JSON file into n parts, and I ran each part independently from a separate shell tab:
python data_import.py --start 1 --cluster 6
python data_import.py --start 2 --cluster 6
python data_import.py --start 3 --cluster 6
...
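Each run only analyses its own slice of the file; the slice selection in the prototype is roughly the following (simplified sketch, the start/end arithmetic mirrors what update_extractor_result does below):

import argparse
import json

# Simplified sketch of the prototype's slicing: --start picks which 1/cluster
# slice of the file this invocation analyses.
parser = argparse.ArgumentParser()
parser.add_argument('--start', type=int, required=True)    # 1-based part index
parser.add_argument('--cluster', type=int, default=6)      # number of parts
parser.add_argument('--total', type=int, default=5701017)
parser.add_argument('--json_path', type=str, default="../Data/output.json")
args = parser.parse_args()

per_part = (args.total - 1) // args.cluster
startfrom = per_part * (args.start - 1)
endfrom = per_part * args.start

with open(args.json_path, 'r', encoding="utf8") as f:
    for line_no, line in enumerate(f):
        if startfrom <= line_no <= endfrom:
            data = json.loads(line)
            # ... doAnalysis() and insert into the DB ...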
The performance is pretty good, but it's annoying to open that many tabs every time I run it, so I modified the program to use multiprocessing, like this:
import argparse
import json
import logging
from multiprocessing import Manager, Process

def main():
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--cluster', type=int, default=5,
                        help='number of clusters')
    parser.add_argument('--total', type=int, default=5701017,
                        help='total number of data')
    parser.add_argument('--json_path', type=str, default="../Data/output.json",
                        help='location of data source')
    args = parser.parse_args()

    # One shared counter slot per worker, kept in a Manager-backed list
    manager = Manager()
    work_count = manager.list([0] * args.cluster)

    p_logger = setup_logger("Workers", 'done' + '_' + str(args.cluster) + '.log',
                            logging.INFO)
    try:
        processes = []
        for c in range(1, args.cluster + 1):
            p = Process(target=update_extractor_result,
                        args=(args, c, work_count, p_logger))
            processes.append(p)
        # Start the processes
        for p in processes:
            p.start()
        # Ensure all processes have finished execution
        for p in processes:
            p.join()
    except Exception as e:
        print("Error: unable to start worker processes: " + str(e))
def update_extractor_result(args, num_start, work_count, p_logger):
    logger = setup_logger(__name__,
                          'error' + str(num_start) + '_' + str(args.cluster) + '.log',
                          logging.ERROR)
    batch = 1
    total_loaded_count = 0
    total = args.total - 1
    total_works = int(total / args.cluster)
    done_count = 0
    # This worker is responsible for lines startfrom..endfrom only
    startfrom = int(total_works * (num_start - 1))
    endfrom = int(total_works * num_start)
    json_path = args.json_path
    with open(json_path, 'r', encoding="utf8") as f:
        # Every worker still iterates over (and json.loads) every line of the
        # file; only lines inside its own range are analysed and inserted.
        for line in f:
            try:
                data = json.loads(line)
                if total_loaded_count % 100 == 0:
                    p_logger.info("Workers: " + str(work_count))
                total_loaded_count += 1
                work_count[num_start - 1] += 1  # update to the shared Manager list
                if startfrom <= total_loaded_count <= endfrom:
                    data = data.doAnalysis()
                    insertToDB()
                    done_count += 1
                    print("Done batch " + str(batch) + " - count: " + str(done_count))
                    batch += 1
            except Exception as e:
                # assumption: the original snippet was truncated here; errors go to the per-worker log
                logger.error("Failed at line " + str(total_loaded_count) + ": " + str(e))
Comparing the two approaches with 6 clusters: running the same program in six tabs at the same time takes 6-8 hours in total, while the multiprocessing version has finished only about 1/5 of the work after 12 hours.
Why is the difference so large? Or is there a problem in my program?
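If it helps to narrow this down, here is a hypothetical harness I could use to time a single worker in isolation (it assumes update_extractor_result and setup_logger can be imported from data_import.py):

import argparse
import logging
import time
from multiprocessing import Manager

from data_import import setup_logger, update_extractor_result  # assumed importable

# Time one worker (worker 1 of 6) on its own to see how long a single slice takes.
args = argparse.Namespace(cluster=6, total=5701017, json_path="../Data/output.json")
work_count = Manager().list([0] * args.cluster)
p_logger = setup_logger("Workers", 'timing.log', logging.INFO)

start = time.perf_counter()
update_extractor_result(args, 1, work_count, p_logger)
print("Worker 1 took %.1f s" % (time.perf_counter() - start))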
