test.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564
  1. 'use strict'
  2. var test = require('tape')
  3. var buildQueue = require('../')
  4. test('concurrency', function (t) {
  5. t.plan(2)
  6. t.throws(buildQueue.bind(null, worker, 0))
  7. t.doesNotThrow(buildQueue.bind(null, worker, 1))
  8. function worker (arg, cb) {
  9. cb(null, true)
  10. }
  11. })
  12. test('worker execution', function (t) {
  13. t.plan(3)
  14. var queue = buildQueue(worker, 1)
  15. queue.push(42, function (err, result) {
  16. t.error(err, 'no error')
  17. t.equal(result, true, 'result matches')
  18. })
  19. function worker (arg, cb) {
  20. t.equal(arg, 42)
  21. cb(null, true)
  22. }
  23. })
  24. test('limit', function (t) {
  25. t.plan(4)
  26. var expected = [10, 0]
  27. var queue = buildQueue(worker, 1)
  28. queue.push(10, result)
  29. queue.push(0, result)
  30. function result (err, arg) {
  31. t.error(err, 'no error')
  32. t.equal(arg, expected.shift(), 'the result matches')
  33. }
  34. function worker (arg, cb) {
  35. setTimeout(cb, arg, null, arg)
  36. }
  37. })
  38. test('multiple executions', function (t) {
  39. t.plan(15)
  40. var queue = buildQueue(worker, 1)
  41. var toExec = [1, 2, 3, 4, 5]
  42. var count = 0
  43. toExec.forEach(function (task) {
  44. queue.push(task, done)
  45. })
  46. function done (err, result) {
  47. t.error(err, 'no error')
  48. t.equal(result, toExec[count - 1], 'the result matches')
  49. }
  50. function worker (arg, cb) {
  51. t.equal(arg, toExec[count], 'arg matches')
  52. count++
  53. setImmediate(cb, null, arg)
  54. }
  55. })
  56. test('multiple executions, one after another', function (t) {
  57. t.plan(15)
  58. var queue = buildQueue(worker, 1)
  59. var toExec = [1, 2, 3, 4, 5]
  60. var count = 0
  61. queue.push(toExec[0], done)
  62. function done (err, result) {
  63. t.error(err, 'no error')
  64. t.equal(result, toExec[count - 1], 'the result matches')
  65. if (count < toExec.length) {
  66. queue.push(toExec[count], done)
  67. }
  68. }
  69. function worker (arg, cb) {
  70. t.equal(arg, toExec[count], 'arg matches')
  71. count++
  72. setImmediate(cb, null, arg)
  73. }
  74. })
  75. test('set this', function (t) {
  76. t.plan(3)
  77. var that = {}
  78. var queue = buildQueue(that, worker, 1)
  79. queue.push(42, function (err, result) {
  80. t.error(err, 'no error')
  81. t.equal(this, that, 'this matches')
  82. })
  83. function worker (arg, cb) {
  84. t.equal(this, that, 'this matches')
  85. cb(null, true)
  86. }
  87. })
  88. test('drain', function (t) {
  89. t.plan(4)
  90. var queue = buildQueue(worker, 1)
  91. var worked = false
  92. queue.push(42, function (err, result) {
  93. t.error(err, 'no error')
  94. t.equal(result, true, 'result matches')
  95. })
  96. queue.drain = function () {
  97. t.equal(true, worked, 'drained')
  98. }
  99. function worker (arg, cb) {
  100. t.equal(arg, 42)
  101. worked = true
  102. setImmediate(cb, null, true)
  103. }
  104. })
  105. test('pause && resume', function (t) {
  106. t.plan(7)
  107. var queue = buildQueue(worker, 1)
  108. var worked = false
  109. t.notOk(queue.paused, 'it should not be paused')
  110. queue.pause()
  111. queue.push(42, function (err, result) {
  112. t.error(err, 'no error')
  113. t.equal(result, true, 'result matches')
  114. })
  115. t.notOk(worked, 'it should be paused')
  116. t.ok(queue.paused, 'it should be paused')
  117. queue.resume()
  118. queue.resume() // second resume is a no-op
  119. t.notOk(queue.paused, 'it should not be paused')
  120. function worker (arg, cb) {
  121. t.equal(arg, 42)
  122. worked = true
  123. cb(null, true)
  124. }
  125. })
  126. test('pause in flight && resume', function (t) {
  127. t.plan(9)
  128. var queue = buildQueue(worker, 1)
  129. var expected = [42, 24]
  130. t.notOk(queue.paused, 'it should not be paused')
  131. queue.push(42, function (err, result) {
  132. t.error(err, 'no error')
  133. t.equal(result, true, 'result matches')
  134. t.ok(queue.paused, 'it should be paused')
  135. process.nextTick(function () { queue.resume() })
  136. })
  137. queue.push(24, function (err, result) {
  138. t.error(err, 'no error')
  139. t.equal(result, true, 'result matches')
  140. t.notOk(queue.paused, 'it should not be paused')
  141. })
  142. queue.pause()
  143. function worker (arg, cb) {
  144. t.equal(arg, expected.shift())
  145. process.nextTick(function () { cb(null, true) })
  146. }
  147. })
  148. test('altering concurrency', function (t) {
  149. t.plan(7)
  150. var queue = buildQueue(worker, 1)
  151. var count = 0
  152. queue.pause()
  153. queue.push(24, workDone)
  154. queue.push(24, workDone)
  155. queue.concurrency = 2
  156. queue.resume()
  157. t.equal(queue.running(), 2, '2 jobs running')
  158. function workDone (err, result) {
  159. t.error(err, 'no error')
  160. t.equal(result, true, 'result matches')
  161. }
  162. function worker (arg, cb) {
  163. t.equal(0, count, 'works in parallel')
  164. setImmediate(function () {
  165. count++
  166. cb(null, true)
  167. })
  168. }
  169. })
  170. test('idle()', function (t) {
  171. t.plan(12)
  172. var queue = buildQueue(worker, 1)
  173. t.ok(queue.idle(), 'queue is idle')
  174. queue.push(42, function (err, result) {
  175. t.error(err, 'no error')
  176. t.equal(result, true, 'result matches')
  177. t.notOk(queue.idle(), 'queue is not idle')
  178. })
  179. queue.push(42, function (err, result) {
  180. t.error(err, 'no error')
  181. t.equal(result, true, 'result matches')
  182. // it will go idle after executing this function
  183. setImmediate(function () {
  184. t.ok(queue.idle(), 'queue is now idle')
  185. })
  186. })
  187. t.notOk(queue.idle(), 'queue is not idle')
  188. function worker (arg, cb) {
  189. t.notOk(queue.idle(), 'queue is not idle')
  190. t.equal(arg, 42)
  191. setImmediate(cb, null, true)
  192. }
  193. })
  194. test('saturated', function (t) {
  195. t.plan(9)
  196. var queue = buildQueue(worker, 1)
  197. var preworked = 0
  198. var worked = 0
  199. queue.saturated = function () {
  200. t.pass('saturated')
  201. t.equal(preworked, 1, 'started 1 task')
  202. t.equal(worked, 0, 'worked zero task')
  203. }
  204. queue.push(42, done)
  205. queue.push(42, done)
  206. function done (err, result) {
  207. t.error(err, 'no error')
  208. t.equal(result, true, 'result matches')
  209. }
  210. function worker (arg, cb) {
  211. t.equal(arg, 42)
  212. preworked++
  213. setImmediate(function () {
  214. worked++
  215. cb(null, true)
  216. })
  217. }
  218. })
  219. test('length', function (t) {
  220. t.plan(7)
  221. var queue = buildQueue(worker, 1)
  222. t.equal(queue.length(), 0, 'nothing waiting')
  223. queue.push(42, done)
  224. t.equal(queue.length(), 0, 'nothing waiting')
  225. queue.push(42, done)
  226. t.equal(queue.length(), 1, 'one task waiting')
  227. queue.push(42, done)
  228. t.equal(queue.length(), 2, 'two tasks waiting')
  229. function done (err, result) {
  230. t.error(err, 'no error')
  231. }
  232. function worker (arg, cb) {
  233. setImmediate(function () {
  234. cb(null, true)
  235. })
  236. }
  237. })
  238. test('getQueue', function (t) {
  239. t.plan(10)
  240. var queue = buildQueue(worker, 1)
  241. t.equal(queue.getQueue().length, 0, 'nothing waiting')
  242. queue.push(42, done)
  243. t.equal(queue.getQueue().length, 0, 'nothing waiting')
  244. queue.push(42, done)
  245. t.equal(queue.getQueue().length, 1, 'one task waiting')
  246. t.equal(queue.getQueue()[0], 42, 'should be equal')
  247. queue.push(43, done)
  248. t.equal(queue.getQueue().length, 2, 'two tasks waiting')
  249. t.equal(queue.getQueue()[0], 42, 'should be equal')
  250. t.equal(queue.getQueue()[1], 43, 'should be equal')
  251. function done (err, result) {
  252. t.error(err, 'no error')
  253. }
  254. function worker (arg, cb) {
  255. setImmediate(function () {
  256. cb(null, true)
  257. })
  258. }
  259. })
  260. test('unshift', function (t) {
  261. t.plan(8)
  262. var queue = buildQueue(worker, 1)
  263. var expected = [1, 2, 3, 4]
  264. queue.push(1, done)
  265. queue.push(4, done)
  266. queue.unshift(3, done)
  267. queue.unshift(2, done)
  268. function done (err, result) {
  269. t.error(err, 'no error')
  270. }
  271. function worker (arg, cb) {
  272. t.equal(expected.shift(), arg, 'tasks come in order')
  273. setImmediate(function () {
  274. cb(null, true)
  275. })
  276. }
  277. })
  278. test('unshift && empty', function (t) {
  279. t.plan(2)
  280. var queue = buildQueue(worker, 1)
  281. var completed = false
  282. queue.pause()
  283. queue.empty = function () {
  284. t.notOk(completed, 'the task has not completed yet')
  285. }
  286. queue.unshift(1, done)
  287. queue.resume()
  288. function done (err, result) {
  289. completed = true
  290. t.error(err, 'no error')
  291. }
  292. function worker (arg, cb) {
  293. setImmediate(function () {
  294. cb(null, true)
  295. })
  296. }
  297. })
  298. test('push && empty', function (t) {
  299. t.plan(2)
  300. var queue = buildQueue(worker, 1)
  301. var completed = false
  302. queue.pause()
  303. queue.empty = function () {
  304. t.notOk(completed, 'the task has not completed yet')
  305. }
  306. queue.push(1, done)
  307. queue.resume()
  308. function done (err, result) {
  309. completed = true
  310. t.error(err, 'no error')
  311. }
  312. function worker (arg, cb) {
  313. setImmediate(function () {
  314. cb(null, true)
  315. })
  316. }
  317. })
  318. test('kill', function (t) {
  319. t.plan(5)
  320. var queue = buildQueue(worker, 1)
  321. var expected = [1]
  322. var predrain = queue.drain
  323. queue.drain = function drain () {
  324. t.fail('drain should never be called')
  325. }
  326. queue.push(1, done)
  327. queue.push(4, done)
  328. queue.unshift(3, done)
  329. queue.unshift(2, done)
  330. queue.kill()
  331. function done (err, result) {
  332. t.error(err, 'no error')
  333. setImmediate(function () {
  334. t.equal(queue.length(), 0, 'no queued tasks')
  335. t.equal(queue.running(), 0, 'no running tasks')
  336. t.equal(queue.drain, predrain, 'drain is back to default')
  337. })
  338. }
  339. function worker (arg, cb) {
  340. t.equal(expected.shift(), arg, 'tasks come in order')
  341. setImmediate(function () {
  342. cb(null, true)
  343. })
  344. }
  345. })
  346. test('killAndDrain', function (t) {
  347. t.plan(6)
  348. var queue = buildQueue(worker, 1)
  349. var expected = [1]
  350. var predrain = queue.drain
  351. queue.drain = function drain () {
  352. t.pass('drain has been called')
  353. }
  354. queue.push(1, done)
  355. queue.push(4, done)
  356. queue.unshift(3, done)
  357. queue.unshift(2, done)
  358. queue.killAndDrain()
  359. function done (err, result) {
  360. t.error(err, 'no error')
  361. setImmediate(function () {
  362. t.equal(queue.length(), 0, 'no queued tasks')
  363. t.equal(queue.running(), 0, 'no running tasks')
  364. t.equal(queue.drain, predrain, 'drain is back to default')
  365. })
  366. }
  367. function worker (arg, cb) {
  368. t.equal(expected.shift(), arg, 'tasks come in order')
  369. setImmediate(function () {
  370. cb(null, true)
  371. })
  372. }
  373. })
  374. test('pause && idle', function (t) {
  375. t.plan(11)
  376. var queue = buildQueue(worker, 1)
  377. var worked = false
  378. t.notOk(queue.paused, 'it should not be paused')
  379. t.ok(queue.idle(), 'should be idle')
  380. queue.pause()
  381. queue.push(42, function (err, result) {
  382. t.error(err, 'no error')
  383. t.equal(result, true, 'result matches')
  384. })
  385. t.notOk(worked, 'it should be paused')
  386. t.ok(queue.paused, 'it should be paused')
  387. t.notOk(queue.idle(), 'should not be idle')
  388. queue.resume()
  389. t.notOk(queue.paused, 'it should not be paused')
  390. t.notOk(queue.idle(), 'it should not be idle')
  391. function worker (arg, cb) {
  392. t.equal(arg, 42)
  393. worked = true
  394. process.nextTick(cb.bind(null, null, true))
  395. process.nextTick(function () {
  396. t.ok(queue.idle(), 'is should be idle')
  397. })
  398. }
  399. })
  400. test('push without cb', function (t) {
  401. t.plan(1)
  402. var queue = buildQueue(worker, 1)
  403. queue.push(42)
  404. function worker (arg, cb) {
  405. t.equal(arg, 42)
  406. cb()
  407. }
  408. })
  409. test('unshift without cb', function (t) {
  410. t.plan(1)
  411. var queue = buildQueue(worker, 1)
  412. queue.unshift(42)
  413. function worker (arg, cb) {
  414. t.equal(arg, 42)
  415. cb()
  416. }
  417. })
  418. test('push with worker throwing error', function (t) {
  419. t.plan(5)
  420. var q = buildQueue(function (task, cb) {
  421. cb(new Error('test error'), null)
  422. }, 1)
  423. q.error(function (err, task) {
  424. t.ok(err instanceof Error, 'global error handler should catch the error')
  425. t.match(err.message, /test error/, 'error message should be "test error"')
  426. t.equal(task, 42, 'The task executed should be passed')
  427. })
  428. q.push(42, function (err) {
  429. t.ok(err instanceof Error, 'push callback should catch the error')
  430. t.match(err.message, /test error/, 'error message should be "test error"')
  431. })
  432. })