I have small program that takes the links from a text file, pass those links to backend system at ImportIO, and save the results to a CSV. However I am seeing following errors after 15,20 min of run. I encounter two exception whichever comes first 1. System.OutOfMemoryException OR 2. System.NUllReferenceException Both of these are however I feel my fault somewhere in the code. I am not an expert but I tried to use timer, or closing the files, or even setting objects to null. None worked or even using ArgumentNullException. I ran the code analysis and it suggested that I should Idispose by this error.
CA1001 Types that own disposable fields should be disposable Implement IDisposable on 'ImportIO' because it creates members of the following IDisposable types: 'BlockingCollection>'. Ostock Main.cs 232
My code is as followed, I am not including importIO class it is long. I think solution is easy but I am just not on right path. Could you guys please help?
namespace MinimalCometLibrary
{
    class Program
    {
        private static CountdownEvent countdownLatch;
        static void Main(string[] args)
        {            
            string[] lines = File.ReadAllLines(@"C:\Users\James\Desktop\Exper\Input_Links\Stock_links.txt");
            for (int i = 0; i < lines.Length; i++)
            {                
                string[] line = lines[i].Split(new string[] { "\t" }, StringSplitOptions.RemoveEmptyEntries);
                for (int j = 0; j < line.Length; j++)
                {
                 ImportIO io = new ImportIO("https://query.import.io", Guid.Parse("sdasd-asdasd-NoReal-3easdecb"), "NoReal=");
                /* Time Starts
                    Stopwatch sw = new Stopwatch(); // sw cotructor
                    sw.Start(); // starts the stopwatch
                    for (int b = 0; ; b++)
                    {
                        if (b % 1000 == 0) // if in 100000th iteration (could be any other large number
                        // depending on how often you want the time to be checked) 
                        {
                            sw.Stop(); // stop the time measurement
                            if (sw.ElapsedMilliseconds > 25) // check if desired period of time has elapsed
                            {
                                break; // if more than 5000 milliseconds have passed, stop looping and return
                                // to the existing code
                            }
                            else
                            {
                                sw.Start(); // if less than 5000 milliseconds have elapsed, continue looping
                                // and resume time measurement
                            }
                        }
                    }
                //Time Ends 
                 */   
                    io.Connect();
                    countdownLatch = new CountdownEvent(1);
                    // Query for tile SamsClub_Extractor, line[j]
                    Dictionary<String, Object> query1 = new Dictionary<string, object>();
                    query1.Add("input", new Dictionary<String, String>() { { "webpage/url", line[j] } });
                    query1.Add("connectorGuids", new List<String>() { "189f34f3-0f82-4abb-8fbc-f353f35a255a" });
                    io.DoQuery(query1, HandleQuery);
                    countdownLatch.Wait(); 
                    io.Disconnect();
                }        
            }
            Environment.Exit(0);
        }
       private static void HandleQuery(Query query, Dictionary<String, Object> message)
        {
            if (message["type"].Equals("MESSAGE"))
            {
                Console.WriteLine("Got data!");
                string JSON = JsonConvert.SerializeObject(message["data"]);
                //Deserialize to strongly typed class i.e., RootObject
                RootObject obj = JsonConvert.DeserializeObject<RootObject>(JSON);
        // handle null reference
                if (obj == null) { throw new ArgumentNullException("PleaseKeepRunning"); }
                //loop through the list and write to CSV file
                foreach (Result resultsItem in obj.results)
                {
                    Console.WriteLine(resultsItem.itemnbr + "-" + resultsItem.price +
                           "-" + resultsItem.product_name + "_" + obj.pageUrl);                   
                    string filePath = @"C:\Users\James\Desktop\Exper\Output_Files\StockPrice_NOW.txt";
                    //checking if file already exists, if not, create it:
                    if (!File.Exists(filePath))
                    {
                        FileStream fs = new FileStream(filePath, FileMode.CreateNew);
                        fs.Close();
                    }
                    //writing to a file (appending text):
                    using (FileStream fs = new FileStream(filePath, FileMode.Append, FileAccess.Write))
                    {
                        using (TextWriter tw = new StreamWriter(fs))
                            tw.WriteLine(resultsItem.itemnbr + "\t" + resultsItem.price + "\t" + resultsItem.product_name + "\t" + resultsItem.misc + 
                           "\t" +  resultsItem.qty + "\t" + obj.pageUrl);
                        fs.Close();
                    }
                    //Set object to null
                    obj = null;
                    obj.results = null;
                }
            }
            if (query.isFinished) countdownLatch.Signal();
        }        
   }    
//root Object
    public class Result
{
    public double price { get; set; }    
    public string itemnbr { get; set; }
    public string product_name { get; set; }
    public string qty { get; set; }
    public string misc { get; set; }
}
public class RootObject
{
    public List<string> cookies { get; set; }
    public List<Result> results { get; set; }
    public string pageUrl { get; set; }
    public string connectorGuid { get; set; }
    public string connectorVersionGuid { get; set; }
    public int offset { get; set; }
}
Please excuse my limited knowledge in .net. I am totally new to it. :) Thanks
---- Edit I used dispose and using as suggested but I am still facing the error. I am seeing error exception and debugger highlight this code of line in importIO.
new Thread(new ThreadStart(PollQueue)).Start();
I also observe that stock.vshost.exe *32 also keep increasing memory and throw out of memory exception at any time after 70MB or something. I am including the importIO class code
class ImportIO 
    {
        private String host { get; set; }
        private int port { get; set; }
        private Guid userGuid;
        private String apiKey;
        private static String messagingChannel = "/messaging";
        private String url;
        private int msgId = 0;
        private String clientId;
        private Boolean isConnected;
        CookieContainer cookieContainer = new CookieContainer();
        Dictionary<Guid, Query> queries = new Dictionary<Guid, Query>();
        private BlockingCollection<Dictionary<String, Object>> messageQueue = new BlockingCollection<Dictionary<string, object>>();
        public ImportIO(String host = "http://query.import.io", Guid userGuid = default(Guid), String apiKey = null)
        {
            this.userGuid = userGuid;
            this.apiKey = apiKey;
            this.url = host + "/query/comet/";
            clientId = null;
        }
        public void Login(String username, String password, String host = "http://api.import.io")
        {
            Console.WriteLine("Logging in");
            String loginParams = "username=" + HttpUtility.UrlEncode(username) + "&password=" + HttpUtility.UrlEncode(password);
            String searchUrl = host + "/auth/login";
            HttpWebRequest loginRequest = (HttpWebRequest)WebRequest.Create(searchUrl);
            loginRequest.Method = "POST";
            loginRequest.ContentType = "application/x-www-form-urlencoded";
            loginRequest.ContentLength = loginParams.Length;
            loginRequest.CookieContainer = cookieContainer;
            using (Stream dataStream = loginRequest.GetRequestStream())
            {
                dataStream.Write(System.Text.UTF8Encoding.UTF8.GetBytes(loginParams), 0, loginParams.Length);
                HttpWebResponse loginResponse = (HttpWebResponse)loginRequest.GetResponse();
                if (loginResponse.StatusCode != HttpStatusCode.OK)
                {
                    throw new Exception("Could not log in, code:" + loginResponse.StatusCode);
                }
                else
                {
                    foreach (Cookie cookie in loginResponse.Cookies)
                    {
                        if (cookie.Name.Equals("AUTH"))
                        {
                            // Login was successful
                            Console.WriteLine("Login Successful");
                        }
                    }
                }
            }
        }
        public List<Dictionary<String, Object>> Request(String channel, Dictionary<String, Object> data = null, String path = "", Boolean doThrow = true)
        {
            Dictionary<String, Object> dataPacket = new Dictionary<String, Object>();
            dataPacket.Add("channel", channel);
            dataPacket.Add("connectionType", "long-polling");
            dataPacket.Add("id", (msgId++).ToString());
            if (this.clientId != null)
                dataPacket.Add("clientId", this.clientId);
            if (data != null)
            {
                foreach (KeyValuePair<String, Object> entry in data)
                {
                    dataPacket.Add(entry.Key, entry.Value);
                }
            }
            String url = this.url + path;
            if (apiKey != null)
            {
                url += "?_user=" + HttpUtility.UrlEncode(userGuid.ToString()) + "&_apikey=" + HttpUtility.UrlEncode(apiKey);
            }
            HttpWebRequest request = (HttpWebRequest)WebRequest.Create(url);
            request.AutomaticDecompression = DecompressionMethods.GZip;
            request.Method = "POST";
            request.ContentType = "application/json;charset=UTF-8";
            request.Headers.Add(HttpRequestHeader.AcceptEncoding, "gzip");
            String dataJson = JsonConvert.SerializeObject(new List<Object>() { dataPacket });
            request.ContentLength = dataJson.Length;
            request.CookieContainer = cookieContainer;
            using (Stream dataStream = request.GetRequestStream())
            {
                dataStream.Write(System.Text.UTF8Encoding.UTF8.GetBytes(dataJson), 0, dataJson.Length);
                try
                {
                    HttpWebResponse response = (HttpWebResponse)request.GetResponse();
                    using (StreamReader responseStream = new StreamReader(response.GetResponseStream()))
                    {
                        String responseJson = responseStream.ReadToEnd();
                        List<Dictionary<String, Object>> responseList = JsonConvert.DeserializeObject<List<Dictionary<String, Object>>>(responseJson);
                        foreach (Dictionary<String, Object> responseDict in responseList)
                        {
                            if (responseDict.ContainsKey("successful") && (bool)responseDict["successful"] != true)
                            {
                                if (doThrow)
                                    throw new Exception("Unsucessful request");
                            }
                            if (!responseDict["channel"].Equals(messagingChannel)) continue;
                            if (responseDict.ContainsKey("data"))
                            {
                                messageQueue.Add(((Newtonsoft.Json.Linq.JObject)responseDict["data"]).ToObject<Dictionary<String, Object>>());
                            }
                        }
                        return responseList;
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error occurred {0}", e.Message);
                    return new List<Dictionary<String, Object>>();
                }
            }
        }
        public void Handshake()
        {
            Dictionary<String, Object> handshakeData = new Dictionary<String, Object>();
            handshakeData.Add("version", "1.0");
            handshakeData.Add("minimumVersion", "0.9");
            handshakeData.Add("supportedConnectionTypes", new List<String> { "long-polling" });
            handshakeData.Add("advice", new Dictionary<String, int>() { { "timeout", 60000 }, { "interval", 0 } });
            List<Dictionary<String, Object>> responseList = Request("/meta/handshake", handshakeData, "handshake");
            clientId = (String)responseList[0]["clientId"];
        }
        public void Connect()
        {
            if (isConnected)
            {
                return;
            }
            Handshake();
            Dictionary<String, Object> subscribeData = new Dictionary<string, object>();
            subscribeData.Add("subscription", messagingChannel);
            Request("/meta/subscribe", subscribeData);
            isConnected = true;
            new Thread(new ThreadStart(Poll)).Start();
            new Thread(new ThreadStart(PollQueue)).Start();
        }
        public void Disconnect()
        {
            Request("/meta/disconnect", null, "", true);
            isConnected = false;
        }
        private void Poll()
        {
            while (isConnected)
            {
                Request("/meta/connect", null, "connect", false);
            }
        }
        private void PollQueue()
        {
            while (isConnected)
            {
                ProcessMessage(messageQueue.Take());
            }
        }
        private void ProcessMessage(Dictionary<String, Object> data)
        {
            Guid requestId = Guid.Parse((String)data["requestId"]);
            Query query = queries[requestId];
            query.OnMessage(data);
            if (query.isFinished)
            {
                queries.Remove(requestId);
            }
        }
        public void DoQuery(Dictionary<String, Object> query, QueryHandler queryHandler)
        {
            Guid requestId = Guid.NewGuid();
            queries.Add(requestId, new Query(query, queryHandler));
            query.Add("requestId", requestId);
            Request("/service/query", new Dictionary<String, Object>() { { "data", query } });
        }
    }
 
     
     
     
    