index.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560
  1. 'use strict'
  2. const proc = typeof process === 'object' && process ? process : {
  3. stdout: null,
  4. stderr: null,
  5. }
  6. const EE = require('events')
  7. const Stream = require('stream')
  8. const Yallist = require('yallist')
  9. const SD = require('string_decoder').StringDecoder
  10. const EOF = Symbol('EOF')
  11. const MAYBE_EMIT_END = Symbol('maybeEmitEnd')
  12. const EMITTED_END = Symbol('emittedEnd')
  13. const EMITTING_END = Symbol('emittingEnd')
  14. const EMITTED_ERROR = Symbol('emittedError')
  15. const CLOSED = Symbol('closed')
  16. const READ = Symbol('read')
  17. const FLUSH = Symbol('flush')
  18. const FLUSHCHUNK = Symbol('flushChunk')
  19. const ENCODING = Symbol('encoding')
  20. const DECODER = Symbol('decoder')
  21. const FLOWING = Symbol('flowing')
  22. const PAUSED = Symbol('paused')
  23. const RESUME = Symbol('resume')
  24. const BUFFERLENGTH = Symbol('bufferLength')
  25. const BUFFERPUSH = Symbol('bufferPush')
  26. const BUFFERSHIFT = Symbol('bufferShift')
  27. const OBJECTMODE = Symbol('objectMode')
  28. const DESTROYED = Symbol('destroyed')
  29. // TODO remove when Node v8 support drops
  30. const doIter = global._MP_NO_ITERATOR_SYMBOLS_ !== '1'
  31. const ASYNCITERATOR = doIter && Symbol.asyncIterator
  32. || Symbol('asyncIterator not implemented')
  33. const ITERATOR = doIter && Symbol.iterator
  34. || Symbol('iterator not implemented')
  35. // events that mean 'the stream is over'
  36. // these are treated specially, and re-emitted
  37. // if they are listened for after emitting.
  38. const isEndish = ev =>
  39. ev === 'end' ||
  40. ev === 'finish' ||
  41. ev === 'prefinish'
  42. const isArrayBuffer = b => b instanceof ArrayBuffer ||
  43. typeof b === 'object' &&
  44. b.constructor &&
  45. b.constructor.name === 'ArrayBuffer' &&
  46. b.byteLength >= 0
  47. const isArrayBufferView = b => !Buffer.isBuffer(b) && ArrayBuffer.isView(b)
  48. module.exports = class Minipass extends Stream {
  49. constructor (options) {
  50. super()
  51. this[FLOWING] = false
  52. // whether we're explicitly paused
  53. this[PAUSED] = false
  54. this.pipes = new Yallist()
  55. this.buffer = new Yallist()
  56. this[OBJECTMODE] = options && options.objectMode || false
  57. if (this[OBJECTMODE])
  58. this[ENCODING] = null
  59. else
  60. this[ENCODING] = options && options.encoding || null
  61. if (this[ENCODING] === 'buffer')
  62. this[ENCODING] = null
  63. this[DECODER] = this[ENCODING] ? new SD(this[ENCODING]) : null
  64. this[EOF] = false
  65. this[EMITTED_END] = false
  66. this[EMITTING_END] = false
  67. this[CLOSED] = false
  68. this[EMITTED_ERROR] = null
  69. this.writable = true
  70. this.readable = true
  71. this[BUFFERLENGTH] = 0
  72. this[DESTROYED] = false
  73. }
  74. get bufferLength () { return this[BUFFERLENGTH] }
  75. get encoding () { return this[ENCODING] }
  76. set encoding (enc) {
  77. if (this[OBJECTMODE])
  78. throw new Error('cannot set encoding in objectMode')
  79. if (this[ENCODING] && enc !== this[ENCODING] &&
  80. (this[DECODER] && this[DECODER].lastNeed || this[BUFFERLENGTH]))
  81. throw new Error('cannot change encoding')
  82. if (this[ENCODING] !== enc) {
  83. this[DECODER] = enc ? new SD(enc) : null
  84. if (this.buffer.length)
  85. this.buffer = this.buffer.map(chunk => this[DECODER].write(chunk))
  86. }
  87. this[ENCODING] = enc
  88. }
  89. setEncoding (enc) {
  90. this.encoding = enc
  91. }
  92. get objectMode () { return this[OBJECTMODE] }
  93. set objectMode (om) { this[OBJECTMODE] = this[OBJECTMODE] || !!om }
  94. write (chunk, encoding, cb) {
  95. if (this[EOF])
  96. throw new Error('write after end')
  97. if (this[DESTROYED]) {
  98. this.emit('error', Object.assign(
  99. new Error('Cannot call write after a stream was destroyed'),
  100. { code: 'ERR_STREAM_DESTROYED' }
  101. ))
  102. return true
  103. }
  104. if (typeof encoding === 'function')
  105. cb = encoding, encoding = 'utf8'
  106. if (!encoding)
  107. encoding = 'utf8'
  108. // convert array buffers and typed array views into buffers
  109. // at some point in the future, we may want to do the opposite!
  110. // leave strings and buffers as-is
  111. // anything else switches us into object mode
  112. if (!this[OBJECTMODE] && !Buffer.isBuffer(chunk)) {
  113. if (isArrayBufferView(chunk))
  114. chunk = Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)
  115. else if (isArrayBuffer(chunk))
  116. chunk = Buffer.from(chunk)
  117. else if (typeof chunk !== 'string')
  118. // use the setter so we throw if we have encoding set
  119. this.objectMode = true
  120. }
  121. // this ensures at this point that the chunk is a buffer or string
  122. // don't buffer it up or send it to the decoder
  123. if (!this.objectMode && !chunk.length) {
  124. if (this[BUFFERLENGTH] !== 0)
  125. this.emit('readable')
  126. if (cb)
  127. cb()
  128. return this.flowing
  129. }
  130. // fast-path writing strings of same encoding to a stream with
  131. // an empty buffer, skipping the buffer/decoder dance
  132. if (typeof chunk === 'string' && !this[OBJECTMODE] &&
  133. // unless it is a string already ready for us to use
  134. !(encoding === this[ENCODING] && !this[DECODER].lastNeed)) {
  135. chunk = Buffer.from(chunk, encoding)
  136. }
  137. if (Buffer.isBuffer(chunk) && this[ENCODING])
  138. chunk = this[DECODER].write(chunk)
  139. if (this.flowing) {
  140. // if we somehow have something in the buffer, but we think we're
  141. // flowing, then we need to flush all that out first, or we get
  142. // chunks coming in out of order. Can't emit 'drain' here though,
  143. // because we're mid-write, so that'd be bad.
  144. if (this[BUFFERLENGTH] !== 0)
  145. this[FLUSH](true)
  146. // if we are still flowing after flushing the buffer we can emit the
  147. // chunk otherwise we have to buffer it.
  148. this.flowing
  149. ? this.emit('data', chunk)
  150. : this[BUFFERPUSH](chunk)
  151. } else
  152. this[BUFFERPUSH](chunk)
  153. if (this[BUFFERLENGTH] !== 0)
  154. this.emit('readable')
  155. if (cb)
  156. cb()
  157. return this.flowing
  158. }
  159. read (n) {
  160. if (this[DESTROYED])
  161. return null
  162. try {
  163. if (this[BUFFERLENGTH] === 0 || n === 0 || n > this[BUFFERLENGTH])
  164. return null
  165. if (this[OBJECTMODE])
  166. n = null
  167. if (this.buffer.length > 1 && !this[OBJECTMODE]) {
  168. if (this.encoding)
  169. this.buffer = new Yallist([
  170. Array.from(this.buffer).join('')
  171. ])
  172. else
  173. this.buffer = new Yallist([
  174. Buffer.concat(Array.from(this.buffer), this[BUFFERLENGTH])
  175. ])
  176. }
  177. return this[READ](n || null, this.buffer.head.value)
  178. } finally {
  179. this[MAYBE_EMIT_END]()
  180. }
  181. }
  182. [READ] (n, chunk) {
  183. if (n === chunk.length || n === null)
  184. this[BUFFERSHIFT]()
  185. else {
  186. this.buffer.head.value = chunk.slice(n)
  187. chunk = chunk.slice(0, n)
  188. this[BUFFERLENGTH] -= n
  189. }
  190. this.emit('data', chunk)
  191. if (!this.buffer.length && !this[EOF])
  192. this.emit('drain')
  193. return chunk
  194. }
  195. end (chunk, encoding, cb) {
  196. if (typeof chunk === 'function')
  197. cb = chunk, chunk = null
  198. if (typeof encoding === 'function')
  199. cb = encoding, encoding = 'utf8'
  200. if (chunk)
  201. this.write(chunk, encoding)
  202. if (cb)
  203. this.once('end', cb)
  204. this[EOF] = true
  205. this.writable = false
  206. // if we haven't written anything, then go ahead and emit,
  207. // even if we're not reading.
  208. // we'll re-emit if a new 'end' listener is added anyway.
  209. // This makes MP more suitable to write-only use cases.
  210. if (this.flowing || !this[PAUSED])
  211. this[MAYBE_EMIT_END]()
  212. return this
  213. }
  214. // don't let the internal resume be overwritten
  215. [RESUME] () {
  216. if (this[DESTROYED])
  217. return
  218. this[PAUSED] = false
  219. this[FLOWING] = true
  220. this.emit('resume')
  221. if (this.buffer.length)
  222. this[FLUSH]()
  223. else if (this[EOF])
  224. this[MAYBE_EMIT_END]()
  225. else
  226. this.emit('drain')
  227. }
  228. resume () {
  229. return this[RESUME]()
  230. }
  231. pause () {
  232. this[FLOWING] = false
  233. this[PAUSED] = true
  234. }
  235. get destroyed () {
  236. return this[DESTROYED]
  237. }
  238. get flowing () {
  239. return this[FLOWING]
  240. }
  241. get paused () {
  242. return this[PAUSED]
  243. }
  244. [BUFFERPUSH] (chunk) {
  245. if (this[OBJECTMODE])
  246. this[BUFFERLENGTH] += 1
  247. else
  248. this[BUFFERLENGTH] += chunk.length
  249. return this.buffer.push(chunk)
  250. }
  251. [BUFFERSHIFT] () {
  252. if (this.buffer.length) {
  253. if (this[OBJECTMODE])
  254. this[BUFFERLENGTH] -= 1
  255. else
  256. this[BUFFERLENGTH] -= this.buffer.head.value.length
  257. }
  258. return this.buffer.shift()
  259. }
  260. [FLUSH] (noDrain) {
  261. do {} while (this[FLUSHCHUNK](this[BUFFERSHIFT]()))
  262. if (!noDrain && !this.buffer.length && !this[EOF])
  263. this.emit('drain')
  264. }
  265. [FLUSHCHUNK] (chunk) {
  266. return chunk ? (this.emit('data', chunk), this.flowing) : false
  267. }
  268. pipe (dest, opts) {
  269. if (this[DESTROYED])
  270. return
  271. const ended = this[EMITTED_END]
  272. opts = opts || {}
  273. if (dest === proc.stdout || dest === proc.stderr)
  274. opts.end = false
  275. else
  276. opts.end = opts.end !== false
  277. const p = { dest: dest, opts: opts, ondrain: _ => this[RESUME]() }
  278. this.pipes.push(p)
  279. dest.on('drain', p.ondrain)
  280. this[RESUME]()
  281. // piping an ended stream ends immediately
  282. if (ended && p.opts.end)
  283. p.dest.end()
  284. return dest
  285. }
  286. addListener (ev, fn) {
  287. return this.on(ev, fn)
  288. }
  289. on (ev, fn) {
  290. try {
  291. return super.on(ev, fn)
  292. } finally {
  293. if (ev === 'data' && !this.pipes.length && !this.flowing)
  294. this[RESUME]()
  295. else if (isEndish(ev) && this[EMITTED_END]) {
  296. super.emit(ev)
  297. this.removeAllListeners(ev)
  298. } else if (ev === 'error' && this[EMITTED_ERROR]) {
  299. fn.call(this, this[EMITTED_ERROR])
  300. }
  301. }
  302. }
  303. get emittedEnd () {
  304. return this[EMITTED_END]
  305. }
  306. [MAYBE_EMIT_END] () {
  307. if (!this[EMITTING_END] &&
  308. !this[EMITTED_END] &&
  309. !this[DESTROYED] &&
  310. this.buffer.length === 0 &&
  311. this[EOF]) {
  312. this[EMITTING_END] = true
  313. this.emit('end')
  314. this.emit('prefinish')
  315. this.emit('finish')
  316. if (this[CLOSED])
  317. this.emit('close')
  318. this[EMITTING_END] = false
  319. }
  320. }
  321. emit (ev, data) {
  322. // error and close are only events allowed after calling destroy()
  323. if (ev !== 'error' && ev !== 'close' && ev !== DESTROYED && this[DESTROYED])
  324. return
  325. else if (ev === 'data') {
  326. if (!data)
  327. return
  328. if (this.pipes.length)
  329. this.pipes.forEach(p =>
  330. p.dest.write(data) === false && this.pause())
  331. } else if (ev === 'end') {
  332. // only actual end gets this treatment
  333. if (this[EMITTED_END] === true)
  334. return
  335. this[EMITTED_END] = true
  336. this.readable = false
  337. if (this[DECODER]) {
  338. data = this[DECODER].end()
  339. if (data) {
  340. this.pipes.forEach(p => p.dest.write(data))
  341. super.emit('data', data)
  342. }
  343. }
  344. this.pipes.forEach(p => {
  345. p.dest.removeListener('drain', p.ondrain)
  346. if (p.opts.end)
  347. p.dest.end()
  348. })
  349. } else if (ev === 'close') {
  350. this[CLOSED] = true
  351. // don't emit close before 'end' and 'finish'
  352. if (!this[EMITTED_END] && !this[DESTROYED])
  353. return
  354. } else if (ev === 'error') {
  355. this[EMITTED_ERROR] = data
  356. }
  357. // TODO: replace with a spread operator when Node v4 support drops
  358. const args = new Array(arguments.length)
  359. args[0] = ev
  360. args[1] = data
  361. if (arguments.length > 2) {
  362. for (let i = 2; i < arguments.length; i++) {
  363. args[i] = arguments[i]
  364. }
  365. }
  366. try {
  367. return super.emit.apply(this, args)
  368. } finally {
  369. if (!isEndish(ev))
  370. this[MAYBE_EMIT_END]()
  371. else
  372. this.removeAllListeners(ev)
  373. }
  374. }
  375. // const all = await stream.collect()
  376. collect () {
  377. const buf = []
  378. if (!this[OBJECTMODE])
  379. buf.dataLength = 0
  380. // set the promise first, in case an error is raised
  381. // by triggering the flow here.
  382. const p = this.promise()
  383. this.on('data', c => {
  384. buf.push(c)
  385. if (!this[OBJECTMODE])
  386. buf.dataLength += c.length
  387. })
  388. return p.then(() => buf)
  389. }
  390. // const data = await stream.concat()
  391. concat () {
  392. return this[OBJECTMODE]
  393. ? Promise.reject(new Error('cannot concat in objectMode'))
  394. : this.collect().then(buf =>
  395. this[OBJECTMODE]
  396. ? Promise.reject(new Error('cannot concat in objectMode'))
  397. : this[ENCODING] ? buf.join('') : Buffer.concat(buf, buf.dataLength))
  398. }
  399. // stream.promise().then(() => done, er => emitted error)
  400. promise () {
  401. return new Promise((resolve, reject) => {
  402. this.on(DESTROYED, () => reject(new Error('stream destroyed')))
  403. this.on('error', er => reject(er))
  404. this.on('end', () => resolve())
  405. })
  406. }
  407. // for await (let chunk of stream)
  408. [ASYNCITERATOR] () {
  409. const next = () => {
  410. const res = this.read()
  411. if (res !== null)
  412. return Promise.resolve({ done: false, value: res })
  413. if (this[EOF])
  414. return Promise.resolve({ done: true })
  415. let resolve = null
  416. let reject = null
  417. const onerr = er => {
  418. this.removeListener('data', ondata)
  419. this.removeListener('end', onend)
  420. reject(er)
  421. }
  422. const ondata = value => {
  423. this.removeListener('error', onerr)
  424. this.removeListener('end', onend)
  425. this.pause()
  426. resolve({ value: value, done: !!this[EOF] })
  427. }
  428. const onend = () => {
  429. this.removeListener('error', onerr)
  430. this.removeListener('data', ondata)
  431. resolve({ done: true })
  432. }
  433. const ondestroy = () => onerr(new Error('stream destroyed'))
  434. return new Promise((res, rej) => {
  435. reject = rej
  436. resolve = res
  437. this.once(DESTROYED, ondestroy)
  438. this.once('error', onerr)
  439. this.once('end', onend)
  440. this.once('data', ondata)
  441. })
  442. }
  443. return { next }
  444. }
  445. // for (let chunk of stream)
  446. [ITERATOR] () {
  447. const next = () => {
  448. const value = this.read()
  449. const done = value === null
  450. return { value, done }
  451. }
  452. return { next }
  453. }
  454. destroy (er) {
  455. if (this[DESTROYED]) {
  456. if (er)
  457. this.emit('error', er)
  458. else
  459. this.emit(DESTROYED)
  460. return this
  461. }
  462. this[DESTROYED] = true
  463. // throw away all buffered data, it's never coming out
  464. this.buffer = new Yallist()
  465. this[BUFFERLENGTH] = 0
  466. if (typeof this.close === 'function' && !this[CLOSED])
  467. this.close()
  468. if (er)
  469. this.emit('error', er)
  470. else // if no error to emit, still reject pending promises
  471. this.emit(DESTROYED)
  472. return this
  473. }
  474. static isStream (s) {
  475. return !!s && (s instanceof Minipass || s instanceof Stream ||
  476. s instanceof EE && (
  477. typeof s.pipe === 'function' || // readable
  478. (typeof s.write === 'function' && typeof s.end === 'function') // writable
  479. ))
  480. }
  481. }