queue.js 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. 'use strict'
  2. var reusify = require('reusify')
  3. function fastqueue (context, worker, concurrency) {
  4. if (typeof context === 'function') {
  5. concurrency = worker
  6. worker = context
  7. context = null
  8. }
  9. if (concurrency < 1) {
  10. throw new Error('fastqueue concurrency must be greater than 1')
  11. }
  12. var cache = reusify(Task)
  13. var queueHead = null
  14. var queueTail = null
  15. var _running = 0
  16. var errorHandler = null
  17. var self = {
  18. push: push,
  19. drain: noop,
  20. saturated: noop,
  21. pause: pause,
  22. paused: false,
  23. concurrency: concurrency,
  24. running: running,
  25. resume: resume,
  26. idle: idle,
  27. length: length,
  28. getQueue: getQueue,
  29. unshift: unshift,
  30. empty: noop,
  31. kill: kill,
  32. killAndDrain: killAndDrain,
  33. error: error
  34. }
  35. return self
  36. function running () {
  37. return _running
  38. }
  39. function pause () {
  40. self.paused = true
  41. }
  42. function length () {
  43. var current = queueHead
  44. var counter = 0
  45. while (current) {
  46. current = current.next
  47. counter++
  48. }
  49. return counter
  50. }
  51. function getQueue () {
  52. var current = queueHead
  53. var tasks = []
  54. while (current) {
  55. tasks.push(current.value)
  56. current = current.next
  57. }
  58. return tasks
  59. }
  60. function resume () {
  61. if (!self.paused) return
  62. self.paused = false
  63. for (var i = 0; i < self.concurrency; i++) {
  64. _running++
  65. release()
  66. }
  67. }
  68. function idle () {
  69. return _running === 0 && self.length() === 0
  70. }
  71. function push (value, done) {
  72. var current = cache.get()
  73. current.context = context
  74. current.release = release
  75. current.value = value
  76. current.callback = done || noop
  77. current.errorHandler = errorHandler
  78. if (_running === self.concurrency || self.paused) {
  79. if (queueTail) {
  80. queueTail.next = current
  81. queueTail = current
  82. } else {
  83. queueHead = current
  84. queueTail = current
  85. self.saturated()
  86. }
  87. } else {
  88. _running++
  89. worker.call(context, current.value, current.worked)
  90. }
  91. }
  92. function unshift (value, done) {
  93. var current = cache.get()
  94. current.context = context
  95. current.release = release
  96. current.value = value
  97. current.callback = done || noop
  98. if (_running === self.concurrency || self.paused) {
  99. if (queueHead) {
  100. current.next = queueHead
  101. queueHead = current
  102. } else {
  103. queueHead = current
  104. queueTail = current
  105. self.saturated()
  106. }
  107. } else {
  108. _running++
  109. worker.call(context, current.value, current.worked)
  110. }
  111. }
  112. function release (holder) {
  113. if (holder) {
  114. cache.release(holder)
  115. }
  116. var next = queueHead
  117. if (next) {
  118. if (!self.paused) {
  119. if (queueTail === queueHead) {
  120. queueTail = null
  121. }
  122. queueHead = next.next
  123. next.next = null
  124. worker.call(context, next.value, next.worked)
  125. if (queueTail === null) {
  126. self.empty()
  127. }
  128. } else {
  129. _running--
  130. }
  131. } else if (--_running === 0) {
  132. self.drain()
  133. }
  134. }
  135. function kill () {
  136. queueHead = null
  137. queueTail = null
  138. self.drain = noop
  139. }
  140. function killAndDrain () {
  141. queueHead = null
  142. queueTail = null
  143. self.drain()
  144. self.drain = noop
  145. }
  146. function error (handler) {
  147. errorHandler = handler
  148. }
  149. }
  150. function noop () {}
  151. function Task () {
  152. this.value = null
  153. this.callback = noop
  154. this.next = null
  155. this.release = noop
  156. this.context = null
  157. this.errorHandler = null
  158. var self = this
  159. this.worked = function worked (err, result) {
  160. var callback = self.callback
  161. var errorHandler = self.errorHandler
  162. var val = self.value
  163. self.value = null
  164. self.callback = noop
  165. if (self.errorHandler) {
  166. errorHandler(err, val)
  167. }
  168. callback.call(self.context, err, result)
  169. self.release(self)
  170. }
  171. }
  172. function queueAsPromised (context, worker, concurrency) {
  173. if (typeof context === 'function') {
  174. concurrency = worker
  175. worker = context
  176. context = null
  177. }
  178. function asyncWrapper (arg, cb) {
  179. worker.call(this, arg)
  180. .then(function (res) {
  181. cb(null, res)
  182. }, cb)
  183. }
  184. var queue = fastqueue(context, asyncWrapper, concurrency)
  185. var pushCb = queue.push
  186. var unshiftCb = queue.unshift
  187. queue.push = push
  188. queue.unshift = unshift
  189. return queue
  190. function push (value) {
  191. return new Promise(function (resolve, reject) {
  192. pushCb(value, function (err, result) {
  193. if (err) {
  194. reject(err)
  195. return
  196. }
  197. resolve(result)
  198. })
  199. })
  200. }
  201. function unshift (value) {
  202. return new Promise(function (resolve, reject) {
  203. unshiftCb(value, function (err, result) {
  204. if (err) {
  205. reject(err)
  206. return
  207. }
  208. resolve(result)
  209. })
  210. })
  211. }
  212. }
  213. module.exports = fastqueue
  214. module.exports.promise = queueAsPromised