index.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545
  1. 'use strict'
  2. const EE = require('events')
  3. const Stream = require('stream')
  4. const Yallist = require('yallist')
  5. const SD = require('string_decoder').StringDecoder
  6. const EOF = Symbol('EOF')
  7. const MAYBE_EMIT_END = Symbol('maybeEmitEnd')
  8. const EMITTED_END = Symbol('emittedEnd')
  9. const EMITTING_END = Symbol('emittingEnd')
  10. const CLOSED = Symbol('closed')
  11. const READ = Symbol('read')
  12. const FLUSH = Symbol('flush')
  13. const FLUSHCHUNK = Symbol('flushChunk')
  14. const ENCODING = Symbol('encoding')
  15. const DECODER = Symbol('decoder')
  16. const FLOWING = Symbol('flowing')
  17. const PAUSED = Symbol('paused')
  18. const RESUME = Symbol('resume')
  19. const BUFFERLENGTH = Symbol('bufferLength')
  20. const BUFFERPUSH = Symbol('bufferPush')
  21. const BUFFERSHIFT = Symbol('bufferShift')
  22. const OBJECTMODE = Symbol('objectMode')
  23. const DESTROYED = Symbol('destroyed')
  24. // TODO remove when Node v8 support drops
  25. const doIter = global._MP_NO_ITERATOR_SYMBOLS_ !== '1'
  26. const ASYNCITERATOR = doIter && Symbol.asyncIterator
  27. || Symbol('asyncIterator not implemented')
  28. const ITERATOR = doIter && Symbol.iterator
  29. || Symbol('iterator not implemented')
  30. // events that mean 'the stream is over'
  31. // these are treated specially, and re-emitted
  32. // if they are listened for after emitting.
  33. const isEndish = ev =>
  34. ev === 'end' ||
  35. ev === 'finish' ||
  36. ev === 'prefinish'
  37. const isArrayBuffer = b => b instanceof ArrayBuffer ||
  38. typeof b === 'object' &&
  39. b.constructor &&
  40. b.constructor.name === 'ArrayBuffer' &&
  41. b.byteLength >= 0
  42. const isArrayBufferView = b => !Buffer.isBuffer(b) && ArrayBuffer.isView(b)
  43. module.exports = class Minipass extends Stream {
  44. constructor (options) {
  45. super()
  46. this[FLOWING] = false
  47. // whether we're explicitly paused
  48. this[PAUSED] = false
  49. this.pipes = new Yallist()
  50. this.buffer = new Yallist()
  51. this[OBJECTMODE] = options && options.objectMode || false
  52. if (this[OBJECTMODE])
  53. this[ENCODING] = null
  54. else
  55. this[ENCODING] = options && options.encoding || null
  56. if (this[ENCODING] === 'buffer')
  57. this[ENCODING] = null
  58. this[DECODER] = this[ENCODING] ? new SD(this[ENCODING]) : null
  59. this[EOF] = false
  60. this[EMITTED_END] = false
  61. this[EMITTING_END] = false
  62. this[CLOSED] = false
  63. this.writable = true
  64. this.readable = true
  65. this[BUFFERLENGTH] = 0
  66. this[DESTROYED] = false
  67. }
  68. get bufferLength () { return this[BUFFERLENGTH] }
  69. get encoding () { return this[ENCODING] }
  70. set encoding (enc) {
  71. if (this[OBJECTMODE])
  72. throw new Error('cannot set encoding in objectMode')
  73. if (this[ENCODING] && enc !== this[ENCODING] &&
  74. (this[DECODER] && this[DECODER].lastNeed || this[BUFFERLENGTH]))
  75. throw new Error('cannot change encoding')
  76. if (this[ENCODING] !== enc) {
  77. this[DECODER] = enc ? new SD(enc) : null
  78. if (this.buffer.length)
  79. this.buffer = this.buffer.map(chunk => this[DECODER].write(chunk))
  80. }
  81. this[ENCODING] = enc
  82. }
  83. setEncoding (enc) {
  84. this.encoding = enc
  85. }
  86. get objectMode () { return this[OBJECTMODE] }
  87. set objectMode (om) { this[OBJECTMODE] = this[OBJECTMODE] || !!om }
  88. write (chunk, encoding, cb) {
  89. if (this[EOF])
  90. throw new Error('write after end')
  91. if (this[DESTROYED]) {
  92. this.emit('error', Object.assign(
  93. new Error('Cannot call write after a stream was destroyed'),
  94. { code: 'ERR_STREAM_DESTROYED' }
  95. ))
  96. return true
  97. }
  98. if (typeof encoding === 'function')
  99. cb = encoding, encoding = 'utf8'
  100. if (!encoding)
  101. encoding = 'utf8'
  102. // convert array buffers and typed array views into buffers
  103. // at some point in the future, we may want to do the opposite!
  104. // leave strings and buffers as-is
  105. // anything else switches us into object mode
  106. if (!this[OBJECTMODE] && !Buffer.isBuffer(chunk)) {
  107. if (isArrayBufferView(chunk))
  108. chunk = Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)
  109. else if (isArrayBuffer(chunk))
  110. chunk = Buffer.from(chunk)
  111. else if (typeof chunk !== 'string')
  112. // use the setter so we throw if we have encoding set
  113. this.objectMode = true
  114. }
  115. // this ensures at this point that the chunk is a buffer or string
  116. // don't buffer it up or send it to the decoder
  117. if (!this.objectMode && !chunk.length) {
  118. if (this[BUFFERLENGTH] !== 0)
  119. this.emit('readable')
  120. if (cb)
  121. cb()
  122. return this.flowing
  123. }
  124. // fast-path writing strings of same encoding to a stream with
  125. // an empty buffer, skipping the buffer/decoder dance
  126. if (typeof chunk === 'string' && !this[OBJECTMODE] &&
  127. // unless it is a string already ready for us to use
  128. !(encoding === this[ENCODING] && !this[DECODER].lastNeed)) {
  129. chunk = Buffer.from(chunk, encoding)
  130. }
  131. if (Buffer.isBuffer(chunk) && this[ENCODING])
  132. chunk = this[DECODER].write(chunk)
  133. if (this.flowing) {
  134. // if we somehow have something in the buffer, but we think we're
  135. // flowing, then we need to flush all that out first, or we get
  136. // chunks coming in out of order. Can't emit 'drain' here though,
  137. // because we're mid-write, so that'd be bad.
  138. if (this[BUFFERLENGTH] !== 0)
  139. this[FLUSH](true)
  140. this.emit('data', chunk)
  141. } else
  142. this[BUFFERPUSH](chunk)
  143. if (this[BUFFERLENGTH] !== 0)
  144. this.emit('readable')
  145. if (cb)
  146. cb()
  147. return this.flowing
  148. }
  149. read (n) {
  150. if (this[DESTROYED])
  151. return null
  152. try {
  153. if (this[BUFFERLENGTH] === 0 || n === 0 || n > this[BUFFERLENGTH])
  154. return null
  155. if (this[OBJECTMODE])
  156. n = null
  157. if (this.buffer.length > 1 && !this[OBJECTMODE]) {
  158. if (this.encoding)
  159. this.buffer = new Yallist([
  160. Array.from(this.buffer).join('')
  161. ])
  162. else
  163. this.buffer = new Yallist([
  164. Buffer.concat(Array.from(this.buffer), this[BUFFERLENGTH])
  165. ])
  166. }
  167. return this[READ](n || null, this.buffer.head.value)
  168. } finally {
  169. this[MAYBE_EMIT_END]()
  170. }
  171. }
  172. [READ] (n, chunk) {
  173. if (n === chunk.length || n === null)
  174. this[BUFFERSHIFT]()
  175. else {
  176. this.buffer.head.value = chunk.slice(n)
  177. chunk = chunk.slice(0, n)
  178. this[BUFFERLENGTH] -= n
  179. }
  180. this.emit('data', chunk)
  181. if (!this.buffer.length && !this[EOF])
  182. this.emit('drain')
  183. return chunk
  184. }
  185. end (chunk, encoding, cb) {
  186. if (typeof chunk === 'function')
  187. cb = chunk, chunk = null
  188. if (typeof encoding === 'function')
  189. cb = encoding, encoding = 'utf8'
  190. if (chunk)
  191. this.write(chunk, encoding)
  192. if (cb)
  193. this.once('end', cb)
  194. this[EOF] = true
  195. this.writable = false
  196. // if we haven't written anything, then go ahead and emit,
  197. // even if we're not reading.
  198. // we'll re-emit if a new 'end' listener is added anyway.
  199. // This makes MP more suitable to write-only use cases.
  200. if (this.flowing || !this[PAUSED])
  201. this[MAYBE_EMIT_END]()
  202. return this
  203. }
  204. // don't let the internal resume be overwritten
  205. [RESUME] () {
  206. if (this[DESTROYED])
  207. return
  208. this[PAUSED] = false
  209. this[FLOWING] = true
  210. this.emit('resume')
  211. if (this.buffer.length)
  212. this[FLUSH]()
  213. else if (this[EOF])
  214. this[MAYBE_EMIT_END]()
  215. else
  216. this.emit('drain')
  217. }
  218. resume () {
  219. return this[RESUME]()
  220. }
  221. pause () {
  222. this[FLOWING] = false
  223. this[PAUSED] = true
  224. }
  225. get destroyed () {
  226. return this[DESTROYED]
  227. }
  228. get flowing () {
  229. return this[FLOWING]
  230. }
  231. get paused () {
  232. return this[PAUSED]
  233. }
  234. [BUFFERPUSH] (chunk) {
  235. if (this[OBJECTMODE])
  236. this[BUFFERLENGTH] += 1
  237. else
  238. this[BUFFERLENGTH] += chunk.length
  239. return this.buffer.push(chunk)
  240. }
  241. [BUFFERSHIFT] () {
  242. if (this.buffer.length) {
  243. if (this[OBJECTMODE])
  244. this[BUFFERLENGTH] -= 1
  245. else
  246. this[BUFFERLENGTH] -= this.buffer.head.value.length
  247. }
  248. return this.buffer.shift()
  249. }
  250. [FLUSH] (noDrain) {
  251. do {} while (this[FLUSHCHUNK](this[BUFFERSHIFT]()))
  252. if (!noDrain && !this.buffer.length && !this[EOF])
  253. this.emit('drain')
  254. }
  255. [FLUSHCHUNK] (chunk) {
  256. return chunk ? (this.emit('data', chunk), this.flowing) : false
  257. }
  258. pipe (dest, opts) {
  259. if (this[DESTROYED])
  260. return
  261. const ended = this[EMITTED_END]
  262. opts = opts || {}
  263. if (dest === process.stdout || dest === process.stderr)
  264. opts.end = false
  265. else
  266. opts.end = opts.end !== false
  267. const p = { dest: dest, opts: opts, ondrain: _ => this[RESUME]() }
  268. this.pipes.push(p)
  269. dest.on('drain', p.ondrain)
  270. this[RESUME]()
  271. // piping an ended stream ends immediately
  272. if (ended && p.opts.end)
  273. p.dest.end()
  274. return dest
  275. }
  276. addListener (ev, fn) {
  277. return this.on(ev, fn)
  278. }
  279. on (ev, fn) {
  280. try {
  281. return super.on(ev, fn)
  282. } finally {
  283. if (ev === 'data' && !this.pipes.length && !this.flowing)
  284. this[RESUME]()
  285. else if (isEndish(ev) && this[EMITTED_END]) {
  286. super.emit(ev)
  287. this.removeAllListeners(ev)
  288. }
  289. }
  290. }
  291. get emittedEnd () {
  292. return this[EMITTED_END]
  293. }
  294. [MAYBE_EMIT_END] () {
  295. if (!this[EMITTING_END] &&
  296. !this[EMITTED_END] &&
  297. !this[DESTROYED] &&
  298. this.buffer.length === 0 &&
  299. this[EOF]) {
  300. this[EMITTING_END] = true
  301. this.emit('end')
  302. this.emit('prefinish')
  303. this.emit('finish')
  304. if (this[CLOSED])
  305. this.emit('close')
  306. this[EMITTING_END] = false
  307. }
  308. }
  309. emit (ev, data) {
  310. // error and close are only events allowed after calling destroy()
  311. if (ev !== 'error' && ev !== 'close' && ev !== DESTROYED && this[DESTROYED])
  312. return
  313. else if (ev === 'data') {
  314. if (!data)
  315. return
  316. if (this.pipes.length)
  317. this.pipes.forEach(p =>
  318. p.dest.write(data) === false && this.pause())
  319. } else if (ev === 'end') {
  320. // only actual end gets this treatment
  321. if (this[EMITTED_END] === true)
  322. return
  323. this[EMITTED_END] = true
  324. this.readable = false
  325. if (this[DECODER]) {
  326. data = this[DECODER].end()
  327. if (data) {
  328. this.pipes.forEach(p => p.dest.write(data))
  329. super.emit('data', data)
  330. }
  331. }
  332. this.pipes.forEach(p => {
  333. p.dest.removeListener('drain', p.ondrain)
  334. if (p.opts.end)
  335. p.dest.end()
  336. })
  337. } else if (ev === 'close') {
  338. this[CLOSED] = true
  339. // don't emit close before 'end' and 'finish'
  340. if (!this[EMITTED_END] && !this[DESTROYED])
  341. return
  342. }
  343. // TODO: replace with a spread operator when Node v4 support drops
  344. const args = new Array(arguments.length)
  345. args[0] = ev
  346. args[1] = data
  347. if (arguments.length > 2) {
  348. for (let i = 2; i < arguments.length; i++) {
  349. args[i] = arguments[i]
  350. }
  351. }
  352. try {
  353. return super.emit.apply(this, args)
  354. } finally {
  355. if (!isEndish(ev))
  356. this[MAYBE_EMIT_END]()
  357. else
  358. this.removeAllListeners(ev)
  359. }
  360. }
  361. // const all = await stream.collect()
  362. collect () {
  363. const buf = []
  364. if (!this[OBJECTMODE])
  365. buf.dataLength = 0
  366. // set the promise first, in case an error is raised
  367. // by triggering the flow here.
  368. const p = this.promise()
  369. this.on('data', c => {
  370. buf.push(c)
  371. if (!this[OBJECTMODE])
  372. buf.dataLength += c.length
  373. })
  374. return p.then(() => buf)
  375. }
  376. // const data = await stream.concat()
  377. concat () {
  378. return this[OBJECTMODE]
  379. ? Promise.reject(new Error('cannot concat in objectMode'))
  380. : this.collect().then(buf =>
  381. this[OBJECTMODE]
  382. ? Promise.reject(new Error('cannot concat in objectMode'))
  383. : this[ENCODING] ? buf.join('') : Buffer.concat(buf, buf.dataLength))
  384. }
  385. // stream.promise().then(() => done, er => emitted error)
  386. promise () {
  387. return new Promise((resolve, reject) => {
  388. this.on(DESTROYED, () => reject(new Error('stream destroyed')))
  389. this.on('end', () => resolve())
  390. this.on('error', er => reject(er))
  391. })
  392. }
  393. // for await (let chunk of stream)
  394. [ASYNCITERATOR] () {
  395. const next = () => {
  396. const res = this.read()
  397. if (res !== null)
  398. return Promise.resolve({ done: false, value: res })
  399. if (this[EOF])
  400. return Promise.resolve({ done: true })
  401. let resolve = null
  402. let reject = null
  403. const onerr = er => {
  404. this.removeListener('data', ondata)
  405. this.removeListener('end', onend)
  406. reject(er)
  407. }
  408. const ondata = value => {
  409. this.removeListener('error', onerr)
  410. this.removeListener('end', onend)
  411. this.pause()
  412. resolve({ value: value, done: !!this[EOF] })
  413. }
  414. const onend = () => {
  415. this.removeListener('error', onerr)
  416. this.removeListener('data', ondata)
  417. resolve({ done: true })
  418. }
  419. const ondestroy = () => onerr(new Error('stream destroyed'))
  420. return new Promise((res, rej) => {
  421. reject = rej
  422. resolve = res
  423. this.once(DESTROYED, ondestroy)
  424. this.once('error', onerr)
  425. this.once('end', onend)
  426. this.once('data', ondata)
  427. })
  428. }
  429. return { next }
  430. }
  431. // for (let chunk of stream)
  432. [ITERATOR] () {
  433. const next = () => {
  434. const value = this.read()
  435. const done = value === null
  436. return { value, done }
  437. }
  438. return { next }
  439. }
  440. destroy (er) {
  441. if (this[DESTROYED]) {
  442. if (er)
  443. this.emit('error', er)
  444. else
  445. this.emit(DESTROYED)
  446. return this
  447. }
  448. this[DESTROYED] = true
  449. // throw away all buffered data, it's never coming out
  450. this.buffer = new Yallist()
  451. this[BUFFERLENGTH] = 0
  452. if (typeof this.close === 'function' && !this[CLOSED])
  453. this.close()
  454. if (er)
  455. this.emit('error', er)
  456. else // if no error to emit, still reject pending promises
  457. this.emit(DESTROYED)
  458. return this
  459. }
  460. static isStream (s) {
  461. return !!s && (s instanceof Minipass || s instanceof Stream ||
  462. s instanceof EE && (
  463. typeof s.pipe === 'function' || // readable
  464. (typeof s.write === 'function' && typeof s.end === 'function') // writable
  465. ))
  466. }
  467. }