index.js 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. const Minipass = require('minipass')
  2. const EE = require('events')
  3. const isStream = s => s && s instanceof EE && (
  4. typeof s.pipe === 'function' || // readable
  5. (typeof s.write === 'function' && typeof s.end === 'function') // writable
  6. )
  7. const _head = Symbol('_head')
  8. const _tail = Symbol('_tail')
  9. const _linkStreams = Symbol('_linkStreams')
  10. const _setHead = Symbol('_setHead')
  11. const _setTail = Symbol('_setTail')
  12. const _onError = Symbol('_onError')
  13. const _onData = Symbol('_onData')
  14. const _onEnd = Symbol('_onEnd')
  15. const _onDrain = Symbol('_onDrain')
  16. const _streams = Symbol('_streams')
  17. class Pipeline extends Minipass {
  18. constructor (opts, ...streams) {
  19. if (isStream(opts)) {
  20. streams.unshift(opts)
  21. opts = {}
  22. }
  23. super(opts)
  24. this[_streams] = []
  25. if (streams.length)
  26. this.push(...streams)
  27. }
  28. [_linkStreams] (streams) {
  29. // reduce takes (left,right), and we return right to make it the
  30. // new left value.
  31. return streams.reduce((src, dest) => {
  32. src.on('error', er => dest.emit('error', er))
  33. src.pipe(dest)
  34. return dest
  35. })
  36. }
  37. push (...streams) {
  38. this[_streams].push(...streams)
  39. if (this[_tail])
  40. streams.unshift(this[_tail])
  41. const linkRet = this[_linkStreams](streams)
  42. this[_setTail](linkRet)
  43. if (!this[_head])
  44. this[_setHead](streams[0])
  45. }
  46. unshift (...streams) {
  47. this[_streams].unshift(...streams)
  48. if (this[_head])
  49. streams.push(this[_head])
  50. const linkRet = this[_linkStreams](streams)
  51. this[_setHead](streams[0])
  52. if (!this[_tail])
  53. this[_setTail](linkRet)
  54. }
  55. destroy (er) {
  56. // set fire to the whole thing.
  57. this[_streams].forEach(s =>
  58. typeof s.destroy === 'function' && s.destroy())
  59. return super.destroy(er)
  60. }
  61. // readable interface -> tail
  62. [_setTail] (stream) {
  63. this[_tail] = stream
  64. stream.on('error', er => this[_onError](stream, er))
  65. stream.on('data', chunk => this[_onData](stream, chunk))
  66. stream.on('end', () => this[_onEnd](stream))
  67. stream.on('finish', () => this[_onEnd](stream))
  68. }
  69. // errors proxied down the pipeline
  70. // they're considered part of the "read" interface
  71. [_onError] (stream, er) {
  72. if (stream === this[_tail])
  73. this.emit('error', er)
  74. }
  75. [_onData] (stream, chunk) {
  76. if (stream === this[_tail])
  77. super.write(chunk)
  78. }
  79. [_onEnd] (stream) {
  80. if (stream === this[_tail])
  81. super.end()
  82. }
  83. pause () {
  84. super.pause()
  85. return this[_tail] && this[_tail].pause && this[_tail].pause()
  86. }
  87. // NB: Minipass calls its internal private [RESUME] method during
  88. // pipe drains, to avoid hazards where stream.resume() is overridden.
  89. // Thus, we need to listen to the resume *event*, not override the
  90. // resume() method, and proxy *that* to the tail.
  91. emit (ev, ...args) {
  92. if (ev === 'resume' && this[_tail] && this[_tail].resume)
  93. this[_tail].resume()
  94. return super.emit(ev, ...args)
  95. }
  96. // writable interface -> head
  97. [_setHead] (stream) {
  98. this[_head] = stream
  99. stream.on('drain', () => this[_onDrain](stream))
  100. }
  101. [_onDrain] (stream) {
  102. if (stream === this[_head])
  103. this.emit('drain')
  104. }
  105. write (chunk, enc, cb) {
  106. return this[_head].write(chunk, enc, cb) &&
  107. (this.flowing || this.buffer.length === 0)
  108. }
  109. end (chunk, enc, cb) {
  110. this[_head].end(chunk, enc, cb)
  111. return this
  112. }
  113. }
  114. module.exports = Pipeline