I'm trying to implement a Redis queue in the current system. The job will be sent to another module, and the caller should wait until the job is done and has returned its result, job.result, before moving on:
with Connection(redis_connection):
    job = job_queue.enqueue(worker_func, func_input1, func_input2)

print("waiting for result")
print(datetime.datetime.now())
while job.result is None:
    pass
print(datetime.datetime.now())
print("got result")

# next step
next_step_func(job.result)
...
I'm facing 2 issues here:
1. The busy-wait loop (while job.result is None) is taking a very long time. My processing in worker_func takes about 2-3 seconds, which involves calling APIs on another server, yet the busy-wait loop itself takes another >= 3 seconds, adding up to >= 5 seconds in total. I am positive that the waiting takes place after the execution of while job.result is None, because I added logs for both worker_func and while job.result is None:
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT start work
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:57.601189
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:59.075137
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT end work
...
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT waiting for result
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:53.704891
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:59.096009
As you can see above, the busy-wait while loop happens after worker_func is done.
2. Is there a more elegant way to implement this synchronous wait than a busy loop? I think the busy loop is definitely not the best implementation, as it will consume a lot of CPU resources.
Thanks!
-- Editing my code above to give clearer context:
I need the value of next_step_func(job.result) to be returned from where job_queue.enqueue is called, so a clearer structure would be:
def endpoint():
    with Connection(redis_connection):
        job = job_queue.enqueue(worker_func, func_input1, func_input2)

    print("waiting for result")
    print(datetime.datetime.now())
    while job.result is None:
        pass
    print(datetime.datetime.now())
    print("got result")

    # next step
    return next_step_func(job.result)
...
So the pain point is that I need job.result to be returned from endpoint(), yet using the Job Callback would move the handling of my job to a different context, in on_success.
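
To make the second issue concrete, here is a minimal sketch of the kind of wait I have in mind: poll with a short sleep and a timeout instead of spinning. The names wait_for_result and FakeJob are hypothetical and not part of RQ; FakeJob only stands in for a real job so the snippet runs without Redis, and the timeout and interval values are arbitrary assumptions.

```python
import time


def wait_for_result(get_result, timeout=10.0, interval=0.1):
    """Poll get_result() until it returns a non-None value or the timeout expires.

    Sleeping between polls keeps the waiting thread from spinning the CPU.
    (Hypothetical helper, not an RQ API.)
    """
    deadline = time.monotonic() + timeout
    while True:
        result = get_result()
        if result is not None:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"no result after {timeout} seconds")
        time.sleep(interval)


class FakeJob:
    """Stand-in for a queued job (assumption): result stays None until ready."""

    def __init__(self, ready_after):
        self._ready_at = time.monotonic() + ready_after

    @property
    def result(self):
        return "done" if time.monotonic() >= self._ready_at else None


job = FakeJob(ready_after=0.3)
value = wait_for_result(lambda: job.result, timeout=2.0, interval=0.05)
print(value)
```

In endpoint(), the while loop would then become result = wait_for_result(lambda: job.result), followed by return next_step_func(result). One limitation of this sketch is that a job whose legitimate return value is None cannot be told apart from a job that has not finished yet.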