My main goal is to write an API server that retrieves part of its information from another, external API server. However, this external server is quite fragile, so I would like to limit the global number of concurrent requests made to it, for example to 10 or 20.
Thus, my idea was to write something like an HttpPool, which consumes tasks via a bounded crossbeam channel and distributes them among tokio tasks. The idea was to use a bounded channel to avoid publishing too much work, and a fixed set of tasks to limit the number of concurrent requests to the external API.
It seems to work if I do not create more than 8 tasks. If I define more, it blocks after fetching the first tasks from the queue.
use std::{error::Error, result::Result};
use tokio::sync::oneshot::Sender;
use tokio::time::timeout;
use tokio::time::{sleep, Duration};
use crossbeam_channel;
#[derive(Debug)]
struct HttpTaskRequest {
    url: String,
    result: Sender<String>,
}
type PoolSender = crossbeam_channel::Sender<HttpTaskRequest>;
type PoolReceiver = crossbeam_channel::Receiver<HttpTaskRequest>;
#[derive(Debug)]
struct HttpPool {
    size: i32,
    sender: PoolSender,
    receiver: PoolReceiver,
}
impl HttpPool {
    fn new(capacity: i32) -> Self {
        let (tx, rx) = crossbeam_channel::bounded::<HttpTaskRequest>(capacity as usize);
        HttpPool {
            size: capacity,
            sender: tx,
            receiver: rx,
        }
    }
    async fn start(self) -> Result<HttpPool, Box<dyn Error>> {
        for i in 0..self.size {
            let task_receiver = self.receiver.clone();
            tokio::spawn(async move {
                loop {
                    match task_receiver.recv() {
                        Ok(request) => {
                            if request.result.is_closed() {
                                println!("Task[{i}] received url {} already closed by receiver, seems to reach timeout already", request.url);
                            } else {
                                println!("Task[{i}] started to work {:?}", request.url);
                                let resp = reqwest::get("https://httpbin.org/ip").await;
                                println!("Resp: {:?}", resp);
                                println!("Done Send request for url {}", request.url);
                                request.result.send("Result".to_owned()).expect("Failed to send result");
                            }
                        }
                        Err(err) => println!("Error: {err}"),
                    }
                }
            });
        }
        Ok(self)
    }
    pub async fn request(&self, url: String) -> Result<(), Box<dyn Error>> {
        let (os_sender, os_receiver) = tokio::sync::oneshot::channel::<String>();
        let request = HttpTaskRequest {
            result: os_sender,
            url: url.clone(),
        };
        self.sender.send(request).expect("Failed to publish message to task group");
        // check if a timeout or value was returned
        match timeout(Duration::from_millis(100), os_receiver).await {
            Ok(res) => {
                println!("Request finished without reaching the timeout {}",res.unwrap());
            }
            Err(_) => {println!("Request {url} run into timeout");}
        }
        Ok(())
    }
}
#[tokio::main]
async fn main() {
    let http_pool = HttpPool::new(20).start().await.expect("Failed to start http pool");
    for i in 0..10 {
        let url = format!("T{}", i.to_string());
        http_pool.request(url).await.expect("Failed to request message");
    }
    loop {}
}
Maybe somebody can explain why the code blocks? Is it related to tokio::spawn?
I guess my approach is wrong, so please let me know if there is another way to handle it. The goal can be summarized like this: I would like to request URLs and process them in such a way that no more than N concurrent requests are made against the API server.
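To make the goal concrete, here is a minimal sketch of the behavior I am after, written with the standard library only so that it does not depend on my pool. The names `Semaphore` and `run_limited` are made up for this illustration, and the `sleep` is just a stand-in for the real HTTP call. In async code, `tokio::sync::Semaphore` would presumably play the same role, but that is an assumption on my side and not something I have verified in my setup.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical counting semaphore built on Mutex + Condvar (std only).
struct Semaphore {
    permits: Mutex<usize>,
    available: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Semaphore {
            permits: Mutex::new(permits),
            available: Condvar::new(),
        }
    }

    /// Blocks the calling thread until a permit is free, then takes it.
    fn acquire(&self) {
        let mut permits = self.permits.lock().unwrap();
        while *permits == 0 {
            permits = self.available.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    /// Returns a permit and wakes one waiting thread.
    fn release(&self) {
        let mut permits = self.permits.lock().unwrap();
        *permits += 1;
        self.available.notify_one();
    }
}

/// Runs `total` simulated requests with at most `limit` in flight at once.
/// Returns the highest number of simultaneously running requests observed.
fn run_limited(total: usize, limit: usize) -> usize {
    let semaphore = Arc::new(Semaphore::new(limit));
    // Tuple of (currently in flight, peak in flight).
    let stats = Arc::new(Mutex::new((0usize, 0usize)));

    let handles: Vec<_> = (0..total)
        .map(|_| {
            let semaphore = Arc::clone(&semaphore);
            let stats = Arc::clone(&stats);
            thread::spawn(move || {
                semaphore.acquire();
                {
                    let mut s = stats.lock().unwrap();
                    s.0 += 1;
                    let current = s.0;
                    if current > s.1 {
                        s.1 = current;
                    }
                }
                // Stand-in for the real HTTP call to the fragile server.
                thread::sleep(Duration::from_millis(20));
                {
                    // Decrement before releasing so the peak stays accurate.
                    let mut s = stats.lock().unwrap();
                    s.0 -= 1;
                }
                semaphore.release();
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    let peak = stats.lock().unwrap().1;
    peak
}
```

For example, `run_limited(12, 3)` starts twelve simulated requests, and the returned peak should never exceed 3. What I am missing is how to get the same guarantee when the requests come from async handlers instead of threads.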
I have read this question: How can I perform parallel asynchronous HTTP GET requests with reqwest?. But in that answer, all of the work is known up front, which is not the case in my example. My requests arrive on the fly, hence I am not sure how to handle them.
 
     
    