index.js 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. //filter will reemit the data if cb(err,pass) pass is truthy
  2. // reduce is more tricky
  3. // maybe we want to group the reductions or emit progress updates occasionally
  4. // the most basic reduce just emits one 'data' event after it has received 'end'
  5. var Stream = require('stream').Stream
  6. //create an event stream and apply function to each .write
  7. //emitting each response as data
  8. //unless it's an empty callback
  9. module.exports = function (mapper) {
  10. var stream = new Stream()
  11. , inputs = 0
  12. , outputs = 0
  13. , ended = false
  14. , paused = false
  15. , destroyed = false
  16. , lastWritten = 0
  17. , inNext = false
  18. // Items that are not ready to be written yet (because they would come out of
  19. // order) get stuck in a queue for later.
  20. var writeQueue = {}
  21. stream.writable = true
  22. stream.readable = true
  23. function queueData (data, number) {
  24. var nextToWrite = lastWritten + 1
  25. if (number === nextToWrite) {
  26. // If it's next, and its not undefined write it
  27. if (data !== undefined) {
  28. stream.emit.apply(stream, ['data', data])
  29. }
  30. lastWritten ++
  31. nextToWrite ++
  32. } else {
  33. // Otherwise queue it for later.
  34. writeQueue[number] = data
  35. }
  36. // If the next value is in the queue, write it
  37. if (writeQueue.hasOwnProperty(nextToWrite)) {
  38. var dataToWrite = writeQueue[nextToWrite]
  39. delete writeQueue[nextToWrite]
  40. return queueData(dataToWrite, nextToWrite)
  41. }
  42. outputs ++
  43. if(inputs === outputs) {
  44. if(paused) paused = false, stream.emit('drain') //written all the incoming events
  45. if(ended) end()
  46. }
  47. }
  48. function next (err, data, number) {
  49. if(destroyed) return
  50. inNext = true
  51. if(err) {
  52. return inNext = false, stream.emit.apply(stream, ['error', err])
  53. }
  54. queueData(data, number)
  55. inNext = false
  56. }
  57. // Wrap the mapper function by calling its callback with the order number of
  58. // the item in the stream.
  59. function wrappedMapper (input, number, callback) {
  60. return mapper.call(null, input, function(err, data){
  61. callback(err, data, number)
  62. })
  63. }
  64. stream.write = function (data) {
  65. if(ended) throw new Error('map stream is not writable')
  66. inNext = false
  67. inputs ++
  68. try {
  69. //catch sync errors and handle them like async errors
  70. var written = wrappedMapper(data, inputs, next)
  71. paused = (written === false)
  72. return !paused
  73. } catch (err) {
  74. //if the callback has been called syncronously, and the error
  75. //has occured in an listener, throw it again.
  76. if(inNext)
  77. throw err
  78. next(err)
  79. return !paused
  80. }
  81. }
  82. function end (data) {
  83. //if end was called with args, write it,
  84. ended = true //write will emit 'end' if ended is true
  85. stream.writable = false
  86. if(data !== undefined) {
  87. return queueData(data, inputs)
  88. } else if (inputs == outputs) { //wait for processing
  89. stream.readable = false, stream.emit('end'), stream.destroy()
  90. }
  91. }
  92. stream.end = function (data) {
  93. if(ended) return
  94. end()
  95. }
  96. stream.destroy = function () {
  97. ended = destroyed = true
  98. stream.writable = stream.readable = paused = false
  99. process.nextTick(function () {
  100. stream.emit('close')
  101. })
  102. }
  103. stream.pause = function () {
  104. paused = true
  105. }
  106. stream.resume = function () {
  107. paused = false
  108. }
  109. return stream
  110. }