I am trying to extract images from a csv file by doing the following:
- Parsing/streaming in a large csv file using csv-parse and the fs createReadStream method
- Grabbing each line for processing using stream-transform
- Extracting the image and other row data for processing using the async waterfall method
- Downloading and writing the image to the server using request and the fs createWriteStream method
For some reason, after the data gets piped into createWriteStream, there are cases in which an async callback never gets called. I have run this same code using only request, without piping to createWriteStream, and it works. I've also run createWriteStream with a drain event, and then it somehow works. Can anyone explain this to me?
In the code below, request is trying to pipe 14,970 images, but the createWriteStream close or finish events only fire 14,895 times, with error firing 0 times. Could this be a draining issue? Could highWaterMark be exceeded, causing a write failure that goes undetected?
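To make the counting concern above concrete, here is a minimal sketch (not taken from the code in this question) of one way to confirm that every pipe reports an outcome. It assumes a Node version that provides stream.pipeline, and it uses in-memory streams as stand-ins for request and fs.createWriteStream. The names makeSink, makeGoodSource, makeFailingSource, trackPipe and record are hypothetical.

```javascript
const { pipeline, Readable, Writable } = require('stream');

// Hypothetical stand-in for fs.createWriteStream: accepts and discards chunks.
function makeSink() {
  return new Writable({
    write(chunk, encoding, callback) {
      callback();
    }
  });
}

// Hypothetical stand-in for a download that succeeds.
function makeGoodSource() {
  return new Readable({
    read() {
      this.push(Buffer.from('image bytes'));
      this.push(null); // signal end of data
    }
  });
}

// Hypothetical stand-in for a download that fails before sending any data.
function makeFailingSource() {
  return new Readable({
    read() {
      this.destroy(new Error('simulated network failure'));
    }
  });
}

// Pipe source into sink; pipeline calls done(err) once, on success or failure.
function trackPipe(source, sink, done) {
  pipeline(source, sink, done);
}

const results = { finished: 0, failed: 0 };

function record(err) {
  if (err) results.failed++;
  else results.finished++;
}

trackPipe(makeGoodSource(), makeSink(), record);
trackPipe(makeFailingSource(), makeSink(), record);
```

With plain pipe(), an error on the source does not end the destination, so the destination's finish event never fires for that item. Here both outcomes arrive through the same callback, so finished plus failed should equal the number of pipes that were started.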
Here is the code that reads the csv file line by line:
var first = true;
var parser = parse();
var transformer = transform( (line, complete) => {
        // Skip the header row; hand every other line to extractData.
        if(!first)
            extractData(line,complete);
        else {
            first = false;
            complete(null);
        }
    }, 
    () => {
        console.log('Done: parseFile');
    });
fs.createReadStream(this.upload.location).pipe(parser).pipe(transformer);
Here is the extractData function, which doesn't always call the required async callback:
extractData(line,complete){
    var now = new Date();
    var image = { 
        createdAt: now,
        updatedAt: now
    };
    async.waterfall([
        next => { // Data Extraction
            async.forEachOf(line, (data, i, complete) => {
                if(i === 2) image.src = data; 
                if(i === 3) image.importSrc = data;
                complete(null);
            }, err => {
                if(err) throw err;
                next(null);
            });
        },
        next => { // Download Image
            var file = fs.createWriteStream('public/'+image.src);
            var sendReq = request.get(image.importSrc);
            sendReq.on('response', response => {
                if (response.statusCode !== 200) {
                    this.upload.report.image.errors++;
                    return next(null);
                } 
            });
            sendReq.on('error', err => {
                this.upload.report.image.errors++;
                next(null);
            });
            sendReq.pipe(file);
            file.on('finish', () => {
                this.upload.report.image.inserts++;
                file.close(next); // Close file and callback
            });
            file.on('error', err => {
                this.upload.report.image.errors++;
                next(null);
            });
        }
    ], err => {
        if(err) throw err;
        complete(null);
    });
}
As suggested by @mscdex, I've also tried listening for the close event instead of finish.
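In extractData above, next can be reached from four different handlers (response, the request error, finish, and the file error), and on the non-200 path both the response handler and the finish handler appear able to call it. A minimal sketch of a guard that lets only the first call through and counts the rest is shown below. The helper onceWithCount is hypothetical and is not part of async or request.

```javascript
// Hypothetical helper: wraps a callback so only the first call goes
// through; every call is counted so extra calls can be detected.
function onceWithCount(callback) {
  let calls = 0;
  const wrapped = (...args) => {
    calls++;
    if (calls === 1) callback(...args);
  };
  wrapped.callCount = () => calls;
  return wrapped;
}

// Simulate two handlers both trying to advance the same waterfall step.
let advanced = 0;
const next = onceWithCount(() => { advanced++; });
next(null); // e.g. from the 'response' handler on a non-200 status
next(null); // e.g. from the 'finish' handler for the same download
```

Checking next.callCount() after each row would show whether a given download called back zero times, once, or more than once.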
 
    