123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- //filter will reemit the data if cb(err,pass) pass is truthy
- // reduce is more tricky
- // maybe we want to group the reductions or emit progress updates occasionally
- // the most basic reduce just emits one 'data' event after it has received 'end'
- var Stream = require('stream').Stream
- //create an event stream and apply function to each .write
- //emitting each response as data
- //unless it's an empty callback
- module.exports = function (mapper) {
- var stream = new Stream()
- , inputs = 0
- , outputs = 0
- , ended = false
- , paused = false
- , destroyed = false
- , lastWritten = 0
- , inNext = false
- // Items that are not ready to be written yet (because they would come out of
- // order) get stuck in a queue for later.
- var writeQueue = {}
- stream.writable = true
- stream.readable = true
- function queueData (data, number) {
- var nextToWrite = lastWritten + 1
- if (number === nextToWrite) {
- // If it's next, and its not undefined write it
- if (data !== undefined) {
- stream.emit.apply(stream, ['data', data])
- }
- lastWritten ++
- nextToWrite ++
- } else {
- // Otherwise queue it for later.
- writeQueue[number] = data
- }
- // If the next value is in the queue, write it
- if (writeQueue.hasOwnProperty(nextToWrite)) {
- var dataToWrite = writeQueue[nextToWrite]
- delete writeQueue[nextToWrite]
- return queueData(dataToWrite, nextToWrite)
- }
- outputs ++
- if(inputs === outputs) {
- if(paused) paused = false, stream.emit('drain') //written all the incoming events
- if(ended) end()
- }
- }
- function next (err, data, number) {
- if(destroyed) return
- inNext = true
- if(err) {
- return inNext = false, stream.emit.apply(stream, ['error', err])
- }
- queueData(data, number)
- inNext = false
- }
- // Wrap the mapper function by calling its callback with the order number of
- // the item in the stream.
- function wrappedMapper (input, number, callback) {
- return mapper.call(null, input, function(err, data){
- callback(err, data, number)
- })
- }
- stream.write = function (data) {
- if(ended) throw new Error('map stream is not writable')
- inNext = false
- inputs ++
- try {
- //catch sync errors and handle them like async errors
- var written = wrappedMapper(data, inputs, next)
- paused = (written === false)
- return !paused
- } catch (err) {
- //if the callback has been called syncronously, and the error
- //has occured in an listener, throw it again.
- if(inNext)
- throw err
- next(err)
- return !paused
- }
- }
- function end (data) {
- //if end was called with args, write it,
- ended = true //write will emit 'end' if ended is true
- stream.writable = false
- if(data !== undefined) {
- return queueData(data, inputs)
- } else if (inputs == outputs) { //wait for processing
- stream.readable = false, stream.emit('end'), stream.destroy()
- }
- }
- stream.end = function (data) {
- if(ended) return
- end()
- }
- stream.destroy = function () {
- ended = destroyed = true
- stream.writable = stream.readable = paused = false
- process.nextTick(function () {
- stream.emit('close')
- })
- }
- stream.pause = function () {
- paused = true
- }
- stream.resume = function () {
- paused = false
- }
- return stream
- }
|