block-stream.js 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
// write data to it, and it'll emit data in fixed-size blocks
// (512 bytes unless a different size is passed to the constructor).
// if you .end() or .flush(), it'll emit whatever it's got,
// padded with nulls to a full block (unless the nopad option is set).
  4. module.exports = BlockStream
  5. var Stream = require("stream").Stream
  6. , inherits = require("inherits")
  7. , assert = require("assert").ok
  8. , debug = process.env.DEBUG ? console.error : function () {}
  9. function BlockStream (size, opt) {
  10. this.writable = this.readable = true
  11. this._opt = opt || {}
  12. this._chunkSize = size || 512
  13. this._offset = 0
  14. this._buffer = []
  15. this._bufferLength = 0
  16. if (this._opt.nopad) this._zeroes = false
  17. else {
  18. this._zeroes = new Buffer(this._chunkSize)
  19. for (var i = 0; i < this._chunkSize; i ++) {
  20. this._zeroes[i] = 0
  21. }
  22. }
  23. }
  24. inherits(BlockStream, Stream)
  25. BlockStream.prototype.write = function (c) {
  26. // debug(" BS write", c)
  27. if (this._ended) throw new Error("BlockStream: write after end")
  28. if (c && !Buffer.isBuffer(c)) c = new Buffer(c + "")
  29. if (c.length) {
  30. this._buffer.push(c)
  31. this._bufferLength += c.length
  32. }
  33. // debug("pushed onto buffer", this._bufferLength)
  34. if (this._bufferLength >= this._chunkSize) {
  35. if (this._paused) {
  36. // debug(" BS paused, return false, need drain")
  37. this._needDrain = true
  38. return false
  39. }
  40. this._emitChunk()
  41. }
  42. return true
  43. }
  44. BlockStream.prototype.pause = function () {
  45. // debug(" BS pausing")
  46. this._paused = true
  47. }
  48. BlockStream.prototype.resume = function () {
  49. // debug(" BS resume")
  50. this._paused = false
  51. return this._emitChunk()
  52. }
  53. BlockStream.prototype.end = function (chunk) {
  54. // debug("end", chunk)
  55. if (typeof chunk === "function") cb = chunk, chunk = null
  56. if (chunk) this.write(chunk)
  57. this._ended = true
  58. this.flush()
  59. }
  60. BlockStream.prototype.flush = function () {
  61. this._emitChunk(true)
  62. }
  63. BlockStream.prototype._emitChunk = function (flush) {
  64. // debug("emitChunk flush=%j emitting=%j paused=%j", flush, this._emitting, this._paused)
  65. // emit a <chunkSize> chunk
  66. if (flush && this._zeroes) {
  67. // debug(" BS push zeroes", this._bufferLength)
  68. // push a chunk of zeroes
  69. var padBytes = (this._bufferLength % this._chunkSize)
  70. if (padBytes !== 0) padBytes = this._chunkSize - padBytes
  71. if (padBytes > 0) {
  72. // debug("padBytes", padBytes, this._zeroes.slice(0, padBytes))
  73. this._buffer.push(this._zeroes.slice(0, padBytes))
  74. this._bufferLength += padBytes
  75. // debug(this._buffer[this._buffer.length - 1].length, this._bufferLength)
  76. }
  77. }
  78. if (this._emitting || this._paused) return
  79. this._emitting = true
  80. // debug(" BS entering loops")
  81. var bufferIndex = 0
  82. while (this._bufferLength >= this._chunkSize &&
  83. (flush || !this._paused)) {
  84. // debug(" BS data emission loop", this._bufferLength)
  85. var out
  86. , outOffset = 0
  87. , outHas = this._chunkSize
  88. while (outHas > 0 && (flush || !this._paused) ) {
  89. // debug(" BS data inner emit loop", this._bufferLength)
  90. var cur = this._buffer[bufferIndex]
  91. , curHas = cur.length - this._offset
  92. // debug("cur=", cur)
  93. // debug("curHas=%j", curHas)
  94. // If it's not big enough to fill the whole thing, then we'll need
  95. // to copy multiple buffers into one. However, if it is big enough,
  96. // then just slice out the part we want, to save unnecessary copying.
  97. // Also, need to copy if we've already done some copying, since buffers
  98. // can't be joined like cons strings.
  99. if (out || curHas < outHas) {
  100. out = out || new Buffer(this._chunkSize)
  101. cur.copy(out, outOffset,
  102. this._offset, this._offset + Math.min(curHas, outHas))
  103. } else if (cur.length === outHas && this._offset === 0) {
  104. // shortcut -- cur is exactly long enough, and no offset.
  105. out = cur
  106. } else {
  107. // slice out the piece of cur that we need.
  108. out = cur.slice(this._offset, this._offset + outHas)
  109. }
  110. if (curHas > outHas) {
  111. // means that the current buffer couldn't be completely output
  112. // update this._offset to reflect how much WAS written
  113. this._offset += outHas
  114. outHas = 0
  115. } else {
  116. // output the entire current chunk.
  117. // toss it away
  118. outHas -= curHas
  119. outOffset += curHas
  120. bufferIndex ++
  121. this._offset = 0
  122. }
  123. }
  124. this._bufferLength -= this._chunkSize
  125. assert(out.length === this._chunkSize)
  126. // debug("emitting data", out)
  127. // debug(" BS emitting, paused=%j", this._paused, this._bufferLength)
  128. this.emit("data", out)
  129. out = null
  130. }
  131. // debug(" BS out of loops", this._bufferLength)
  132. // whatever is left, it's not enough to fill up a block, or we're paused
  133. this._buffer = this._buffer.slice(bufferIndex)
  134. if (this._paused) {
  135. // debug(" BS paused, leaving", this._bufferLength)
  136. this._needsDrain = true
  137. this._emitting = false
  138. return
  139. }
  140. // if flushing, and not using null-padding, then need to emit the last
  141. // chunk(s) sitting in the queue. We know that it's not enough to
  142. // fill up a whole block, because otherwise it would have been emitted
  143. // above, but there may be some offset.
  144. var l = this._buffer.length
  145. if (flush && !this._zeroes && l) {
  146. if (l === 1) {
  147. if (this._offset) {
  148. this.emit("data", this._buffer[0].slice(this._offset))
  149. } else {
  150. this.emit("data", this._buffer[0])
  151. }
  152. } else {
  153. var outHas = this._bufferLength
  154. , out = new Buffer(outHas)
  155. , outOffset = 0
  156. for (var i = 0; i < l; i ++) {
  157. var cur = this._buffer[i]
  158. , curHas = cur.length - this._offset
  159. cur.copy(out, outOffset, this._offset)
  160. this._offset = 0
  161. outOffset += curHas
  162. this._bufferLength -= curHas
  163. }
  164. this.emit("data", out)
  165. }
  166. // truncate
  167. this._buffer.length = 0
  168. this._bufferLength = 0
  169. this._offset = 0
  170. }
  171. // now either drained or ended
  172. // debug("either draining, or ended", this._bufferLength, this._ended)
  173. // means that we've flushed out all that we can so far.
  174. if (this._needDrain) {
  175. // debug("emitting drain", this._bufferLength)
  176. this._needDrain = false
  177. this.emit("drain")
  178. }
  179. if ((this._bufferLength === 0) && this._ended && !this._endEmitted) {
  180. // debug("emitting end", this._bufferLength)
  181. this._endEmitted = true
  182. this.emit("end")
  183. }
  184. this._emitting = false
  185. // debug(" BS no longer emitting", flush, this._paused, this._emitting, this._bufferLength, this._chunkSize)
  186. }