I want to stop asynchronous multiprocessing jobs with KeyboardInterrupt. But sometimes hang occurred when call terminate.
from multiprocessing.pool import ThreadPool
import multiprocessing
import time
import queue
import inspect
def worker(index):
print('{}: start'.format(index))
for i in range(5):
time.sleep(1)
print('{}: stop'.format(index))
return index, True
def wrapper(index, stopEvent, qResult):
if stopEvent.is_set() is True:
return index, False
try:
result = worker(index)
except:
print('*' * 50)
return index, False
else:
if result[1] == True:
qResult.put(result)
return result
def watcher(qResult, stopEvent):
cntQ = 0
while True:
try:
result = qResult.get(timeout=10)
qResult.task_done()
except queue.Empty:
if stopEvent.is_set() is True:
break
except KeyboardInterrupt:
stopEvent.set()
else:
cntQ += 1
print(result)
qResult.join()
qResult.close()
print('qResult count:', cntQ)
def main():
stopEvent = multiprocessing.Event()
qResult = multiprocessing.JoinableQueue()
qResult.cancel_join_thread()
watch = multiprocessing.Process(target=watcher, args=(qResult, stopEvent))
watch.start()
pool = ThreadPool()
lsRet = []
for i in range(100000):
try:
ret = pool.apply_async(wrapper, args=(i, stopEvent, qResult))
lsRet.append(ret)
except KeyboardInterrupt:
stopEvent.set()
time.sleep(1)
break
if i+1 % 10 == 0:
time.sleep(2)
cntTotal = len(lsRet)
cntRet = 0
for ret in lsRet:
if stopEvent.is_set():
break
try:
ret.get()
except KeyboardInterrupt:
stopEvent.set()
time.sleep(1)
else:
cntRet += 1
if stopEvent.is_set() is False:
stopEvent.set()
print(inspect.stack()[0][1:4])
if watch.is_alive() is True:
watch.join()
print(inspect.stack()[0][1:4])
pool.terminate() # Why hang??????????
print(inspect.stack()[0][1:4])
pool.join()
print(cntTotal, cntRet)
if __name__ == '__main__':
main()
main() invokes a watcher() thread and many wrapper() threads asynchronously using multiprocessing.pool.Threadpool.
wrapper() calls worker() and put its result to queue.
watcher() watches above queue of results.
If ctrl-c pressed, stopEvent is set.
When stopEvent is set, wrapper() stops calling worker(), and Watcher() indicates queue.Empty and stopEvent and exits loop.
Finally main() calls terminate() of pool.
Sometimes processes done well, but sometimes hang. It's different each time.