index.js 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. var Readable = require('stream').Readable;
  2. var util = require('util');
  3. function addStream(streams, stream)
  4. {
  5. if(!stream.readable) throw new Error('All input streams must be readable');
  6. if(this._readableState.ended) throw new Error('Adding streams after ended');
  7. var self = this;
  8. stream._buffer = [];
  9. stream.on('data', function(chunk)
  10. {
  11. if(this === streams[0])
  12. self.push(chunk);
  13. else
  14. this._buffer.push(chunk);
  15. });
  16. stream.on('end', function()
  17. {
  18. for(var stream = streams[0];
  19. stream && stream._readableState.ended;
  20. stream = streams[0])
  21. {
  22. while(stream._buffer.length)
  23. self.push(stream._buffer.shift());
  24. streams.shift();
  25. }
  26. if(!streams.length) self.push(null);
  27. });
  28. stream.on('error', this.emit.bind(this, 'error'));
  29. streams.push(stream);
  30. }
  31. function OrderedStreams(streams, options) {
  32. if (!(this instanceof(OrderedStreams))) {
  33. return new OrderedStreams(streams, options);
  34. }
  35. streams = streams || [];
  36. options = options || {};
  37. options.objectMode = true;
  38. Readable.call(this, options);
  39. if(!Array.isArray(streams)) streams = [streams];
  40. if(!streams.length) return this.push(null); // no streams, close
  41. var addStream_bind = addStream.bind(this, []);
  42. this.concat = function()
  43. {
  44. Array.prototype.forEach.call(arguments, function(item)
  45. {
  46. if(Array.isArray(item))
  47. item.forEach(addStream_bind);
  48. else
  49. addStream_bind(item);
  50. });
  51. };
  52. this.concat(streams);
  53. }
  54. util.inherits(OrderedStreams, Readable);
  55. OrderedStreams.prototype._read = function () {};
  56. module.exports = OrderedStreams;