I need to build a function that processes large CSV files, for use in a bluebird.map() call. Given the potential size of these files, I'd like to use streaming.
This function should accept a stream (a CSV file) and a function (that processes the chunks from the stream), and return a promise that resolves when the stream is read to the end, or rejects if it errors.
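To illustrate the intended usage (csvFilePaths and saveRecord are placeholders, not real code from my project):

// csvFilePaths and saveRecord are hypothetical placeholders
var fs = require('fs');
var promise = require('bluebird');

promise.map(csvFilePaths, function(path) {
  return api.parsers.processCsvStream(fs.createReadStream(path), saveRecord);
}, {concurrency: 2});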
So, I start with:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
  var parser = csv.parse({trim: true}); // csv.parse() takes options; the stream is piped in below
  passedStream.pipe(parser);
  // use readable or data event?
  parser.on('readable', function() {
    // call processor, which may be async
    // how do I throttle the number of promises generated?
  });
  var db = pgp(api.config.mailroom.fileMakerDbConfig);
  return new promise(function(resolve, reject) { // bluebird, as imported above
    parser.on('end', resolve);
    parser.on('error', reject);
  });
};
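For reference, the plain readable pattern from the Node streams docs looks like this; it works fine when the processor is synchronous, which is exactly what my case is not:

parser.on('readable', function() {
  var record;
  // read() returns null once the internal buffer is drained
  while ((record = parser.read()) !== null) {
    processor(record); // fine for a synchronous processor; mine returns promises
  }
});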
Now, I have two inter-related issues:
- I need to throttle the actual amount of data being processed, so as to not create memory pressure.
- The function passed as the processor param is often going to be async, such as saving the contents of the file to the db via a promise-based library (right now: pg-promise). As such, it will create a promise in memory and move on, repeatedly.
The pg-promise library has functions to manage this, like page(), but I'm not able to wrap my head around how to mix stream event handlers with these promise methods. Right now, I return a promise from the readable handler after each read(), which means I create a huge number of promised database operations and eventually fault out because I hit a process memory limit.
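For what it's worth, my current understanding of page() is that the source function is called with (index, data, delay) and returns an array of values/promises for the next page, or undefined to stop, while the dest function receives each resolved page. A minimal sketch of that shape, detached from any stream (the query and connection config here are made up):

var pgp = require('pg-promise')({promiseLib: require('bluebird')});
var db = pgp(someConnectionConfig); // someConnectionConfig is a placeholder

db.task(function(t) {
  return t.page(
    function(index, data, delay) {
      // source: return the next batch of promises, or undefined to finish
      if (index < 3)
        return [1, 2, 3].map(function(n) {
          return t.one('SELECT $1::int AS n', n);
        });
    },
    function(index, data, delay) {
      // dest: called with each resolved page
      console.log('page', index, data);
    }
  );
});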
Does anyone have a working example of this that I can use as a jumping point?
UPDATE: There's probably more than one way to skin the cat, but this works:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
  // some checks trimmed out for example
  var db = pgp(api.config.mailroom.fileMakerDbConfig);
  var parser = csv.parse({trim: true}); // options only; the stream is piped in below
  passedStream.pipe(parser);
  var readDataFromStream = function(index, data, delay) {
    // source function for page(): read up to fileParserConcurrency
    // records from the parser, then pause it until the page is processed
    var records = [];
    var record;
    do {
      record = parser.read();
      if (record != null)
        records.push(record);
    } while (record != null && records.length < api.config.mailroom.fileParserConcurrency);
    parser.pause();
    // returning undefined here ends the page() sequence
    if (records.length)
      return records;
  };
  var processData = function(index, data, delay) {
    // dest function for page(): process the batch, then resume the parser
    console.log('processData(' + index + ') > data: ', data);
    parser.resume();
  };
  parser.on('readable', function() {
    db.task(function(tsk) {
      // return the page() promise so the task settles when paging completes
      return tsk.page(readDataFromStream, processData);
    });
  });
  return new promise(function(resolve, reject) { // bluebird, as imported above
    parser.on('end', resolve);
    parser.on('error', reject);
  });
};
Does anyone see a potential problem with this approach?