combined_stream.js 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. var util = require('util');
  2. var Stream = require('stream').Stream;
  3. var DelayedStream = require('delayed-stream');
  4. module.exports = CombinedStream;
  5. function CombinedStream() {
  6. this.writable = false;
  7. this.readable = true;
  8. this.dataSize = 0;
  9. this.maxDataSize = 2 * 1024 * 1024;
  10. this.pauseStreams = true;
  11. this._released = false;
  12. this._streams = [];
  13. this._currentStream = null;
  14. }
  15. util.inherits(CombinedStream, Stream);
  16. CombinedStream.create = function(options) {
  17. var combinedStream = new this();
  18. options = options || {};
  19. for (var option in options) {
  20. combinedStream[option] = options[option];
  21. }
  22. return combinedStream;
  23. };
  24. CombinedStream.isStreamLike = function(stream) {
  25. return (typeof stream !== 'function')
  26. && (typeof stream !== 'string')
  27. && (typeof stream !== 'boolean')
  28. && (typeof stream !== 'number')
  29. && (!Buffer.isBuffer(stream));
  30. };
  31. CombinedStream.prototype.append = function(stream) {
  32. var isStreamLike = CombinedStream.isStreamLike(stream);
  33. if (isStreamLike) {
  34. if (!(stream instanceof DelayedStream)) {
  35. var newStream = DelayedStream.create(stream, {
  36. maxDataSize: Infinity,
  37. pauseStream: this.pauseStreams,
  38. });
  39. stream.on('data', this._checkDataSize.bind(this));
  40. stream = newStream;
  41. }
  42. this._handleErrors(stream);
  43. if (this.pauseStreams) {
  44. stream.pause();
  45. }
  46. }
  47. this._streams.push(stream);
  48. return this;
  49. };
  50. CombinedStream.prototype.pipe = function(dest, options) {
  51. Stream.prototype.pipe.call(this, dest, options);
  52. this.resume();
  53. return dest;
  54. };
  55. CombinedStream.prototype._getNext = function() {
  56. this._currentStream = null;
  57. var stream = this._streams.shift();
  58. if (typeof stream == 'undefined') {
  59. this.end();
  60. return;
  61. }
  62. if (typeof stream !== 'function') {
  63. this._pipeNext(stream);
  64. return;
  65. }
  66. var getStream = stream;
  67. getStream(function(stream) {
  68. var isStreamLike = CombinedStream.isStreamLike(stream);
  69. if (isStreamLike) {
  70. stream.on('data', this._checkDataSize.bind(this));
  71. this._handleErrors(stream);
  72. }
  73. this._pipeNext(stream);
  74. }.bind(this));
  75. };
  76. CombinedStream.prototype._pipeNext = function(stream) {
  77. this._currentStream = stream;
  78. var isStreamLike = CombinedStream.isStreamLike(stream);
  79. if (isStreamLike) {
  80. stream.on('end', this._getNext.bind(this));
  81. stream.pipe(this, {end: false});
  82. return;
  83. }
  84. var value = stream;
  85. this.write(value);
  86. this._getNext();
  87. };
  88. CombinedStream.prototype._handleErrors = function(stream) {
  89. var self = this;
  90. stream.on('error', function(err) {
  91. self._emitError(err);
  92. });
  93. };
  94. CombinedStream.prototype.write = function(data) {
  95. this.emit('data', data);
  96. };
  97. CombinedStream.prototype.pause = function() {
  98. if (!this.pauseStreams) {
  99. return;
  100. }
  101. if(this.pauseStreams && this._currentStream && typeof(this._currentStream.pause) == 'function') this._currentStream.pause();
  102. this.emit('pause');
  103. };
  104. CombinedStream.prototype.resume = function() {
  105. if (!this._released) {
  106. this._released = true;
  107. this.writable = true;
  108. this._getNext();
  109. }
  110. if(this.pauseStreams && this._currentStream && typeof(this._currentStream.resume) == 'function') this._currentStream.resume();
  111. this.emit('resume');
  112. };
  113. CombinedStream.prototype.end = function() {
  114. this._reset();
  115. this.emit('end');
  116. };
  117. CombinedStream.prototype.destroy = function() {
  118. this._reset();
  119. this.emit('close');
  120. };
  121. CombinedStream.prototype._reset = function() {
  122. this.writable = false;
  123. this._streams = [];
  124. this._currentStream = null;
  125. };
  126. CombinedStream.prototype._checkDataSize = function() {
  127. this._updateDataSize();
  128. if (this.dataSize <= this.maxDataSize) {
  129. return;
  130. }
  131. var message =
  132. 'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.';
  133. this._emitError(new Error(message));
  134. };
  135. CombinedStream.prototype._updateDataSize = function() {
  136. this.dataSize = 0;
  137. var self = this;
  138. this._streams.forEach(function(stream) {
  139. if (!stream.dataSize) {
  140. return;
  141. }
  142. self.dataSize += stream.dataSize;
  143. });
  144. if (this._currentStream && this._currentStream.dataSize) {
  145. this.dataSize += this._currentStream.dataSize;
  146. }
  147. };
  148. CombinedStream.prototype._emitError = function(err) {
  149. this._reset();
  150. this.emit('error', err);
  151. };