Our Rust application appears to have a memory leak. I've distilled the issue down to the code example below, but I still can't see where the problem is.
I expect the application's memory usage to return to a low level once message 500,001 arrives and the items are cleared. Instead I observe the following:
- before sending 500,000 messages, memory usage is 124KB
- after sending 500,000 messages, memory usage climbs to 27MB
- after sending message 500,001, memory usage drops to 15.5MB
After trying many things, I cannot find where the remaining 15.5MB is hiding. The only way to free the memory is to kill the application. Valgrind did not detect any memory leaks. A workaround, a solution, or a pointer in the right direction would be much appreciated.
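For context on the numbers above, here is a rough back-of-the-envelope sketch of how much memory 500,000 short strings need on paper. The helper names `vec_buffer_bytes` and `payload_bytes` are made up for this illustration. The sketch ignores per-allocation allocator overhead and the Vec's capacity rounding, so it is a lower bound, not a measurement.

```rust
use std::mem::size_of;

/// Bytes occupied by the Vec's own buffer when it has room for `capacity` Strings.
/// Each slot stores a String header (pointer, capacity, length), not the text itself.
fn vec_buffer_bytes(capacity: usize) -> usize {
    capacity * size_of::<String>()
}

/// Bytes of text for `count` copies of `msg`, excluding any allocator overhead.
fn payload_bytes(count: usize, msg: &str) -> usize {
    count * msg.len()
}

fn main() {
    let count = 500_000;
    println!("String header size: {} bytes", size_of::<String>());
    println!("Vec buffer for {} items: {} bytes", count, vec_buffer_bytes(count));
    println!("Text payload: {} bytes", payload_bytes(count, "Hello"));
}
```

On a typical 64-bit target a String header is 24 bytes, so the Vec buffer alone accounts for roughly 12MB of the 27MB peak. The rest would be the 500,000 small heap allocations and whatever bookkeeping the allocator adds to each.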
A demo project with the code below can be found here: https://github.com/loriopatrick/mem-help
Notes
- If I remove self.items.push(data); memory usage does not increase, so I don't think it's an issue with Sender/Receiver
- Wrapping items: Vec<String> in an Arc<Mutex<..>> made no observable memory difference
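To separate the Vec handling from tokio entirely, the same fill-then-replace pattern can be sketched synchronously. `SyncProcessor` and its `push` method are hypothetical stand-ins for illustration, not part of the demo project. The sketch only checks what Rust reports about the Vec after the swap. It does not show whether the allocator hands the freed pages back to the operating system.

```rust
/// Hypothetical, synchronous stand-in for `Processor`: no tokio, no channel.
struct SyncProcessor {
    items: Vec<String>,
}

impl SyncProcessor {
    fn new() -> Self {
        SyncProcessor { items: Vec::new() }
    }

    /// Stores one message. Once more than `limit` items are held, swaps in a
    /// fresh Vec and drops the old one. Returns true if the items were emptied.
    fn push(&mut self, data: String, limit: usize) -> bool {
        self.items.push(data);
        if self.items.len() > limit {
            let old = std::mem::replace(&mut self.items, Vec::new());
            // Dropping `old` releases every String and the Vec buffer
            // back to the allocator.
            drop(old);
            return true;
        }
        false
    }
}

fn main() {
    let limit = 500_000;
    let mut p = SyncProcessor::new();
    let mut emptied = 0;
    // limit + 1 pushes, mirroring the 500,000 + 1 messages in the question.
    for _ in 0..=limit {
        if p.push("Hello".to_string(), limit) {
            emptied += 1;
        }
    }
    println!(
        "emptied {} time(s), len = {}, capacity = {}",
        emptied,
        p.items.len(),
        p.items.capacity()
    );
}
```

If the process's memory usage stays high after this loop as well, that would point away from the channel and the runtime and towards how the allocator holds on to freed memory.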
The task where the memory should be managed
struct Processor {
    items: Vec<String>,
}
impl Processor {
    pub fn new() -> Self {
        Processor {
            items: Vec::new(),
        }
    }
    pub async fn task(mut self, mut receiver: Receiver<String>) {
        while let Some(data) = receiver.next().await {
            self.items.push(data);
            if self.items.len() > 500000 {
                {
                    std::mem::replace(&mut self.items, Vec::new());
                }
                println!("Emptied items array");
            }
        }
        println!("Processor task closing in 5 seconds");
        tokio::time::delay_for(Duration::from_secs(5)).await;
    }
}
Full runnable example
use std::time::Duration;
use tokio::stream::StreamExt;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{channel, Receiver};
struct Processor {
    items: Vec<String>,
}
impl Processor {
    pub fn new() -> Self {
        Processor {
            items: Vec::new(),
        }
    }
    pub async fn task(mut self, mut receiver: Receiver<String>) {
        while let Some(data) = receiver.next().await {
            self.items.push(data);
            if self.items.len() > 500000 {
                {
                    std::mem::replace(&mut self.items, Vec::new());
                }
                println!("Emptied items array");
            }
        }
        println!("Processor task closing in 5 seconds");
        tokio::time::delay_for(Duration::from_secs(5)).await;
    }
}
pub fn main() {
    {
        let mut runtime: Runtime = tokio::runtime::Builder::new()
            .threaded_scheduler()
            .core_threads(1)
            .enable_all()
            .build()
            .expect("Failed to build runtime");
        let (mut sender, receiver) = channel(1024);
        let p = Processor::new();
        runtime.spawn(async move {
            println!("Before send, waiting 5 seconds");
            tokio::time::delay_for(Duration::from_secs(5)).await;
            for _ in 0..500000 {
                sender.send("Hello".to_string()).await;
            }
            println!("Sent 500,000 items, waiting 5 seconds");
            tokio::time::delay_for(Duration::from_secs(5)).await;
            sender.send("Hello".to_string()).await;
            println!("Send message to clear items");
            tokio::time::delay_for(Duration::from_secs(3)).await;
            println!("Closing sender in 5 seconds");
            tokio::time::delay_for(Duration::from_secs(5)).await;
        });
        runtime.block_on(async move {
            {
                p.task(receiver).await;
            }
            println!("Task is done, waiting 5 seconds");
            tokio::time::delay_for(Duration::from_secs(5)).await;
        });
    }
    println!("Runtime closed, waiting 5 seconds");
    std::thread::sleep(Duration::from_secs(5));
}
Cargo.toml
[package]
name = "mem-help"
version = "0.1.0"
authors = ["Patrick Lorio <dev@plorio.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
futures = "0.3.1"
tokio = { version = "0.2.6", features = ["full"] }
