from flask import Flask, request, jsonify
from sentence_transformers import SentenceTransformer
import asyncio
loop = asyncio.get_event_loop()
app = Flask(__name__)
prio_queue = asyncio.PriorityQueue()
# Load the SentenceTransformer model
model = SentenceTransformer('all-MiniLM-L6-v2')
async def embed_task_loop():
    while True:
        # get next item
        prio, p= await prio_queue.get()
        sentences, fut = p
        try:
            # Encode the sentences using the model
            embeddings = model.encode(sentences)
            # Create a dictionary to hold the sentence-embedding pairs
            result = {
                'texts': sentences,
                'embeddings': embeddings.tolist()
            }
            #return jsonify(result), 200
            fut.set_result((jsonify(result), 200))
        except Exception as e:
            #return jsonify(error=str(e)), 500
            fut.set_result((jsonify(error=str(e)), 500))
            
async def add_to_prio_queue(sentences, prio):
    global prio_queue
    # add to prio queue always if prio is one, if prio is zero add only if prio queue is not larger than 10
    if prio == 1 or (prio == 0 and prio_queue.qsize() < 10):
        fut=loop.create_future()
        package = (prio, (sentences, fut))
        prio_queue.put_nowait(package)
    else:
        fut.set_result((jsonify(error='Too many requests'), 429))
    return await fut
@app.route('/embed', methods=['POST'])
def embed_sentences():
    # Get the list of sentences from the request body
    data = request.get_json(force=True)
    sentences = data.get('texts', [])
    prio= data.get('prio', 0)
    if not sentences:
        return jsonify(error='No sentences provided'), 400
    result = loop.run_until_complete(add_to_prio_queue(sentences, prio))
    return result
 
if __name__ == '__main__':
    # start the embed task loop
    with app.app_context():
        loop.create_task(embed_task_loop())
    app.run()
I have this code that contains two parts, an api that takes in sentences to be embedded (you only need to know that that is a process that takes a while) and adds them to a priority queue. High priority tasks are always processed first and low priority tasks may be rejected. The embedding threads work simultaneously and enqueuing an embedding task will return a future that can be awaited. Unfortunately, Flask and asyncio really don't like each other and so if I try to use await instead of loop.run_until_complete I get this error:
RuntimeError: Task <Task pending name='Task-9' coro=<AsyncToSync.main_wrap() running at /home/user/miniconda3/envs/tf/lib/python3.9/site-packages/asgiref/sync.py:353> cb=[_run_until_complete_cb() at /home/user/miniconda3/envs/tf/lib/python3.9/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop
with loop.run_until_complete It sort of kind of works but it regularly gives me errors like these (most likely because the threads start to overlap):
Traceback (most recent call last):
  File "/home/user/miniconda3/envs/tf/lib/python3.9/site-packages/flask/app.py", line 2529, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/user/miniconda3/envs/tf/lib/python3.9/site-packages/flask/app.py", line 1825, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/user/miniconda3/envs/tf/lib/python3.9/site-packages/flask/app.py", line 1823, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/user/miniconda3/envs/tf/lib/python3.9/site-packages/flask/app.py", line 1799, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File "/home/user/projects/vector-crawler/backend/src/embed.py", line 78, in embed_sentences
    result = loop.run_until_complete(add_to_prio_queue(sentences, prio))
  File "/home/user/miniconda3/envs/tf/lib/python3.9/asyncio/base_events.py", line 623, in run_until_complete
    self._check_running()
  File "/home/user/miniconda3/envs/tf/lib/python3.9/asyncio/base_events.py", line 583, in _check_running
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
It also crashes pretty hard on exit, although thats not that big of a deal So is there a clean way to handle and await promises in Flask? Or is there another way to solve this?
