123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684 |
- 'use strict'
- var fs = require('fs')
- var path = require('path')
- var test = require('tape').test
- var from = require('from2')
- var crypto = require('crypto')
- var sink = require('flush-write-stream')
- var cloneable = require('./')
- test('basic passthrough', function (t) {
- t.plan(2)
- var read = false
- var source = from(function (size, next) {
- if (read) {
- this.push(null)
- } else {
- read = true
- this.push('hello world')
- }
- next()
- })
- var instance = cloneable(source)
- t.notOk(read, 'stream not started')
- instance.pipe(sink(function (chunk, enc, cb) {
- t.equal(chunk.toString(), 'hello world', 'chunk matches')
- cb()
- }))
- })
- test('clone sync', function (t) {
- t.plan(4)
- var read = false
- var source = from(function (size, next) {
- if (read) {
- this.push(null)
- } else {
- read = true
- this.push('hello world')
- }
- next()
- })
- var instance = cloneable(source)
- t.notOk(read, 'stream not started')
- var cloned = instance.clone()
- t.notOk(read, 'stream not started')
- instance.pipe(sink(function (chunk, enc, cb) {
- t.equal(chunk.toString(), 'hello world', 'chunk matches')
- cb()
- }))
- cloned.pipe(sink(function (chunk, enc, cb) {
- t.equal(chunk.toString(), 'hello world', 'chunk matches')
- cb()
- }))
- })
- test('clone async', function (t) {
- t.plan(4)
- var read = false
- var source = from(function (size, next) {
- if (read) {
- this.push(null)
- } else {
- read = true
- this.push('hello world')
- }
- next()
- })
- var instance = cloneable(source)
- t.notOk(read, 'stream not started')
- var cloned = instance.clone()
- t.notOk(read, 'stream not started')
- instance.pipe(sink(function (chunk, enc, cb) {
- t.equal(chunk.toString(), 'hello world', 'chunk matches')
- cb()
- }))
- setImmediate(function () {
- cloned.pipe(sink(function (chunk, enc, cb) {
- t.equal(chunk.toString(), 'hello world', 'chunk matches')
- cb()
- }))
- })
- })
- test('basic passthrough in obj mode', function (t) {
- t.plan(2)
- var read = false
- var source = from.obj(function (size, next) {
- if (read) {
- return this.push(null)
- } else {
- read = true
- this.push({ hello: 'world' })
- }
- next()
- })
- var instance = cloneable(source)
- t.notOk(read, 'stream not started')
- instance.pipe(sink.obj(function (chunk, enc, cb) {
- t.deepEqual(chunk, { hello: 'world' }, 'chunk matches')
- cb()
- }))
- })
- test('multiple clone in object mode', function (t) {
- t.plan(4)
- var read = false
- var source = from.obj(function (size, next) {
- if (read) {
- return this.push(null)
- } else {
- read = true
- this.push({ hello: 'world' })
- }
- next()
- })
- var instance = cloneable(source)
- t.notOk(read, 'stream not started')
- var cloned = instance.clone()
- t.notOk(read, 'stream not started')
- instance.pipe(sink.obj(function (chunk, enc, cb) {
- t.deepEqual(chunk, { hello: 'world' }, 'chunk matches')
- cb()
- }))
- setImmediate(function () {
- cloned.pipe(sink.obj(function (chunk, enc, cb) {
- t.deepEqual(chunk, { hello: 'world' }, 'chunk matches')
- cb()
- }))
- })
- })
- test('basic passthrough with data event', function (t) {
- t.plan(2)
- var read = false
- var source = from(function (size, next) {
- if (read) {
- this.push(null)
- } else {
- read = true
- this.push('hello world')
- }
- next()
- })
- var instance = cloneable(source)
- t.notOk(read, 'stream not started')
- var data = ''
- instance.on('data', function (chunk) {
- data += chunk.toString()
- })
- instance.on('end', function () {
- t.equal(data, 'hello world', 'chunk matches')
- })
- })
- test('basic passthrough with data event on clone', function (t) {
- t.plan(3)
- var read = false
- var source = from(function (size, next) {
- if (read) {
- this.push(null)
- } else {
- read = true
- this.push('hello world')
- }
- next()
- })
- var instance = cloneable(source)
- var cloned = instance.clone()
- t.notOk(read, 'stream not started')
- var data = ''
- cloned.on('data', function (chunk) {
- data += chunk.toString()
- })
- cloned.on('end', function () {
- t.equal(data, 'hello world', 'chunk matches in clone')
- })
- instance.pipe(sink(function (chunk, enc, cb) {
- t.equal(chunk.toString(), 'hello world', 'chunk matches in instance')
- cb()
- }))
- })
- test('errors if cloned after start', function (t) {
- t.plan(2)
- var source = from(function (size, next) {
- this.push('hello world')
- this.push(null)
- next()
- })
- var instance = cloneable(source)
- instance.pipe(sink(function (chunk, enc, cb) {
- t.equal(chunk.toString(), 'hello world', 'chunk matches')
- t.throws(function () {
- instance.clone()
- }, 'throws if cloned after start')
- cb()
- }))
- })
- test('basic passthrough with readable event', function (t) {
- t.plan(2)
- var read = false
- var source = from(function (size, next) {
- if (read) {
- this.push(null)
- } else {
- read = true
- this.push('hello world')
- }
- next()
- })
- var instance = cloneable(source)
- t.notOk(read, 'stream not started')
- var data = ''
- instance.on('readable', function () {
- var chunk
- while ((chunk = this.read()) !== null) {
- data += chunk.toString()
- }
- })
- instance.on('end', function () {
- t.equal(data, 'hello world', 'chunk matches')
- })
- })
- test('basic passthrough with readable event on clone', function (t) {
- t.plan(3)
- var read = false
- var source = from(function (size, next) {
- if (read) {
- this.push(null)
- } else {
- read = true
- this.push('hello world')
- }
- next()
- })
- var instance = cloneable(source)
- var cloned = instance.clone()
- t.notOk(read, 'stream not started')
- var data = ''
- cloned.on('readable', function () {
- var chunk
- while ((chunk = this.read()) !== null) {
- data += chunk.toString()
- }
- })
- cloned.on('end', function () {
- t.equal(data, 'hello world', 'chunk matches in clone')
- })
- instance.pipe(sink(function (chunk, enc, cb) {
- t.equal(chunk.toString(), 'hello world', 'chunk matches in instance')
- cb()
- }))
- })
- test('source error destroys all', function (t) {
- t.plan(3)
- var source = from()
- var instance = cloneable(source)
- var clone = instance.clone()
- source.on('error', function (err) {
- t.ok(err, 'source errors')
- instance.on('error', function (err2) {
- t.ok(err === err2, 'instance receives same error')
- })
- clone.on('error', function (err3) {
- t.ok(err === err3, 'clone receives same error')
- })
- })
- source.emit('error', new Error())
- })
- test('source destroy destroys all', function (t) {
- t.plan(2)
- var source = from()
- var instance = cloneable(source)
- var clone = instance.clone()
- instance.on('end', function () {
- t.pass('instance has ended')
- })
- clone.on('end', function () {
- t.pass('clone has ended')
- })
- clone.resume()
- instance.resume()
- source.destroy()
- })
- test('instance error destroys all but the source', function (t) {
- t.plan(2)
- var source = from()
- var instance = cloneable(source)
- var clone = instance.clone()
- source.on('close', function () {
- t.fail('source should not be closed')
- })
- instance.on('error', function (err) {
- t.is(err.message, 'beep', 'instance errors')
- })
- instance.on('close', function () {
- t.fail('close should not be emitted')
- })
- clone.on('error', function (err) {
- t.is(err.message, 'beep', 'instance errors')
- })
- clone.on('close', function () {
- t.fail('close should not be emitted')
- })
- instance.destroy(new Error('beep'))
- })
- test('instance destroy destroys all but the source', function (t) {
- t.plan(2)
- var source = from()
- var instance = cloneable(source)
- var clone = instance.clone()
- source.on('close', function () {
- t.fail('source should not be closed')
- })
- instance.on('end', function () {
- t.pass('instance has ended')
- })
- clone.on('end', function () {
- t.pass('clone has ended')
- })
- instance.resume()
- clone.resume()
- instance.destroy()
- })
- test('clone destroy does not affect other clones, cloneable or source', function (t) {
- t.plan(1)
- var source = from()
- var instance = cloneable(source)
- var clone = instance.clone()
- var other = instance.clone()
- source.on('close', function () {
- t.fail('source should not be closed')
- })
- instance.on('close', function () {
- t.fail('instance should not be closed')
- })
- other.on('close', function () {
- t.fail('other clone should not be closed')
- })
- clone.on('close', function () {
- t.pass('clone is closed')
- })
- clone.destroy()
- })
- test('clone remains readable if other is destroyed', function (t) {
- t.plan(3)
- var read = false
- var source = from(function (size, next) {
- if (read) {
- this.push(null)
- } else {
- read = true
- this.push('hello')
- }
- next()
- })
- var instance = cloneable(source)
- var clone = instance.clone()
- var other = instance.clone()
- instance.pipe(sink.obj(function (chunk, enc, cb) {
- t.deepEqual(chunk.toString(), 'hello', 'instance chunk matches')
- cb()
- }))
- clone.pipe(sink.obj(function (chunk, enc, cb) {
- t.deepEqual(chunk.toString(), 'hello', 'clone chunk matches')
- cb()
- }))
- clone.on('close', function () {
- t.fail('clone should not be closed')
- })
- instance.on('close', function () {
- t.fail('instance should not be closed')
- })
- other.on('close', function () {
- t.pass('other is closed')
- })
- other.destroy()
- })
- test('clone of clone', function (t) {
- t.plan(6)
- var read = false
- var source = from(function (size, next) {
- if (read) {
- this.push(null)
- } else {
- read = true
- this.push('hello world')
- }
- next()
- })
- var instance = cloneable(source)
- t.notOk(read, 'stream not started')
- var cloned = instance.clone()
- t.notOk(read, 'stream not started')
- var replica = cloned.clone()
- t.notOk(read, 'stream not started')
- instance.pipe(sink(function (chunk, enc, cb) {
- t.equal(chunk.toString(), 'hello world', 'chunk matches')
- cb()
- }))
- cloned.pipe(sink(function (chunk, enc, cb) {
- t.equal(chunk.toString(), 'hello world', 'chunk matches')
- cb()
- }))
- replica.pipe(sink(function (chunk, enc, cb) {
- t.equal(chunk.toString(), 'hello world', 'chunk matches')
- cb()
- }))
- })
- test('from vinyl', function (t) {
- t.plan(3)
- var source = from(['wa', 'dup'])
- var instance = cloneable(source)
- var clone = instance.clone()
- var data = ''
- var data2 = ''
- var ends = 2
- function latch () {
- if (--ends === 0) {
- t.equal(data, data2)
- }
- }
- instance.on('data', function (chunk) {
- data += chunk.toString()
- })
- process.nextTick(function () {
- t.equal('', data, 'nothing was written yet')
- t.equal('', data2, 'nothing was written yet')
- clone.on('data', function (chunk) {
- data2 += chunk.toString()
- })
- })
- instance.on('end', latch)
- clone.on('end', latch)
- })
- test('waits till all are flowing', function (t) {
- t.plan(1)
- var source = from(['wa', 'dup'])
- var instance = cloneable(source)
- // we create a clone
- instance.clone()
- instance.on('data', function (chunk) {
- t.fail('this should never happen')
- })
- process.nextTick(function () {
- t.pass('wait till nextTick')
- })
- })
- test('isCloneable', function (t) {
- t.plan(4)
- var source = from(['hello', ' ', 'world'])
- t.notOk(cloneable.isCloneable(source), 'a generic readable is not cloneable')
- var instance = cloneable(source)
- t.ok(cloneable.isCloneable(instance), 'a cloneable is cloneable')
- var clone = instance.clone()
- t.ok(cloneable.isCloneable(clone), 'a clone is cloneable')
- var cloneClone = clone.clone()
- t.ok(cloneable.isCloneable(cloneClone), 'a clone of a clone is cloneable')
- })
- test('emits finish', function (t) {
- var chunks = ['a', 'b', 'c', 'd', null]
- var e1 = ['a', 'b', 'c', 'd']
- var e2 = ['a', 'b', 'c', 'd']
- t.plan(2 + e1.length + e2.length)
- var source = from(function (size, next) {
- setImmediate(next, null, chunks.shift())
- })
- var instance = cloneable(source)
- var clone = instance.clone()
- clone.on('finish', t.pass.bind(null, 'clone emits finish'))
- instance.on('finish', t.pass.bind(null, 'main emits finish'))
- instance.pipe(sink(function (chunk, enc, cb) {
- t.equal(chunk.toString(), e1.shift(), 'chunk matches')
- cb()
- }))
- clone.on('data', function (chunk) {
- t.equal(chunk.toString(), e2.shift(), 'chunk matches')
- })
- })
- test('clone async w resume', function (t) {
- t.plan(4)
- var read = false
- var source = from(function (size, next) {
- if (read) {
- this.push(null)
- } else {
- read = true
- this.push('hello world')
- }
- next()
- })
- var instance = cloneable(source)
- t.notOk(read, 'stream not started')
- var cloned = instance.clone()
- t.notOk(read, 'stream not started')
- instance.on('end', t.pass.bind(null, 'end emitted'))
- instance.resume()
- setImmediate(function () {
- cloned.on('end', t.pass.bind(null, 'end emitted'))
- cloned.resume()
- })
- })
- test('big file', function (t) {
- t.plan(13)
- var stream = cloneable(fs.createReadStream(path.join(__dirname, 'big')))
- var hash = crypto.createHash('sha1')
- hash.setEncoding('hex')
- var toCheck
- fs.createReadStream(path.join(__dirname, 'big'))
- .pipe(hash)
- .once('readable', function () {
- toCheck = hash.read()
- t.ok(toCheck)
- })
- function pipe (s, num) {
- s.on('end', function () {
- t.pass('end for ' + num)
- })
- var dest = path.join(__dirname, 'out')
- s.pipe(fs.createWriteStream(dest))
- .on('finish', function () {
- t.pass('finish for ' + num)
- var destHash = crypto.createHash('sha1')
- destHash.setEncoding('hex')
- fs.createReadStream(dest)
- .pipe(destHash)
- .once('readable', function () {
- var hash = destHash.read()
- t.ok(hash)
- t.equal(hash, toCheck)
- })
- })
- }
- // Pipe in another event loop tick <-- this one finished only, it's the original cloneable.
- setImmediate(pipe.bind(null, stream, 1))
- // Pipe in the same event loop tick
- pipe(stream.clone(), 0)
- // Pipe a long time after
- setTimeout(pipe.bind(null, stream.clone(), 2), 1000)
- })
|