After cogitating on your problem a little more I believe I found a more general solution. Let's start with the EventStream constructor (which is more general than your Stream constructor):
function EventStream() {
this.listeners = [];
}
Then we create a dispatch method to add events to the stream:
EventStream.prototype.dispatch = function (event) {
return this.listeners.map(function (listener) {
return listener(event);
});
};
Next we'll create a map method which is again more general than your foreach method:
EventStream.prototype.map = function (f) {
var stream = new EventStream;
this.listeners.push(function (x) {
return stream.dispatch(f(x));
});
return stream;
};
Now when you map a function over an event stream you get an entirely new event stream. For example if your stream is [0,1,3,5..] and you map (+2) over it then the new stream would be [2,3,5,7..].
We'll also create a few more beneficial utility methods like filter, scan and merge as follows:
EventStream.prototype.filter = function (f) {
var stream = new EventStream;
this.listeners.push(function (x) {
if (f(x)) return stream.dispatch(x);
});
return stream;
};
The filter method filter out certain events in an event stream to create an entirely new event stream. For example given [2,3,5,7..] and the function odd the filtered event stream would be [3,5,7..].
EventStream.prototype.scan = function (a, f) {
var stream = new EventStream;
setTimeout(function () {
stream.dispatch(a);
});
this.listeners.push(function (x) {
return stream.dispatch(a = f(a, x));
});
return stream;
};
The scan method is used to cumulatively create a new event stream. For example given the stream [3,5,7..], the initial value 0 and the scanning function (+) the new event stream would be [0,3,8,15..].
EventStream.prototype.merge = function (that) {
var stream = new EventStream;
this.listeners.push(function (x) {
return stream.dispatch(new Left(x));
});
this.listeners.push(function (y) {
return stream.dispatch(new Right(x));
});
return stream;
};
function Left(x) {
this.left = x;
}
function Right(x) {
this.right = x;
}
The merge method combines two separate event streams into one. To differentiate which stream generated each event we tag all the events as either left or right.
Alright, now onto bigger problems. Let's create a zip method. The really cool thing is that we can create zip using the map, filter, scan and merge methods as follows:
EventStream.prototype.zip = function (that) {
return this.merge(that).scan([[], [], null], function (acc, event) {
var left = acc[0], right = acc[1];
if (event instanceof Left) {
var value = event.left;
return right.length ?
[left, right.slice(1), new Just([value, right[0]])] :
[left.concat(value), right, null];
} else {
var value = event.right;
return left.length ?
[left.slice(1), right, new Just([left[0], value])] :
[tuple(left, right.concat(value), null];
}
})
.filter(function (a) {
return a[2] instanceof Just;
})
.map(function (a) {
return a[2].just;
});
};
function Just(x) {
this.just = x;
}
Now you can use it as follows:
stream1.zip(stream2).map(function (v) {
console.log(v);
});
You can define stream1 and stream2 as follows:
var stream1 = getRandomStream();
var stream2 = getRandomStream();
function getRandomStream() {
var stream = new EventStream;
setInterval(function () {
stream.dispatch(Math.random());
}, ((Math.random() * 100) + 500) | 0);
return stream;
}
That's all there is to it. No need for promises.