I read this question two days ago and couldn't shake it. There are multiple concepts in play here, and some (if not all) are quite complex. I built a prototype to fully understand what was happening, and below is a working example. I added plenty of comments to explain what is happening and why.
A couple of pointers right off the bat:
- An `asyncio.Queue` is not thread or process safe. That means you can (and probably will) end up with a corrupt state when sharing such an object across processes. Such a queue is good for sharing state across `Task`s, as they all run on the same thread in the event loop.
- A `multiprocessing.Queue` is thread and process safe, but you will need a `Manager()` to handle the specifics. It essentially spawns a separate server process that handles all communication with the queue (from other processes).
- Make sure that your code does not block other requests. In my example below, I use `asyncio.sleep(0)` to yield control back to the event loop, allowing other tasks in the event loop to continue processing. Without it, the infinite while loop would block the event loop and starve every other task.
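The Manager-backed queue from the second bullet can be sketched in isolation. This is a standalone script, separate from the answer's FastAPI code; the `worker` function name is my own. It shows a worker process filling a `Manager().Queue()` while the parent drains it non-blockingly, just like the endpoint below does.

```python
import multiprocessing as mp
from queue import Empty

def worker(q):
    # Runs in a separate process; the Manager proxies each put() call
    # back to its server process, which is what makes this safe.
    for i in range(3):
        q.put(i)

if __name__ == "__main__":
    with mp.Manager() as m:
        q = m.Queue()
        p = mp.Process(target=worker, args=(q,))
        p.start()
        p.join()
        results = []
        try:
            while True:
                # block=False raises queue.Empty instead of waiting.
                results.append(q.get(block=False))
        except Empty:
            pass
        print(results)  # [0, 1, 2]
```

A plain `mp.Queue()` would also work between two processes you spawn yourself, but the Manager proxy is what lets you hand the queue to a `ProcessPoolExecutor` worker, as the endpoint below does.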
I tested the code below with 4 concurrent requests (using wscat from the command line). Please note that I am by no means an expert in asyncio or multiprocessing, so I am not claiming these are best practices.
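To see the yielding behaviour from the last bullet in isolation, here is a small standalone sketch (the names are mine, not part of the answer's code): two coroutines interleave only because each `await asyncio.sleep(0)` hands control back to the event loop.

```python
import asyncio

async def ticker(name: str, log: list) -> None:
    for i in range(3):
        log.append(f"{name}{i}")
        # Without this line, the first coroutine would run to completion
        # before the second one ever got a turn.
        await asyncio.sleep(0)

async def main() -> list:
    log = []
    await asyncio.gather(ticker("a", log), ticker("b", log))
    return log

print(asyncio.run(main()))  # ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
```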
import asyncio
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp
from queue import Empty
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import time
app = FastAPI()
# Do not re-create the pool on every request; create it once at module level
pool = ProcessPoolExecutor()
def long_running_task(q: mp.Queue) -> str:
    # This would be your update_cicada function
    for i in range(5):
        # This represents some blocking IO
        time.sleep(3)
        q.put({"result": f"Iteration {i}"})
    return "done!"
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    loop = asyncio.get_running_loop()
    
    #To use Queue's across processes, you need to use the mp.Manager()
    m = mp.Manager()
    q = m.Queue()
    
    await websocket.accept()
    
    # run_in_executor will return a Future object. Normally, you would await
    # such a method, but we want a bit more control over it.
    result = loop.run_in_executor(pool, long_running_task, q)
    while True:
        
        #None of the coroutines called in this block (e.g. send_json()) 
        # will yield back control. asyncio.sleep() does, and so it will allow
        # the event loop to switch context and serve multiple requests 
        # concurrently.
        await asyncio.sleep(0)
        try:
            # See if our long running task has some intermediate result.
            # get(block=False) raises queue.Empty instead of waiting.
            q_result = q.get(block=False)
        except Empty:
            # Nothing was available (yet!).
            q_result = None
        #If there is an intermediate result, let's send it to the client.
        if q_result:
            try:
                await websocket.send_json(q_result)
            except WebSocketDisconnect:
                # This happens if the client has moved on; stop waiting for
                # the long running task. Note that cancel() cannot kill a
                # call already running in the pool, it only prevents a
                # pending one from starting.
                result.cancel()
                #break out of the while loop.
                break
        
        #We want to stop the connection when the long running task is done.
        if result.done():
            try:
                await websocket.send_json(result.result())
                await websocket.close()  
            except WebSocketDisconnect:
                # This happens if the client has moved on; we should stop
                # the long running task
                result.cancel()
            finally:
                # Make sure we break out of the infinite while loop.
                break
            
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)