I would like to concurrently run a simple function that writes the output of a process to a .txt file and then stores it to DBFS (Databricks File System). In my example I use both the ThreadPoolExecutor class and the ProcessPoolExecutor class: the ThreadPoolExecutor runs successfully, while the ProcessPoolExecutor raises a PicklingError. I would like to run my function with both classes. How can I resolve the PicklingError?
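My understanding of the difference, distilled into a minimal Spark-free sketch (an illustration I put together, not my actual job), is that ProcessPoolExecutor must pickle the callable and its arguments to ship them to the worker processes, so locally defined functions fail, whereas ThreadPoolExecutor shares the process and never pickles anything:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def square(x):
    # top-level function: picklable by reference
    return x * x

if __name__ == "__main__":
    def nested_square(x):
        # locally defined function: not picklable
        return x * x

    with ThreadPoolExecutor(max_workers=2) as ex:
        print(list(ex.map(nested_square, range(4))))   # works: no pickling with threads

    with ProcessPoolExecutor(max_workers=2) as ex:
        print(list(ex.map(square, range(4))))          # works: square is found at module level

    # with ProcessPoolExecutor(max_workers=2) as ex:
    #     list(ex.map(nested_square, range(4)))        # fails: nested_square cannot be pickled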
Please find below the code I run to replicate the issue.
If you run it locally rather than on a Databricks cluster, first create the Spark session:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test").getOrCreate()
sc = spark.sparkContext
Create the Spark DataFrame and the arguments:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import copyreg as copy_reg
import types
from itertools import cycle
from datetime import datetime, timedelta
import time
import os
import pandas as pd
date_format = '%Y-%m-%d %H-%M-%S'
timestamp_snapshot=datetime.utcnow()
timestamp_snap=timestamp_snapshot.strftime(date_format)
pandas_df = pd.DataFrame({  'id' : ['001', '001', '001', '001', '001', '002', '002', '002', '002', '002'],
                            'PoweredOn':[0, 0, 0, 1, 0, 0, 0, 1, 0, 0]
                        })
spark_df=spark.createDataFrame(pandas_df)
device_ids=list(pandas_df['id'].unique())
location=range(1, len(device_ids)+1, 1)
devices_total_number=len(device_ids)
Approach 1 | Using ThreadPoolExecutor class - Works perfectly
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
if __name__ == "__main__":
    
    #main function
    def testing_function_map(iterables_tuple):
        print("{0}: START EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), iterables_tuple[0], str(iterables_tuple[1]), iterables_tuple[2]))
        filtered_dataset=iterables_tuple[4].where(iterables_tuple[4].id.isin([iterables_tuple[0]]))
        message_list=filtered_dataset.groupBy('PoweredOn', 'id').count().collect()
        filename='message_{0}_{1}.txt'.format(iterables_tuple[0], iterables_tuple[3])
        with open(os.path.join(os.getcwd(), filename), 'w') as file:
            file.write("Number of Powered on devices for asset id {0}: {1} & ".format(iterables_tuple[0], message_list[1][2]))
            file.write("Number of Powered off devices for asset id {0}: {1}".format(iterables_tuple[0], message_list[0][2]))
        print("Data saved successfully in dbfs!\n")
        print("{0}: FINISH EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), iterables_tuple[0], str(iterables_tuple[1]), iterables_tuple[2]))
    
    #wait function
    def wait_on_device(iterables_tuple):
        time.sleep(1)
        testing_function_map(iterables_tuple)
    
    executor = ThreadPoolExecutor(max_workers=2)
#     executor = ProcessPoolExecutor(max_workers=2)
    tasks=[*zip(device_ids, location, cycle([str(devices_total_number)]), cycle([timestamp_snap]), cycle([spark_df]))]
    
    list(executor.map(wait_on_device, tasks))
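(My understanding is that this works because the threads share the driver process, so neither the nested functions nor spark_df ever need to be pickled.)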
Approach 2 | Using ProcessPoolExecutor class - Generates a PicklingError for the wait_on_device() function
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
if __name__ == "__main__":
    def testing_function_map(iterables_tuple):
        print("{0}: START EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), iterables_tuple[0], str(iterables_tuple[1]), iterables_tuple[2]))
        filtered_dataset=iterables_tuple[4].where(iterables_tuple[4].id.isin([iterables_tuple[0]]))
        message_list=filtered_dataset.groupBy('PoweredOn', 'id').count().collect()
        filename='message_{0}_{1}.txt'.format(iterables_tuple[0], iterables_tuple[3])
        with open(os.path.join(os.getcwd(), filename), 'w') as file:
            file.write("Number of Powered on devices for asset id {0}: {1} & ".format(iterables_tuple[0], message_list[1][2]))
            file.write("Number of Powered off devices for asset id {0}: {1}".format(iterables_tuple[0], message_list[0][2]))
        print("Data saved successfully in dbfs!\n")
        print("{0}: FINISH EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), iterables_tuple[0], str(iterables_tuple[1]), iterables_tuple[2]))
    def wait_on_device(iterables_tuple):
        time.sleep(1)
        testing_function_map(iterables_tuple)
    
#     executor = ThreadPoolExecutor(max_workers=2)
    executor = ProcessPoolExecutor(max_workers=2)
    tasks=[*zip(device_ids, location, cycle([str(devices_total_number)]), cycle([timestamp_snap]), cycle([spark_df]))]
    
    list(executor.map(wait_on_device, tasks))
With the ProcessPoolExecutor class I get a PicklingError:

In general, whenever I test this application of the ProcessPoolExecutor, it keeps raising a PicklingError on the wait_on_device() function.
How can I resolve the PicklingError? I have searched for various approaches, such as making the main function globally accessible through a class, or registering a custom pickler via import copyreg as copy_reg, although none of them resolved my problem, probably because I don't apply them correctly.
My approach so far 
As presented here by @Steven Bethard:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import copyreg as copy_reg
import types
def _pickle_method(method):
    # NOTE: the original recipe is Python 2 (im_func/im_self/im_class);
    # adapted here to the Python 3 bound-method attributes.
    func_name = method.__func__.__name__
    obj = method.__self__
    cls = type(method.__self__)
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    # walk the MRO until a class that defines the method is found
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
    
if __name__ == "__main__":
    # The rest of my code, already presented above
But the PicklingError still exists.
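For completeness, below is the kind of restructuring I understand the related answers to suggest: a minimal sketch (a simplified stand-in for my job, with a hypothetical worker name count_powered_on) where the worker is defined at module top level, so it can be pickled by reference, and receives only picklable arguments. As far as I understand, the Spark DataFrame itself cannot be pickled because it wraps a reference to the driver-side SparkContext, so this sketch passes the underlying pandas data instead.

from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
from itertools import cycle
import os
import pandas as pd

date_format = '%Y-%m-%d %H-%M-%S'

# Top-level worker: picklable by reference. It receives a pandas DataFrame,
# which is picklable, instead of the Spark DataFrame.
def count_powered_on(iterables_tuple):
    device_id, position, total, snapshot, pdf = iterables_tuple
    counts = pdf.loc[pdf['id'] == device_id, 'PoweredOn'].value_counts()
    filename = 'message_{0}_{1}.txt'.format(device_id, snapshot)
    with open(os.path.join(os.getcwd(), filename), 'w') as file:
        file.write("Number of Powered on devices for asset id {0}: {1} & ".format(device_id, counts.get(1, 0)))
        file.write("Number of Powered off devices for asset id {0}: {1}".format(device_id, counts.get(0, 0)))
    print("{0}: FINISH EXECUTION PLAN OF ASSET ID {1}: {2}/{3}".format(datetime.utcnow().strftime(date_format), device_id, position, total))

if __name__ == "__main__":
    pandas_df = pd.DataFrame({'id': ['001', '001', '002', '002'],
                              'PoweredOn': [0, 1, 1, 0]})
    device_ids = list(pandas_df['id'].unique())
    timestamp_snap = datetime.utcnow().strftime(date_format)
    tasks = [*zip(device_ids, range(1, len(device_ids) + 1),
                  cycle([str(len(device_ids))]), cycle([timestamp_snap]),
                  cycle([pandas_df]))]
    with ProcessPoolExecutor(max_workers=2) as executor:
        list(executor.map(count_powered_on, tasks))

Even so, I am not sure how this interacts with a notebook environment (Jupyter or Databricks), where the definitions live in __main__ and the behaviour depends on the process start method (fork vs spawn).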
[UPDATE] The above PicklingError is generated when I run the code on Databricks. Running the same code locally on my machine in a Jupyter Notebook, I get the following error with the ProcessPoolExecutor only:
Other related questions I have searched, though I couldn't apply their solutions:
Related question 1
Related question 2 
Related question 3 
Related question 4