123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- /**
- * Module dependencies.
- */
- var duplexer = require('duplexer2');
- var Stream = require('stream');
/**
 * Shared reference to `Array.prototype.slice`, used to turn an
 * `arguments` object into a real array.
 */

var slice = Array.prototype.slice;

/**
 * Options passed to `duplexer2` whenever a duplex wrapper is built.
 *
 * `bubbleErrors` is switched off here; `pipe` attaches its own `error`
 * forwarding to every stream in the chain.
 */

var opts = {};
opts.bubbleErrors = false;
/**
 * Public API: the module's only export is the `pipe` function.
 */

module.exports = pipe;
/**
 * Pipe the given streams together, in order, and return one stream that
 * stands for the whole chain.
 *
 * The returned stream is chosen as follows:
 *
 *   - no arguments: a new plain `Stream`
 *   - exactly one argument: that argument, untouched
 *   - first is writable and last is readable: a `duplexer2` wrapper
 *     around the first and the last stream
 *   - only first is writable: the first stream
 *   - only last is readable: the last stream
 *   - otherwise: a new plain `Stream`
 *
 * An `error` emitted by any stream in the chain is re-emitted on the
 * returned stream. When a callback is supplied as the last argument it is
 * invoked at most once: with the error on the first `error` seen on the
 * returned stream, or without arguments once the last stream emits
 * `finish`.
 *
 * @param {...(Stream|Function)} streams streams to connect; the last
 *   argument may instead be a callback `fn(err)`
 * @return {Stream}
 * @api public
 */

function pipe(){
  // A lone argument is handed back as-is; there is nothing to connect.
  if (arguments.length === 1) return arguments[0];
  // No arguments at all: return an inert stream instead of crashing on
  // `undefined.writable` further down.
  if (arguments.length === 0) return new Stream();

  var streams = slice.call(arguments);
  var cb;
  if ('function' === typeof streams[streams.length - 1]) {
    cb = streams.pop();
  }

  var first = streams[0];
  var last = streams[streams.length - 1];
  var ret;

  if (first.writable && last.readable) ret = duplexer(opts, first, last);
  else if (first.writable) ret = first;
  else if (last.readable) ret = last;
  else ret = new Stream();

  streams.forEach(function(stream, i){
    var next = streams[i + 1];
    if (next) stream.pipe(next);
    // Skip `ret` itself, otherwise its errors would be re-emitted on
    // itself in an endless loop.
    if (stream !== ret) stream.on('error', ret.emit.bind(ret, 'error'));
  });

  if (cb) {
    var ended = false;
    // Function expression rather than a declaration: declarations nested
    // in an `if` block are not portable across ES5 engines / strict mode.
    var end = function(err){
      if (ended) return;
      ended = true;
      cb(err);
    };
    ret.on('error', end);
    last.on('finish', end);
  }

  return ret;
}
|