// index.js (1.2 KB)
  /**
   * Module dependencies.
   */
  var duplexer = require('duplexer2');
  var Stream = require('stream');
  /**
   * Shared reference to `Array.prototype.slice`, used to copy an
   * `arguments` object into a real array.
   */
  var slice = [].slice;

  /**
   * Options passed as the first argument to `duplexer` whenever the first
   * and last streams are wrapped into a single duplex stream.
   *
   * NOTE(review): `bubbleErrors: false` presumably turns off duplexer2's own
   * error forwarding so errors are not emitted twice -- confirm against the
   * duplexer2 documentation.
   */
  var opts = {
    bubbleErrors: false
  };
  /**
   * Expose `pipe` as the module's only export.
   */
  module.exports = pipe;
  /**
   * Pipe the given streams into one another, in order, and return a single
   * stream standing in for the whole chain.
   *
   * The returned stream is chosen as follows:
   *
   *  - first stream writable and last stream readable: the result of
   *    `duplexer(opts, first, last)`
   *  - only the first stream writable: the first stream
   *  - only the last stream readable: the last stream
   *  - otherwise (including a call with no arguments): a bare `Stream`
   *
   * `error` events of every stream other than the returned one are
   * re-emitted on the returned stream.
   *
   * If the last argument is a function it is taken as a callback. It is
   * invoked at most once: with the error when the returned stream emits
   * `error`, or with no argument when the last stream emits `finish`.
   *
   * When called with exactly one argument, that argument is returned
   * untouched and nothing is wired up.
   *
   * @param {Stream,...,[Function]}
   * @return {Stream}
   * @api public
   */
  function pipe(){
    // Nothing to connect: return an inert stream rather than throwing on
    // `undefined.writable` further down.
    if (arguments.length === 0) return new Stream();
    if (arguments.length === 1) return arguments[0];

    var streams = slice.call(arguments);
    var cb;
    if (typeof streams[streams.length - 1] === 'function') {
      cb = streams.pop();
    }

    var first = streams[0];
    var last = streams[streams.length - 1];
    var ret;

    if (first.writable && last.readable) ret = duplexer(opts, first, last);
    else if (first.writable) ret = first;
    else if (last.readable) ret = last;
    else ret = new Stream();

    streams.forEach(function(stream, i){
      var next = streams[i + 1];
      if (next) stream.pipe(next);
      // `ret` must not forward to itself; every other stream reports its
      // errors through `ret`.
      if (stream !== ret) stream.on('error', ret.emit.bind(ret, 'error'));
    });

    if (cb) {
      var ended = false;
      // Guarded so the callback fires once even if both `error` and
      // `finish` are emitted, or `error` is emitted repeatedly.
      var end = function(err){
        if (ended) return;
        ended = true;
        cb(err);
      };
      ret.on('error', end);
      last.on('finish', end);
    }

    return ret;
  }