index.js 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. var PassThrough = require('readable-stream').PassThrough;
  2. var util = require('util');
  3. // Inherit of PassThrough stream
  4. util.inherits(BufferStream, PassThrough);
  5. // Constructor
  6. function BufferStream(options, cb) {
  7. // Ensure new were used
  8. if (!(this instanceof BufferStream)) {
  9. return new BufferStream(options, cb);
  10. }
  11. // Cast args
  12. if(options instanceof Function) {
  13. cb = options;
  14. options = {};
  15. }
  16. options = options || {};
  17. if(!(cb instanceof Function)) {
  18. throw new Error('The given callback must be a function.');
  19. }
  20. this.__objectMode = options.objectMode;
  21. // Parent constructor
  22. PassThrough.call(this, options);
  23. // Keep a reference to the callback
  24. this._cb = cb;
  25. // Internal buffer
  26. this._buf = options.objectMode ? [] : Buffer('');
  27. }
  28. BufferStream.prototype._transform = function(chunk, encoding, done) {
  29. if(this.__objectMode) {
  30. this._buf.push(chunk);
  31. } else {
  32. this._buf = Buffer.concat([this._buf, chunk], this._buf.length + chunk.length);
  33. }
  34. done();
  35. };
  36. BufferStream.prototype._flush = function(done) {
  37. var _this = this;
  38. this._cb(null, this._buf, function(err, buf) {
  39. if(buf && buf.length) {
  40. if(_this.__objectMode) {
  41. buf.forEach(function(chunk) {
  42. _this.push(chunk);
  43. });
  44. } else {
  45. _this.push(buf);
  46. }
  47. }
  48. done();
  49. });
  50. };
  51. module.exports = BufferStream;