I am experimenting with the tokio runtime in Rust by writing a web crawler.
Right now I spawn a separate task for each link to crawl, since past a certain point parallelism seems to work better than simple concurrency. All communication happens through channels, and nothing is returned from the crawl functions. So I am wondering: what would be a good way to put a limit on the number of spawned tasks?
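To make the question concrete, the kind of limit I have in mind is roughly the sketch below, which holds a tokio::sync::Semaphore permit for each spawned task (I haven't actually wired this into the crawler yet; it just reuses the names from the function further down):
use std::sync::Arc;
use tokio::sync::Semaphore;

// Sketch: each spawned crawl task holds a permit, so at most
// `task_limit` tasks are in flight at any one time.
let semaphore = Arc::new(Semaphore::new(task_limit));
for url in to_crawl.clone() {
    // Waits here once all `task_limit` permits are taken.
    let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();
    let tx_clone = tx_crawler.clone();
    let client_clone = client.clone();
    tokio::spawn(async move {
        crawl_page(url, client_clone, tx_clone).await;
        drop(permit); // released when the task finishes
    });
}
Is something like this reasonable, or is there a more idiomatic way to combine a limit with channels?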
I was using the method given in this answer back when I relied on function returns, but it doesn't seem to translate well to channels.
I also thought about simply taking a limited number of links from to_crawl at a time, but that makes it difficult to keep track of the crawl depth.
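What I mean by that is roughly this sketch: drain at most task_limit links from to_crawl per pass, but then one iteration of the outer depth loop no longer corresponds to exactly one depth level:
// Sketch of the "take a limited batch" idea: pull at most `task_limit`
// links out of `to_crawl` per pass. A single depth level may now need
// several passes, so the `for _ in 0..crawl_depth` loop below no longer
// maps cleanly onto depth.
let batch: Vec<Url> = to_crawl.iter().take(task_limit).cloned().collect();
for url in &batch {
    to_crawl.remove(url);
}
// ... spawn a task per `url` in `batch` as in the loop further down ...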
I can't simply move the channel-receiving part into a separate task either, since I currently use mutable HashSets to keep track of links, and sharing those across tasks would mean wrapping them in mutexes, which could get messy with all the locking and unlocking.
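To show what I mean, moving the receiving loop into its own task would look something like the sketch below, and this is exactly the locking I'd rather avoid (the real loop also does the whitelist/blacklist checks and forwards to tx_output):
use std::sync::Arc;
use tokio::sync::Mutex;

// Sketch: the sets would have to live behind Arc<Mutex<..>> so that both
// the main loop and the receiving task can use them.
let crawled = Arc::new(Mutex::new(HashSet::<Url>::new()));
let to_crawl = Arc::new(Mutex::new(HashSet::<Url>::new()));

let crawled_task = Arc::clone(&crawled);
let to_crawl_task = Arc::clone(&to_crawl);
tokio::spawn(async move {
    while let Some(link) = rx_crawler.recv().await {
        if link.crawled {
            crawled_task.lock().await.insert(link.url);
        } else {
            to_crawl_task.lock().await.insert(link.url);
        }
    }
});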
Anyway, I would also love to hear other people's thoughts on whether channels should even be used instead of function returns in a program like this, because I pretty much decided to do it on a whim after hearing this talk by Rich Hickey.
The main crawler is something like this:
use std::collections::HashSet;

use futures::{stream, StreamExt};
use reqwest::Url;
use tokio::sync::mpsc;

// `Link` and `crawl_page` come from elsewhere in the crate.
pub async fn crawl_with_depth(
    origin_url: Link,
    crawl_depth: usize,
    whitelist: Option<HashSet<String>>,
    blacklist: Option<HashSet<String>>,
    tx_output: mpsc::Sender<Link>,
    task_limit: usize,
) {
    let mut to_crawl: HashSet<Url> = HashSet::new();
    let mut crawled: HashSet<Url> = HashSet::new();
    let mut dont_crawl: HashSet<Url> = HashSet::new();
    let client = reqwest::Client::new();
    to_crawl.insert(origin_url.url);
    for _ in 0..crawl_depth {
        println!("Crawling {} URls", to_crawl.len());
        let (tx_crawler, mut rx_crawler) = mpsc::channel::<Link>(task_limit);
        stream::iter(to_crawl.clone())
            .for_each(|x| async {
                let tx_clone = tx_crawler.clone();
                let client_clone = client.clone();
                tokio::spawn(async move { crawl_page(x, client_clone, tx_clone).await });
            })
            .await;
        to_crawl.clear();
        // Drop our copy of the sender; the channel closes once every spawned
        // task's clone has been dropped, which ends the recv loop below.
        drop(tx_crawler);
        while let Some(link) = rx_crawler.recv().await {
            if link.crawled {
                crawled.insert(link.url.clone());
                // tx_output hands finished links to a separate task that does the IO.
                if let Err(_) = tx_output.send(link).await {
                    return;
                }
            } else {
                let should_crawl = link.should_crawl(&whitelist, &blacklist);
                if should_crawl && !crawled.contains(&link.url) {
                    to_crawl.insert(link.url);
                } else if !should_crawl && !dont_crawl.contains(&link.url) {
                    dont_crawl.insert(link.url.clone());
                    if let Err(_) = tx_output.send(link).await {
                        return;
                    }
                }
            }
        }
    }
}
The whole program can be found here if someone needs it.
