simple-map.asynct.js 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. 'use strict';
  2. var map = require('../')
  3. , it = require('it-is')
  4. , u = require('ubelt')
  5. , spec = require('stream-spec')
  6. , from = require('from')
  7. , Stream = require('stream')
  8. , es = require('event-stream')
  9. //REFACTOR THIS TEST TO USE es.readArray and es.writeArray
  /**
   * Write every element of `array` to `stream`, in order, then end the stream.
   *
   * @param {Array} array   items to write, one `stream.write` call per item
   * @param {Object} stream anything exposing `write(item)` and `end()`
   */
  function writeArray(array, stream) {
    // forEach also passes index and array; forward only the item itself
    function writeItem (item) {
      stream.write(item)
    }

    array.forEach(writeItem)
    stream.end()
  }
  /**
   * Collect everything a stream emits and report it through a callback.
   *
   * Every 'data' payload is appended to an array. On 'end' the callback is
   * called as `done(null, array)`; on 'error' it is called as `done(err)`.
   * Only the first of those two outcomes is reported, so `done` runs at most
   * once even when a stream emits 'error' and later 'end'.
   *
   * @param {Object} stream     emitter exposing `on(event, listener)`
   * @param {Function} done     node-style callback `(err, array)`
   */
  function readStream(stream, done) {
    var collected = []
    var settled = false

    stream.on('data', function (chunk) {
      collected.push(chunk)
    })

    stream.on('error', function (err) {
      if (settled) return
      settled = true
      done(err)
    })

    stream.on('end', function () {
      // a stream that already errored must not complete the callback again
      if (settled) return
      settled = true
      done(null, collected)
    })
  }
  26. //call sink on each write,
  27. //and complete when finished.
  /**
   * Build a through stream (via `es.through`) that re-emits every chunk it is
   * given and occasionally pauses itself, resuming after a delay.
   *
   * @param {number|Function} [prob]  a number is used as the chance (compared
   *        against Math.random()) of pausing on a write; a function is called
   *        on each write and pauses when it returns truthy; anything else
   *        falls back to a chance of 0.1.
   * @param {number|Function} [delay] falsy: resume on process.nextTick; a
   *        number: resume after that many milliseconds; a function: called
   *        with a `next` callback that performs the resume.
   * @returns the stream created by `es.through`
   */
  function pauseStream (prob, delay) {
    var shouldPause
    if ('function' === typeof prob) {
      shouldPause = prob
    } else {
      // The fallback used to be the bare number 0.1, which threw
      // "pauseIf is not a function" on the first write. Wrap it in a
      // predicate like the explicit numeric case.
      var chance = 'number' === typeof prob ? prob : 0.1
      shouldPause = function () {
        return Math.random() < chance
      }
    }

    var delayer
    if (!delay) {
      delayer = process.nextTick
    } else if ('number' === typeof delay) {
      delayer = function (next) { setTimeout(next, delay) }
    } else {
      delayer = delay
    }

    return es.through(function (data) {
      // never stack a second pause on top of one that is still pending
      if (!this.paused && shouldPause()) {
        console.log('PAUSE STREAM PAUSING')
        this.pause()
        var self = this
        delayer(function () {
          console.log('PAUSE STREAM RESUMING')
          self.resume()
        })
      }
      console.log("emit ('data', " + data + ')')
      this.emit('data', data)
    })
  }
  // Numbers written to a doubling map come out doubled, in the order written.
  exports['simple map applied to a stream'] = function (test) {
    var input = [1,2,3,7,5,3,1,9,0,2,4,6]
    var expected = input.map(function (n) {
      return n * 2
    })

    // mapper that calls back immediately with twice its input
    var doubler = map(function (data, cb) {
      cb(null, data * 2)
    })

    // NOTE(review): presumably stream-spec's through-stream contract check,
    // validated when the process exits -- confirm against stream-spec docs
    spec(doubler).through().validateOnExit()

    // a map sits in the middle of a pipeline, so it must be both readable and writable
    it(doubler).has({ readable: true, writable: true })

    readStream(doubler, function (err, output) {
      it(output).deepEqual(expected)
      test.done()
    })

    writeArray(input, doubler)
  }
  // Output must follow write order even when later items call back sooner.
  exports['stream comes back in the correct order'] = function (test) {
    var input = [3, 2, 1]

    // each item calls back after 100ms times its value, so the callbacks
    // are scheduled in the reverse of the order the items were written
    var delayer = map(function (data, cb) {
      var wait = 100 * data
      function finish () {
        cb(null, data)
      }
      setTimeout(finish, wait)
    })

    readStream(delayer, function (err, output) {
      it(output).deepEqual(input)
      test.done()
    })

    writeArray(input, delayer)
  }
  // Two doubling maps piped together should multiply every input by four.
  exports['pipe two maps together'] = function (test) {
    var input = [1,2,3,7,5,3,1,9,0,2,4,6]

    // shared mapper: call back immediately with twice the value
    function double (value, cb) {
      cb(null, value * 2)
    }

    var first = map(double)
    var second = map(double)
    first.pipe(second)

    spec(first).through().validateOnExit()
    spec(second).through().validateOnExit()

    readStream(second, function (err, output) {
      var quadrupled = input.map(function (n) {
        return n * 4
      })
      it(output).deepEqual(quadrupled)
      test.done()
    })

    writeArray(input, first)
  }
  111. //next:
  112. //
  113. // test pause, resume and drain.
  114. //
  115. // then make a pipe joiner:
  116. //
  117. // plumber (evStr1, evStr2, evStr3, evStr4, evStr5)
  118. //
  119. // will return a single stream that write goes to the first
  // 'end' must not be emitted while a map callback is still outstanding.
  exports['map will not call end until the callback'] = function (test) {
    var calledBack = false

    // mapper that defers its callback to the next tick
    var ticker = map(function (data, cb) {
      process.nextTick(function () {
        calledBack = true
        cb(null, data * 2)
      })
    })

    spec(ticker).through().validateOnExit()

    // Listen before writing: if 'end' were emitted synchronously from end(),
    // a listener attached afterwards would miss it and the test would hang
    // instead of failing.
    ticker.on('end', function () {
      it(calledBack).ok('end was emitted before the map callback was called')
      test.done()
    })

    ticker.write('x')
    ticker.end()
  }
  // An exception thrown inside the mapper should surface as an 'error' event.
  exports['emit error thrown'] = function (test) {
    var err = new Error('INTENSIONAL ERROR')

    function explode () {
      throw err
    }

    var mapper = map(explode)

    mapper.on('error', function (emitted) {
      // must be the very same Error object that was thrown
      it(emitted).equal(err)
      test.done()
    })

    mapper.write('hello')
  }
  // An error handed to the mapper's callback should surface as an 'error' event.
  exports['emit error calledback'] = function (test) {
    var err = new Error('INTENSIONAL ERROR')

    function failWith (data, callback) {
      callback(err)
    }

    var mapper = map(failWith)

    mapper.on('error', function (emitted) {
      // must be the very same Error object that was passed to the callback
      it(emitted).equal(err)
      test.done()
    })

    mapper.write('hello')
  }
  // When the mapper returns true, write() returns true and no 'drain' may follow.
  exports['do not emit drain if not paused'] = function (test) {
    var maps = map(function (data, callback) {
      // u.delay wraps the callback; NOTE(review): presumably defers it -- confirm in ubelt
      var later = u.delay(callback)
      later(null, 1)
      return true
    })

    spec(maps).through().pausable().validateOnExit()

    maps.on('drain', function () {
      it(false).ok('should not emit drain unless the stream is paused')
    })

    // three writes, each of which must report true
    for (var i = 0; i < 3; i++) {
      it(maps.write('hello')).equal(true)
    }

    setTimeout(function () {
      maps.end()
    }, 10)

    maps.on('end', test.done)
  }
  // When the mapper returns false, write() returns false and 'drain' must be
  // emitted once every outstanding map callback has completed.
  exports['emits drain if paused, when all '] = function (test) {
    var active = 0       // map calls whose callback has not run yet
    var drained = false  // set once 'drain' has been seen

    var maps = map(function (data, callback) {
      active ++
      u.delay(function () {
        active --
        callback(null, 1)
      })()
      console.log('WRITE', false)
      return false
    })

    spec(maps).through().validateOnExit()

    maps.on('drain', function () {
      drained = true
      it(active).equal(0, 'should emit drain when all maps are done')
    })

    it(maps.write('hello')).equal(false)
    it(maps.write('hello')).equal(false)
    it(maps.write('hello')).equal(false)

    // process.nextTick takes no delay; the stray `10` that used to follow the
    // callback was only forwarded to it as an unused argument.
    process.nextTick(function () {
      maps.end()
    })

    maps.on('end', function () {
      console.log('end')
      it(drained).ok('shoud have emitted drain before end')
      test.done()
    })
  }
  // Calling back with no value drops the item: only odd inputs come out, doubled.
  exports['map applied to a stream with filtering'] = function (test) {
    var input = [1,2,3,7,5,3,1,9,0,2,4,6]

    function isOdd (n) {
      return n % 2
    }

    function twice (n) {
      return n * 2
    }

    var doubler = map(function (data, callback) {
      var odd = data % 2
      if (!odd) {
        // even values are filtered out by calling back with nothing
        callback()
        return
      }
      callback(null, data * 2)
    })

    readStream(doubler, function (err, output) {
      var expected = input.filter(isOdd).map(twice)
      it(output).deepEqual(expected)
      test.done()
    })

    spec(doubler).through().validateOnExit()

    writeArray(input, doubler)
  }