test.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684
  1. 'use strict'
  2. var fs = require('fs')
  3. var path = require('path')
  4. var test = require('tape').test
  5. var from = require('from2')
  6. var crypto = require('crypto')
  7. var sink = require('flush-write-stream')
  8. var cloneable = require('./')
  // A cloneable wrapper with no clones forwards the single source chunk to
  // its destination, and wrapping alone must not start the source.
  test('basic passthrough', function (t) {
    t.plan(2)

    var started = false

    // Read callback for from2: one 'hello world' chunk, then end-of-stream.
    function produce (size, done) {
      if (!started) {
        started = true
        this.push('hello world')
      } else {
        this.push(null)
      }
      done()
    }

    var wrapped = cloneable(from(produce))
    t.notOk(started, 'stream not started')

    var checker = sink(function (buf, encoding, done) {
      t.equal(buf.toString(), 'hello world', 'chunk matches')
      done()
    })
    wrapped.pipe(checker)
  })
  // The cloneable and a clone, both piped in the same tick, each receive the
  // chunk; neither wrapping nor cloning starts the source.
  test('clone sync', function (t) {
    t.plan(4)

    var started = false

    // Read callback for from2: one 'hello world' chunk, then end-of-stream.
    function produce (size, done) {
      if (!started) {
        started = true
        this.push('hello world')
      } else {
        this.push(null)
      }
      done()
    }

    // Write callback shared by both sinks.
    function expectHello (buf, encoding, done) {
      t.equal(buf.toString(), 'hello world', 'chunk matches')
      done()
    }

    var original = cloneable(from(produce))
    t.notOk(started, 'stream not started')

    var copy = original.clone()
    t.notOk(started, 'stream not started')

    original.pipe(sink(expectHello))
    copy.pipe(sink(expectHello))
  })
  // Same as the sync case, but the clone is only piped on a later
  // setImmediate turn; both destinations must still see the chunk.
  test('clone async', function (t) {
    t.plan(4)

    var started = false

    // Read callback for from2: one 'hello world' chunk, then end-of-stream.
    function produce (size, done) {
      if (!started) {
        started = true
        this.push('hello world')
      } else {
        this.push(null)
      }
      done()
    }

    // Write callback shared by both sinks.
    function expectHello (buf, encoding, done) {
      t.equal(buf.toString(), 'hello world', 'chunk matches')
      done()
    }

    var original = cloneable(from(produce))
    t.notOk(started, 'stream not started')

    var copy = original.clone()
    t.notOk(started, 'stream not started')

    original.pipe(sink(expectHello))

    setImmediate(function () {
      copy.pipe(sink(expectHello))
    })
  })
  // Object-mode variant of the passthrough test: a single object travels
  // from an object-mode source through the cloneable to an object-mode sink.
  test('basic passthrough in obj mode', function (t) {
    t.plan(2)

    var started = false

    // Read callback for from2.obj: one object, then end-of-stream. On the
    // second call it returns right after pushing null, without calling done.
    function produce (size, done) {
      if (started) {
        return this.push(null)
      }
      started = true
      this.push({ hello: 'world' })
      done()
    }

    var wrapped = cloneable(from.obj(produce))
    t.notOk(started, 'stream not started')

    var checker = sink.obj(function (item, encoding, done) {
      t.deepEqual(item, { hello: 'world' }, 'chunk matches')
      done()
    })
    wrapped.pipe(checker)
  })
  // Object-mode variant of the async clone test: the clone is piped one
  // setImmediate turn after the cloneable and must receive the same object.
  test('multiple clone in object mode', function (t) {
    t.plan(4)

    var started = false

    // Read callback for from2.obj: one object, then end-of-stream. On the
    // second call it returns right after pushing null, without calling done.
    function produce (size, done) {
      if (started) {
        return this.push(null)
      }
      started = true
      this.push({ hello: 'world' })
      done()
    }

    // Write callback shared by both object-mode sinks.
    function expectObject (item, encoding, done) {
      t.deepEqual(item, { hello: 'world' }, 'chunk matches')
      done()
    }

    var original = cloneable(from.obj(produce))
    t.notOk(started, 'stream not started')

    var copy = original.clone()
    t.notOk(started, 'stream not started')

    original.pipe(sink.obj(expectObject))

    setImmediate(function () {
      copy.pipe(sink.obj(expectObject))
    })
  })
  // Consumes the cloneable through 'data' listeners instead of pipe() and
  // checks the accumulated text once 'end' fires.
  test('basic passthrough with data event', function (t) {
    t.plan(2)

    var started = false

    // Read callback for from2: one 'hello world' chunk, then end-of-stream.
    function produce (size, done) {
      if (!started) {
        started = true
        this.push('hello world')
      } else {
        this.push(null)
      }
      done()
    }

    var wrapped = cloneable(from(produce))
    t.notOk(started, 'stream not started')

    var pieces = []

    wrapped.on('data', function (buf) {
      pieces.push(buf.toString())
    })

    wrapped.on('end', function () {
      t.equal(pieces.join(''), 'hello world', 'chunk matches')
    })
  })
  // The clone is read through 'data' listeners while the cloneable itself is
  // piped to a sink; both must observe the full payload.
  test('basic passthrough with data event on clone', function (t) {
    t.plan(3)

    var started = false

    // Read callback for from2: one 'hello world' chunk, then end-of-stream.
    function produce (size, done) {
      if (!started) {
        started = true
        this.push('hello world')
      } else {
        this.push(null)
      }
      done()
    }

    var original = cloneable(from(produce))
    var copy = original.clone()
    t.notOk(started, 'stream not started')

    var pieces = []

    copy.on('data', function (buf) {
      pieces.push(buf.toString())
    })

    copy.on('end', function () {
      t.equal(pieces.join(''), 'hello world', 'chunk matches in clone')
    })

    original.pipe(sink(function (buf, encoding, done) {
      t.equal(buf.toString(), 'hello world', 'chunk matches in instance')
      done()
    }))
  })
  // Calling clone() from inside the sink's write callback, i.e. once data is
  // already being delivered, is expected to throw.
  test('errors if cloned after start', function (t) {
    t.plan(2)

    // Read callback for from2: pushes the chunk and end-of-stream in one call.
    function produce (size, done) {
      this.push('hello world')
      this.push(null)
      done()
    }

    var wrapped = cloneable(from(produce))

    function cloneLate () {
      wrapped.clone()
    }

    wrapped.pipe(sink(function (buf, encoding, done) {
      t.equal(buf.toString(), 'hello world', 'chunk matches')
      t.throws(cloneLate, 'throws if cloned after start')
      done()
    }))
  })
  // Consumes the cloneable in paused mode: drains read() on each 'readable'
  // event and checks the accumulated text on 'end'.
  test('basic passthrough with readable event', function (t) {
    t.plan(2)

    var started = false

    // Read callback for from2: one 'hello world' chunk, then end-of-stream.
    function produce (size, done) {
      if (!started) {
        started = true
        this.push('hello world')
      } else {
        this.push(null)
      }
      done()
    }

    var wrapped = cloneable(from(produce))
    t.notOk(started, 'stream not started')

    var collected = ''

    wrapped.on('readable', function () {
      // `this` is the stream that emitted 'readable'; read until it yields null.
      for (var piece = this.read(); piece !== null; piece = this.read()) {
        collected += piece.toString()
      }
    })

    wrapped.on('end', function () {
      t.equal(collected, 'hello world', 'chunk matches')
    })
  })
  // The clone is drained in paused mode via 'readable' while the cloneable
  // is piped to a sink; both must observe the full payload.
  test('basic passthrough with readable event on clone', function (t) {
    t.plan(3)

    var started = false

    // Read callback for from2: one 'hello world' chunk, then end-of-stream.
    function produce (size, done) {
      if (!started) {
        started = true
        this.push('hello world')
      } else {
        this.push(null)
      }
      done()
    }

    var original = cloneable(from(produce))
    var copy = original.clone()
    t.notOk(started, 'stream not started')

    var collected = ''

    copy.on('readable', function () {
      // `this` is the stream that emitted 'readable'; read until it yields null.
      for (var piece = this.read(); piece !== null; piece = this.read()) {
        collected += piece.toString()
      }
    })

    copy.on('end', function () {
      t.equal(collected, 'hello world', 'chunk matches in clone')
    })

    original.pipe(sink(function (buf, encoding, done) {
      t.equal(buf.toString(), 'hello world', 'chunk matches in instance')
      done()
    }))
  })
  // An 'error' emitted on the source must reach both the cloneable and its
  // clone carrying the very same error object.
  test('source error destroys all', function (t) {
    t.plan(3)

    var source = from()
    var instance = cloneable(source)
    var clone = instance.clone()

    source.on('error', function (sourceErr) {
      t.ok(sourceErr, 'source errors')

      // Builds a listener asserting identity with the error seen on the source.
      function expectSame (label) {
        return function (received) {
          t.ok(sourceErr === received, label)
        }
      }

      // The downstream listeners are attached only here, inside the source's
      // own 'error' handler, exactly as the assertions require.
      instance.on('error', expectSame('instance receives same error'))
      clone.on('error', expectSame('clone receives same error'))
    })

    source.emit('error', new Error())
  })
  // Destroying the source must end both the cloneable and its clone.
  test('source destroy destroys all', function (t) {
    t.plan(2)

    var source = from()
    var instance = cloneable(source)
    var clone = instance.clone()

    // Builds a listener that records a passing assertion with the given label.
    function passes (label) {
      return function () {
        t.pass(label)
      }
    }

    instance.on('end', passes('instance has ended'))
    clone.on('end', passes('clone has ended'))

    // Put both streams into flowing mode before tearing the source down.
    clone.resume()
    instance.resume()

    source.destroy()
  })
  // Destroying the cloneable with an error must surface that error on the
  // cloneable and on its clone, without closing the source and without any
  // 'close' event on the cloneable or the clone.
  test('instance error destroys all but the source', function (t) {
    t.plan(2)

    var source = from()
    var instance = cloneable(source)
    var clone = instance.clone()

    source.on('close', function () {
      t.fail('source should not be closed')
    })

    instance.on('error', function (err) {
      t.is(err.message, 'beep', 'instance errors')
    })

    instance.on('close', function () {
      t.fail('close should not be emitted')
    })

    clone.on('error', function (err) {
      // Labelled for the clone so the two assertions are distinguishable in
      // the TAP output (the label was previously a copy of the instance one).
      t.is(err.message, 'beep', 'clone errors')
    })

    clone.on('close', function () {
      t.fail('close should not be emitted')
    })

    instance.destroy(new Error('beep'))
  })
  // Destroying the cloneable without an error must end both the cloneable
  // and its clone while the source stays open.
  test('instance destroy destroys all but the source', function (t) {
    t.plan(2)

    var source = from()
    var instance = cloneable(source)
    var clone = instance.clone()

    // Builds a listener that records a passing assertion with the given label.
    function passes (label) {
      return function () {
        t.pass(label)
      }
    }

    source.on('close', function () {
      t.fail('source should not be closed')
    })

    instance.on('end', passes('instance has ended'))
    clone.on('end', passes('clone has ended'))

    // Put both streams into flowing mode before destroying the cloneable.
    instance.resume()
    clone.resume()

    instance.destroy()
  })
  // Destroying one clone must close only that clone: the source, the
  // cloneable and the sibling clone must not emit 'close'.
  test('clone destroy does not affect other clones, cloneable or source', function (t) {
    t.plan(1)

    var source = from()
    var instance = cloneable(source)
    var clone = instance.clone()
    var other = instance.clone()

    // Builds a listener that fails the test with the given label.
    function mustNotClose (label) {
      return function () {
        t.fail(label)
      }
    }

    source.on('close', mustNotClose('source should not be closed'))
    instance.on('close', mustNotClose('instance should not be closed'))
    other.on('close', mustNotClose('other clone should not be closed'))

    clone.on('close', function () {
      t.pass('clone is closed')
    })

    clone.destroy()
  })
  // With two clones, destroying one of them must leave the cloneable and the
  // remaining clone able to deliver the chunk, and must not close them.
  test('clone remains readable if other is destroyed', function (t) {
    t.plan(3)

    var started = false

    // Read callback for from2: one 'hello' chunk, then end-of-stream.
    function produce (size, done) {
      if (!started) {
        started = true
        this.push('hello')
      } else {
        this.push(null)
      }
      done()
    }

    // Builds an object-mode sink asserting the chunk under the given label.
    function helloSink (label) {
      return sink.obj(function (buf, encoding, done) {
        t.deepEqual(buf.toString(), 'hello', label)
        done()
      })
    }

    // Builds a listener that fails the test with the given label.
    function mustNotClose (label) {
      return function () {
        t.fail(label)
      }
    }

    var instance = cloneable(from(produce))
    var clone = instance.clone()
    var other = instance.clone()

    instance.pipe(helloSink('instance chunk matches'))
    clone.pipe(helloSink('clone chunk matches'))

    clone.on('close', mustNotClose('clone should not be closed'))
    instance.on('close', mustNotClose('instance should not be closed'))

    other.on('close', function () {
      t.pass('other is closed')
    })

    other.destroy()
  })
  // A clone can itself be cloned; the cloneable, its clone and the clone's
  // clone all receive the chunk, and none of the cloning starts the source.
  test('clone of clone', function (t) {
    t.plan(6)

    var started = false

    // Read callback for from2: one 'hello world' chunk, then end-of-stream.
    function produce (size, done) {
      if (!started) {
        started = true
        this.push('hello world')
      } else {
        this.push(null)
      }
      done()
    }

    // Write callback shared by all three sinks.
    function expectHello (buf, encoding, done) {
      t.equal(buf.toString(), 'hello world', 'chunk matches')
      done()
    }

    var original = cloneable(from(produce))
    t.notOk(started, 'stream not started')

    var firstCopy = original.clone()
    t.notOk(started, 'stream not started')

    var secondCopy = firstCopy.clone()
    t.notOk(started, 'stream not started')

    original.pipe(sink(expectHello))
    firstCopy.pipe(sink(expectHello))
    secondCopy.pipe(sink(expectHello))
  })
  // The cloneable gets its 'data' listener immediately, the clone only on
  // the next tick. Nothing may have been delivered by then, and once both
  // streams have ended they must have collected identical data.
  test('from vinyl', function (t) {
    t.plan(3)

    var source = from(['wa', 'dup'])
    var instance = cloneable(source)
    var clone = instance.clone()

    var data = ''
    var data2 = ''
    var ends = 2

    // Runs the final comparison once both 'end' events have been seen.
    function latch () {
      if (--ends === 0) {
        t.equal(data, data2)
      }
    }

    instance.on('data', function (chunk) {
      data += chunk.toString()
    })

    process.nextTick(function () {
      // tape's equal() takes (actual, expected, msg); the collected data is
      // the actual value, so it goes first to keep failure output accurate.
      t.equal(data, '', 'nothing was written yet')
      t.equal(data2, '', 'nothing was written yet')

      clone.on('data', function (chunk) {
        data2 += chunk.toString()
      })
    })

    instance.on('end', latch)
    clone.on('end', latch)
  })
  // While a clone exists that nobody consumes, the cloneable must not emit
  // any 'data', even though it has a 'data' listener of its own.
  test('waits till all are flowing', function (t) {
    t.plan(1)

    var wrapped = cloneable(from(['wa', 'dup']))

    // Create a clone and deliberately leave it unconsumed.
    wrapped.clone()

    wrapped.on('data', function (chunk) {
      t.fail('this should never happen')
    })

    process.nextTick(function () {
      t.pass('wait till nextTick')
    })
  })
  // cloneable.isCloneable() is falsy for a plain readable and truthy for a
  // cloneable, a clone, and a clone of a clone.
  test('isCloneable', function (t) {
    t.plan(4)

    var plain = from(['hello', ' ', 'world'])
    t.notOk(cloneable.isCloneable(plain), 'a generic readable is not cloneable')

    var wrapped = cloneable(plain)
    t.ok(cloneable.isCloneable(wrapped), 'a cloneable is cloneable')

    var firstCopy = wrapped.clone()
    t.ok(cloneable.isCloneable(firstCopy), 'a clone is cloneable')

    var secondCopy = firstCopy.clone()
    t.ok(cloneable.isCloneable(secondCopy), 'a clone of a clone is cloneable')
  })
  // Both the cloneable and its clone must emit 'finish', and each must see
  // the four chunks in order. The source hands out one chunk per
  // setImmediate turn; the trailing null ends the stream.
  test('emits finish', function (t) {
    var pending = ['a', 'b', 'c', 'd', null]
    var expectedMain = ['a', 'b', 'c', 'd']
    var expectedClone = ['a', 'b', 'c', 'd']

    // Two 'finish' assertions plus one assertion per expected chunk.
    t.plan(2 + expectedMain.length + expectedClone.length)

    var source = from(function (size, done) {
      // Take the chunk now, hand it over asynchronously.
      var nextChunk = pending.shift()
      setImmediate(function () {
        done(null, nextChunk)
      })
    })

    var instance = cloneable(source)
    var clone = instance.clone()

    clone.on('finish', function () {
      t.pass('clone emits finish')
    })

    instance.on('finish', function () {
      t.pass('main emits finish')
    })

    instance.pipe(sink(function (buf, encoding, done) {
      t.equal(buf.toString(), expectedMain.shift(), 'chunk matches')
      done()
    }))

    clone.on('data', function (buf) {
      t.equal(buf.toString(), expectedClone.shift(), 'chunk matches')
    })
  })
  // The cloneable is resumed right away and the clone one setImmediate turn
  // later; both must still emit 'end'.
  test('clone async w resume', function (t) {
    t.plan(4)

    var started = false

    // Read callback for from2: one 'hello world' chunk, then end-of-stream.
    function produce (size, done) {
      if (!started) {
        started = true
        this.push('hello world')
      } else {
        this.push(null)
      }
      done()
    }

    function ended () {
      t.pass('end emitted')
    }

    var original = cloneable(from(produce))
    t.notOk(started, 'stream not started')

    var copy = original.clone()
    t.notOk(started, 'stream not started')

    original.on('end', ended)
    original.resume()

    setImmediate(function () {
      copy.on('end', ended)
      copy.resume()
    })
  })
  // Streams the 'big' fixture through the cloneable and two clones that are
  // piped at different times. Each copy is written to disk, re-read and
  // hashed, and the SHA-1 must match the hash of the fixture read directly.
  test('big file', function (t) {
    t.plan(13)

    var fixture = path.join(__dirname, 'big')
    var stream = cloneable(fs.createReadStream(fixture))

    // Reference digest computed from an independent read of the fixture.
    var hash = crypto.createHash('sha1')
    hash.setEncoding('hex')

    var toCheck

    fs.createReadStream(fixture)
      .pipe(hash)
      .once('readable', function () {
        toCheck = hash.read()
        t.ok(toCheck)
      })

    // Pipes `s` to disk and, once written, verifies the file's digest.
    // NOTE(review): every call writes to the same 'out' path - confirm this
    // sharing is intentional.
    function pipe (s, num) {
      s.on('end', function () {
        t.pass('end for ' + num)
      })

      var dest = path.join(__dirname, 'out')
      var writer = fs.createWriteStream(dest)

      s.pipe(writer).on('finish', function () {
        t.pass('finish for ' + num)

        var destHash = crypto.createHash('sha1')
        destHash.setEncoding('hex')

        fs.createReadStream(dest)
          .pipe(destHash)
          .once('readable', function () {
            var digest = destHash.read()
            t.ok(digest)
            t.equal(digest, toCheck)
          })
      })
    }

    // Pipe in another event loop tick <-- this one finished only, it's the original cloneable.
    setImmediate(function () {
      pipe(stream, 1)
    })

    // Pipe in the same event loop tick
    pipe(stream.clone(), 0)

    // Pipe a long time after; the clone itself is created right now.
    var lateClone = stream.clone()
    setTimeout(function () {
      pipe(lateClone, 2)
    }, 1000)
  })