In convert_to_xml, the process = subprocess.Popen(...) statement spawns a latexml subprocess.
Without a blocking call such as process.communicate(), convert_to_xml returns while latexml is still running in the background.
Since convert_to_xml ends, the Pool sends the associated worker process another task to run and so convert_to_xml is called again.
Once again another latexml process is spawned in the background.
Pretty soon, you are up to your eyeballs in latexml processes and the resource limit on the number of open files is reached.
The fix is easy: add process.communicate() to tell convert_to_xml to wait until the latexml process has finished.
process = None
try:
    process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath],
                               stderr=subprocess.PIPE,
                               stdout=subprocess.PIPE)
    process.communicate()  # wait until latexml has finished
except Exception as error:
    if process is not None:  # Popen itself may have failed
        process.kill()
    message = 'error: {} while converting {}'.format(error, inpath)
    print(message)
else:  # use else so that this won't run if there is an Exception
    message = '{}: Converted!'.format(inpath)
    print(message)
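To see the difference between spawning and waiting, here is a minimal sketch that uses a short-lived Python child as a stand-in for latexml (the child command and the variable names are assumptions for illustration only). Popen returns as soon as the child has been started; communicate() is what actually blocks.

```python
import subprocess
import sys

# Hypothetical stand-in for latexml: a child that sleeps for a second, then exits.
child_cmd = [sys.executable, '-c', 'import time; time.sleep(1)']

process = subprocess.Popen(child_cmd,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE)
# Popen has already returned, but the child is still running:
# poll() returns None until the child has exited.
still_running = process.poll() is None

process.communicate()           # blocks until the child exits
exit_code = process.returncode  # only set once the child has finished
print(still_running, exit_code)
```

Without the communicate() call, a function containing only the Popen line would return while the child was still alive, which is the situation described above.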
Regarding if __name__ == '__main__':
As martineau pointed out, there is a warning in the multiprocessing docs that
code that spawns new processes should not be called at the top level of a module.
Instead, the code should be contained inside an if __name__ == '__main__' statement.
On Linux, nothing terrible happens if you disregard this warning.
But on Windows, the code "fork-bombs". Or more accurately, the code
causes an unmitigated chain of subprocesses to be spawned, because Windows has no fork, so it is simulated by starting a new Python process which then imports the calling script. Without the guard, every import spawns a new Python process, and every new Python process imports the calling script again. The cycle is not broken until all resources are consumed.
So to be nice to our Windows-fork-bereft brethren, use
if __name__ == '__main__':
    start()
Sometimes processes require a lot of memory. The only reliable way to free memory is to terminate the process. maxtasksperchild=1 tells the pool to terminate each worker process after it completes 1 task. It then spawns a new worker process to handle another task (if there are any). This frees the (memory) resources the original worker may have allocated which could not otherwise have been freed.
In your situation it does not look like the worker process is going to require much memory, so you probably don't need maxtasksperchild=1.
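If you want to watch maxtasksperchild at work, the following is a rough sketch that counts how many distinct worker processes handle a handful of tasks. The function names worker_pid and distinct_worker_pids are made up for this illustration, and the single-worker pool is only there to make the effect easy to see.

```python
import multiprocessing as mp
import os


def worker_pid(_):
    """Return the PID of the worker process that ran this task."""
    return os.getpid()


def distinct_worker_pids(maxtasksperchild, n_tasks=6):
    """Run n_tasks on a one-worker pool and count the distinct worker PIDs."""
    with mp.Pool(processes=1, maxtasksperchild=maxtasksperchild) as pool:
        # chunksize=1 so that every item counts as a separate task
        pids = pool.map(worker_pid, range(n_tasks), chunksize=1)
    return len(set(pids))


if __name__ == '__main__':
    # One long-lived worker handles everything.
    print(distinct_worker_pids(maxtasksperchild=None))
    # The worker is replaced after every task, so several PIDs show up.
    print(distinct_worker_pids(maxtasksperchild=1))
```

With maxtasksperchild=None you should see a single PID; with maxtasksperchild=1 you should typically see about one PID per task, since each replacement worker is a new process.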
The chunksize affects how many tasks a worker performs before sending the result back to the main process.
Sometimes this can affect performance, especially if interprocess communication is a significant portion of overall runtime.
In your situation, convert_to_xml takes a relatively long time (assuming we wait until latexml finishes) and it simply returns None. So interprocess communication probably isn't a significant portion of overall runtime. Therefore, I don't expect you would find a significant change in performance in this case (though it never hurts to experiment!).
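To picture what chunksize means, here is a simplified sketch of grouping tasks into batches. It only illustrates the idea; it is not the Pool's actual implementation, and chunk_tasks is a hypothetical helper name.

```python
from itertools import islice


def chunk_tasks(iterable, chunksize):
    """Group items into tuples holding at most chunksize items each."""
    iterator = iter(iterable)
    while True:
        chunk = tuple(islice(iterator, chunksize))
        if not chunk:
            return
        yield chunk


# Seven tasks with chunksize=3 become three batches.
batches = list(chunk_tasks(range(7), 3))
print(batches)  # [(0, 1, 2), (3, 4, 5), (6,)]
```

A worker would handle one whole batch before reporting back, so a larger chunksize means fewer round trips between the worker and the main process.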
In plain Python, map should not be used just to call a function multiple times.
For a similar stylistic reason, I would reserve using the pool.*map* methods for situations where I cared about the return values.
So instead of
for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5):
    pass
you might consider using
for preprint in preprints:
    pool.apply_async(convert_to_xml, args=(preprint, ))
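One thing to keep in mind with apply_async is that an exception raised in a worker only surfaces when you call get() on the returned AsyncResult. The sketch below shows one way to collect failures; convert is a hypothetical stand-in for convert_to_xml that fails on one input, and run_all is likewise a made-up name.

```python
import multiprocessing as mp


def convert(name):
    """Hypothetical stand-in for convert_to_xml; fails on one input."""
    if name == 'bad.tex':
        raise ValueError('cannot convert {}'.format(name))
    return name


def run_all(names):
    """Submit every name to the pool and return a list of (name, error) pairs."""
    pool = mp.Pool()
    # Keep the AsyncResult objects so that failures are not silently dropped.
    results = [(name, pool.apply_async(convert, args=(name,)))
               for name in names]
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for the workers to finish
    failures = []
    for name, result in results:
        try:
            result.get()  # re-raises any exception from the worker
        except Exception as error:
            failures.append((name, str(error)))
    return failures


if __name__ == '__main__':
    print(run_all(['a.tex', 'bad.tex', 'c.tex']))
```

If you do not care about failures at all, the bare loop is enough, as long as pool.close() and pool.join() are called before the program exits.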
The iterable passed to any of the pool.*map* functions is consumed
immediately. It doesn't matter if the iterable is an iterator. There is no
special memory benefit to using an iterator here. imap_unordered returns an
iterator, but it does not handle its input in any especially iterator-friendly
way.
No matter what type of iterable you pass, upon calling the pool.*map* function the iterable is
consumed and turned into tasks which are put into a task queue.
Here is code which corroborates this claim:
version1.py:
import multiprocessing as mp
import time

def foo(x):
    time.sleep(0.1)
    return x * x

def gen():
    for x in range(1000):
        if x % 100 == 0:
            print('Got here')
        yield x

def start():
    pool = mp.Pool()
    for item in pool.imap_unordered(foo, gen()):
        pass
    pool.close()
    pool.join()

if __name__ == '__main__':
    start()
version2.py:
import multiprocessing as mp
import time

def foo(x):
    time.sleep(0.1)
    return x * x

def gen():
    for x in range(1000):
        if x % 100 == 0:
            print('Got here')
        yield x

def start():
    pool = mp.Pool()
    for item in gen():
        result = pool.apply_async(foo, args=(item, ))
    pool.close()
    pool.join()

if __name__ == '__main__':
    start()
Running version1.py and running version2.py both produce the same output:
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Crucially, you will notice that Got here is printed 10 times very quickly at
the beginning of the run, and then there is a long pause (while the calculation
is done) before the program ends.
If the generator gen() were somehow consumed slowly by pool.imap_unordered,
we should expect Got here to be printed slowly as well. Since Got here is
printed 10 times and quickly, we can see that the iterable gen() is being
completely consumed well before the tasks are completed.
Running these programs should hopefully give you confidence that
pool.imap_unordered and pool.apply_async are putting tasks in the queue
essentially in the same way: immediately after the call is made.