I would like to process a temporal graph (essentially, a list of networkx graphs) in parallel, using asynchronous parallelism on a shared-memory machine. To achieve this, I use Pool.apply_async() from the multiprocessing module. The temporal graph consists of 5 unit (snapshot) graphs, and for each unit graph I perform multiple computationally expensive matrix operations.
Consider a simple sequential example first:
#------------------------------------
# Constants
#------------------------------------
NV  = 100    # No. of vertices
NE  =  25    # No. of edges
NG  =   5    # No. of unit graphs
#------------------------------------
# Generate random time-varying graph
#------------------------------------
Gt = gen_time_graph(NV, NE, NG)
# Snapshot index
k = 0
# for each unit graph
for Gk in Gt:
    # Temporal adjacency matrix
    Atk = adj_mtrx(Gk)
    # Temporal weight matrix
    # ...
    # Temporal eigenvector centrality
    # ...
    k += 1
It works flawlessly. Next, I attempt to assign each matrix operation to a worker from a pool:
#------------------------------------
# Constants
#------------------------------------
NV  = 100    # No. of vertices
NE  =  25    # No. of edges
NG  =   5    # No. of unit graphs
NP  =   2    # No. of processes
#------------------------------------
# Generate random time-varying graph
#------------------------------------
Gt = gen_time_graph(NV, NE, NG)
# Snapshot index
k = 0
if __name__ == '__main__':
    with Pool(processes=NP) as pool:
        # for each unit graph
        for Gk in Gt:
    
            # Temporal adjacency matrix
            Atk = pool.apply_async( adj_mtrx, (Gk) ).get()
    
            # Temporal weight matrix
            # ...
            # Temporal eigenvector centrality
            # ...
            k += 1
However, here the code crashes with the following error:
multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
TypeError: adj_mtrx() takes 1 positional argument but 100 were given
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "./aggr_vs_time_dat_par_mini.py", line 100, in <module>
    Atk = pool.apply_async( adj_mtrx, (Gk) ).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
TypeError: adj_mtrx() takes 1 positional argument but 100 were given
I need help debugging this problem. It seems that the Pool decomposes the graph Gk and passes it to the function as a set of vertices. I would also be grateful if you could comment on the appropriateness of my general parallelisation approach with Pool.apply_async() from multiprocessing.
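A minimal sketch of what is likely happening, using only plain Python: the traceback shows that the pool worker calls `func(*args, **kwds)`, so `args` is always unpacked. In Python, `(Gk)` is just `Gk` in parentheses; only the trailing comma in `(Gk,)` makes a one-element tuple. Iterating over a networkx graph yields its nodes, so a 100-node graph is unpacked into 100 positional arguments. Below, a list of 100 labels stands in for the graph, and `adj_mtrx_stub` and `call_like_pool_worker` are hypothetical helpers that imitate the worker call without starting any processes.

```python
def adj_mtrx_stub(Gk):
    """Hypothetical one-argument function standing in for adj_mtrx."""
    return Gk


def call_like_pool_worker(func, args):
    # Imitates the call in multiprocessing/pool.py shown in the traceback:
    #     result = (True, func(*args, **kwds))
    return func(*args)


# Iterating over a networkx graph yields its nodes, so a list of 100 node
# labels behaves the same way when unpacked with *.
Gk = list(range(100))

print(type((Gk)).__name__)    # parentheses alone do not build a tuple: list
print(type((Gk,)).__name__)   # the trailing comma does: tuple

try:
    # (Gk) is unpacked into 100 positional arguments -> TypeError
    call_like_pool_worker(adj_mtrx_stub, (Gk))
except TypeError as exc:
    print(exc)

# (Gk,) is unpacked into exactly one argument, the "graph" itself
print(call_like_pool_worker(adj_mtrx_stub, (Gk,)) is Gk)   # True
```

The `TypeError` printed here has the same form as the one in the traceback ("takes 1 positional argument but 100 were given"), which suggests that passing `(Gk,)` instead of `(Gk)` to `apply_async` should fix the crash.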
The complete code for the minimal working example is below:
import networkx as nx
import random   as rnd
import numpy    as np
from multiprocessing import Pool
# Generates random graph
def gen_rnd_graph(nv, ne):
    
    # Create random list of sources
    Vsrc = [rnd.randint(0,nv-1) for _ in range(ne)]
    
    # Create random list of sinks
    Vsnk = [rnd.randint(0,nv-1) for _ in range(ne)]
    
    # Create random list of edge weights
    U = [rnd.random() for _ in range(ne)]
    
    # Create list of tuples {Vsrc, Vsnk, U}
    T = list(zip(Vsrc,Vsnk,U))
    
    # Create graph
    G = nx.Graph()
    
    # Create list of vertices
    V = list(range(nv))
    
    # Add nodes to graph
    G.add_nodes_from(V)
    
    # Add edges between random vertices with random edge weights
    G.add_weighted_edges_from(T)
    
    return G
# Generates time-varying graph
def gen_time_graph(nv, ne, ng):
    # Initialise list of graphs
    l = []
    for i in range(ng):
        gi = gen_rnd_graph(nv, ne)
        l.append(gi)
    return l
# Computes adjacency matrix for snapshot of time-varying graph
def adj_mtrx(Gk):
    # no. of vertices
    n = Gk.number_of_nodes()
    # adjacency matrix
    Ak = np.zeros([n,n])
    # for each vertex
    for i in range(n):
        for j in range(n):
            if Gk.has_edge(i,j): Ak[i,j] = 1
        
    return Ak
#------------------------------------
# Constants
#------------------------------------
NV  = 100    # No. of vertices
NE  =  25    # No. of edges
NG  =   5    # No. of unit graphs
NP  =   2    # No. of processes
#------------------------------------
# Generate random time-varying graph
#------------------------------------
Gt = gen_time_graph(NV, NE, NG)
# Snapshot index
k = 0
if __name__ == '__main__':
    with Pool(processes=NP) as pool:
        # for each unit graph
        for Gk in Gt:
        
            # Temporal adjacency matrix
            Atk = pool.apply_async( adj_mtrx, (Gk) ).get()
        
            k += 1
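One possible corrected driver, offered as a hedged sketch rather than a definitive implementation: it passes each snapshot as the one-element tuple `(Gk,)` and submits all snapshots before calling `.get()`. In the loop above, `.get()` is called right after each `apply_async()`, and it blocks until that snapshot is finished, so the snapshots are effectively processed one after another even with two workers. The helper `run_parallel` is hypothetical, `gen_rnd_graph` is a condensed version of the generator above, and `adj_mtrx` is kept as in the question.

```python
import random as rnd
from multiprocessing import Pool

import networkx as nx
import numpy as np


def gen_rnd_graph(nv, ne):
    """Random graph on nodes 0..nv-1 with ne random weighted edges (duplicates merge)."""
    G = nx.Graph()
    G.add_nodes_from(range(nv))
    G.add_weighted_edges_from(
        (rnd.randint(0, nv - 1), rnd.randint(0, nv - 1), rnd.random())
        for _ in range(ne)
    )
    return G


def adj_mtrx(Gk):
    """0/1 adjacency matrix, as in the question (assumes nodes are 0..n-1)."""
    n = Gk.number_of_nodes()
    Ak = np.zeros([n, n])
    for i in range(n):
        for j in range(n):
            if Gk.has_edge(i, j):
                Ak[i, j] = 1
    return Ak


def run_parallel(Gt, nprocs):
    """Hypothetical helper: compute adj_mtrx for every snapshot in a pool."""
    with Pool(processes=nprocs) as pool:
        # Submit every snapshot first; note the trailing comma in (Gk,)
        handles = [pool.apply_async(adj_mtrx, (Gk,)) for Gk in Gt]
        # Only then block on the results, collected in submission order
        return [h.get() for h in handles]


if __name__ == '__main__':
    # Graph generation lives under the guard, so workers do not repeat it
    Gt = [gen_rnd_graph(100, 25) for _ in range(5)]
    At = run_parallel(Gt, nprocs=2)
    print(len(At), At[0].shape)
```

Since every task has the same signature, `pool.map(adj_mtrx, Gt)` would be a shorter equivalent: `map` passes each element of `Gt` as a single argument and returns the results in order. Each graph is pickled to a worker and each matrix is pickled back, so for small snapshots this transfer overhead may outweigh the gain from parallelism. The graph generation sits under the `__main__` guard because, with the spawn start method (the default on Windows and macOS), module-level code is re-run in every worker.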
 
    