123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- 'use strict'
- /*
- * merge2
- * https://github.com/teambition/merge2
- *
- * Copyright (c) 2014-2016 Teambition
- * Licensed under the MIT license.
- */
- const Stream = require('stream')
- const PassThrough = Stream.PassThrough
- const slice = Array.prototype.slice
- module.exports = merge2
- function merge2 () {
- const streamsQueue = []
- let merging = false
- let args = slice.call(arguments)
- let options = args[args.length - 1]
- if (options && !Array.isArray(options) && options.pipe == null) args.pop()
- else options = {}
- let doEnd = options.end !== false
- if (options.objectMode == null) options.objectMode = true
- if (options.highWaterMark == null) options.highWaterMark = 64 * 1024
- const mergedStream = PassThrough(options)
- function addStream () {
- for (let i = 0, len = arguments.length; i < len; i++) {
- streamsQueue.push(pauseStreams(arguments[i], options))
- }
- mergeStream()
- return this
- }
- function mergeStream () {
- if (merging) return
- merging = true
- let streams = streamsQueue.shift()
- if (!streams) {
- process.nextTick(endStream)
- return
- }
- if (!Array.isArray(streams)) streams = [streams]
- let pipesCount = streams.length + 1
- function next () {
- if (--pipesCount > 0) return
- merging = false
- mergeStream()
- }
- function pipe (stream) {
- function onend () {
- stream.removeListener('merge2UnpipeEnd', onend)
- stream.removeListener('end', onend)
- next()
- }
- // skip ended stream
- if (stream._readableState.endEmitted) return next()
- stream.on('merge2UnpipeEnd', onend)
- stream.on('end', onend)
- stream.pipe(mergedStream, {end: false})
- // compatible for old stream
- stream.resume()
- }
- for (let i = 0; i < streams.length; i++) pipe(streams[i])
- next()
- }
- function endStream () {
- merging = false
- // emit 'queueDrain' when all streams merged.
- mergedStream.emit('queueDrain')
- return doEnd && mergedStream.end()
- }
- mergedStream.setMaxListeners(0)
- mergedStream.add = addStream
- mergedStream.on('unpipe', function (stream) {
- stream.emit('merge2UnpipeEnd')
- })
- if (args.length) addStream.apply(null, args)
- return mergedStream
- }
- // check and pause streams for pipe.
- function pauseStreams (streams, options) {
- if (!Array.isArray(streams)) {
- // Backwards-compat with old-style streams
- if (!streams._readableState && streams.pipe) streams = streams.pipe(PassThrough(options))
- if (!streams._readableState || !streams.pause || !streams.pipe) {
- throw new Error('Only readable stream can be merged.')
- }
- streams.pause()
- } else {
- for (let i = 0, len = streams.length; i < len; i++) streams[i] = pauseStreams(streams[i], options)
- }
- return streams
- }
|