index.js 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. 'use strict'
  2. /*
  3. * merge2
  4. * https://github.com/teambition/merge2
  5. *
  6. * Copyright (c) 2014-2020 Teambition
  7. * Licensed under the MIT license.
  8. */
  9. const Stream = require('stream')
  10. const PassThrough = Stream.PassThrough
  11. const slice = Array.prototype.slice
  12. module.exports = merge2
  13. function merge2 () {
  14. const streamsQueue = []
  15. const args = slice.call(arguments)
  16. let merging = false
  17. let options = args[args.length - 1]
  18. if (options && !Array.isArray(options) && options.pipe == null) {
  19. args.pop()
  20. } else {
  21. options = {}
  22. }
  23. const doEnd = options.end !== false
  24. const doPipeError = options.pipeError === true
  25. if (options.objectMode == null) {
  26. options.objectMode = true
  27. }
  28. if (options.highWaterMark == null) {
  29. options.highWaterMark = 64 * 1024
  30. }
  31. const mergedStream = PassThrough(options)
  32. function addStream () {
  33. for (let i = 0, len = arguments.length; i < len; i++) {
  34. streamsQueue.push(pauseStreams(arguments[i], options))
  35. }
  36. mergeStream()
  37. return this
  38. }
  39. function mergeStream () {
  40. if (merging) {
  41. return
  42. }
  43. merging = true
  44. let streams = streamsQueue.shift()
  45. if (!streams) {
  46. process.nextTick(endStream)
  47. return
  48. }
  49. if (!Array.isArray(streams)) {
  50. streams = [streams]
  51. }
  52. let pipesCount = streams.length + 1
  53. function next () {
  54. if (--pipesCount > 0) {
  55. return
  56. }
  57. merging = false
  58. mergeStream()
  59. }
  60. function pipe (stream) {
  61. function onend () {
  62. stream.removeListener('merge2UnpipeEnd', onend)
  63. stream.removeListener('end', onend)
  64. if (doPipeError) {
  65. stream.removeListener('error', onerror)
  66. }
  67. next()
  68. }
  69. function onerror (err) {
  70. mergedStream.emit('error', err)
  71. }
  72. // skip ended stream
  73. if (stream._readableState.endEmitted) {
  74. return next()
  75. }
  76. stream.on('merge2UnpipeEnd', onend)
  77. stream.on('end', onend)
  78. if (doPipeError) {
  79. stream.on('error', onerror)
  80. }
  81. stream.pipe(mergedStream, { end: false })
  82. // compatible for old stream
  83. stream.resume()
  84. }
  85. for (let i = 0; i < streams.length; i++) {
  86. pipe(streams[i])
  87. }
  88. next()
  89. }
  90. function endStream () {
  91. merging = false
  92. // emit 'queueDrain' when all streams merged.
  93. mergedStream.emit('queueDrain')
  94. if (doEnd) {
  95. mergedStream.end()
  96. }
  97. }
  98. mergedStream.setMaxListeners(0)
  99. mergedStream.add = addStream
  100. mergedStream.on('unpipe', function (stream) {
  101. stream.emit('merge2UnpipeEnd')
  102. })
  103. if (args.length) {
  104. addStream.apply(null, args)
  105. }
  106. return mergedStream
  107. }
  108. // check and pause streams for pipe.
  109. function pauseStreams (streams, options) {
  110. if (!Array.isArray(streams)) {
  111. // Backwards-compat with old-style streams
  112. if (!streams._readableState && streams.pipe) {
  113. streams = streams.pipe(PassThrough(options))
  114. }
  115. if (!streams._readableState || !streams.pause || !streams.pipe) {
  116. throw new Error('Only readable stream can be merged.')
  117. }
  118. streams.pause()
  119. } else {
  120. for (let i = 0, len = streams.length; i < len; i++) {
  121. streams[i] = pauseStreams(streams[i], options)
  122. }
  123. }
  124. return streams
  125. }