stream-api.js 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. 'use strict';
  2. var si = require('set-immediate-shim');
  3. var stream = require('readable-stream');
  4. var util = require('util');
  5. var Readable = stream.Readable;
  6. module.exports = ReaddirpReadable;
  7. util.inherits(ReaddirpReadable, Readable);
  8. function ReaddirpReadable (opts) {
  9. if (!(this instanceof ReaddirpReadable)) return new ReaddirpReadable(opts);
  10. opts = opts || {};
  11. opts.objectMode = true;
  12. Readable.call(this, opts);
  13. // backpressure not implemented at this point
  14. this.highWaterMark = Infinity;
  15. this._destroyed = false;
  16. this._paused = false;
  17. this._warnings = [];
  18. this._errors = [];
  19. this._pauseResumeErrors();
  20. }
  21. var proto = ReaddirpReadable.prototype;
  22. proto._pauseResumeErrors = function () {
  23. var self = this;
  24. self.on('pause', function () { self._paused = true });
  25. self.on('resume', function () {
  26. if (self._destroyed) return;
  27. self._paused = false;
  28. self._warnings.forEach(function (err) { self.emit('warn', err) });
  29. self._warnings.length = 0;
  30. self._errors.forEach(function (err) { self.emit('error', err) });
  31. self._errors.length = 0;
  32. })
  33. }
  34. // called for each entry
  35. proto._processEntry = function (entry) {
  36. if (this._destroyed) return;
  37. this.push(entry);
  38. }
  39. proto._read = function () { }
  40. proto.destroy = function () {
  41. // when stream is destroyed it will emit nothing further, not even errors or warnings
  42. this.push(null);
  43. this.readable = false;
  44. this._destroyed = true;
  45. this.emit('close');
  46. }
  47. proto._done = function () {
  48. this.push(null);
  49. }
  50. // we emit errors and warnings async since we may handle errors like invalid args
  51. // within the initial event loop before any event listeners subscribed
  52. proto._handleError = function (err) {
  53. var self = this;
  54. si(function () {
  55. if (self._paused) return self._warnings.push(err);
  56. if (!self._destroyed) self.emit('warn', err);
  57. });
  58. }
  59. proto._handleFatalError = function (err) {
  60. var self = this;
  61. si(function () {
  62. if (self._paused) return self._errors.push(err);
  63. if (!self._destroyed) self.emit('error', err);
  64. });
  65. }
  66. function createStreamAPI () {
  67. var stream = new ReaddirpReadable();
  68. return {
  69. stream : stream
  70. , processEntry : stream._processEntry.bind(stream)
  71. , done : stream._done.bind(stream)
  72. , handleError : stream._handleError.bind(stream)
  73. , handleFatalError : stream._handleFatalError.bind(stream)
  74. };
  75. }
  76. module.exports = createStreamAPI;