eachAsync.js 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. 'use strict';
  2. /*!
  3. * Module dependencies.
  4. */
  5. const promiseOrCallback = require('../promiseOrCallback');
  6. /**
  7. * Execute `fn` for every document in the cursor. If `fn` returns a promise,
  8. * will wait for the promise to resolve before iterating on to the next one.
  9. * Returns a promise that resolves when done.
  10. *
  11. * @param {Function} next the thunk to call to get the next document
  12. * @param {Function} fn
  13. * @param {Object} options
  14. * @param {Function} [callback] executed when all docs have been processed
  15. * @return {Promise}
  16. * @api public
  17. * @method eachAsync
  18. */
  19. module.exports = function eachAsync(next, fn, options, callback) {
  20. const parallel = options.parallel || 1;
  21. const enqueue = asyncQueue();
  22. return promiseOrCallback(callback, cb => {
  23. iterate(cb);
  24. });
  25. function iterate(finalCallback) {
  26. let drained = false;
  27. let handleResultsInProgress = 0;
  28. let currentDocumentIndex = 0;
  29. let error = null;
  30. for (let i = 0; i < parallel; ++i) {
  31. enqueue(fetch);
  32. }
  33. function fetch(done) {
  34. if (drained || error) {
  35. return done();
  36. }
  37. next(function(err, doc) {
  38. if (drained || error != null) {
  39. return done();
  40. }
  41. if (err != null) {
  42. error = err;
  43. finalCallback(err);
  44. return done();
  45. }
  46. if (doc == null) {
  47. drained = true;
  48. if (handleResultsInProgress <= 0) {
  49. finalCallback(null);
  50. }
  51. return done();
  52. }
  53. ++handleResultsInProgress;
  54. // Kick off the subsequent `next()` before handling the result, but
  55. // make sure we know that we still have a result to handle re: #8422
  56. process.nextTick(() => done());
  57. handleNextResult(doc, currentDocumentIndex++, function(err) {
  58. --handleResultsInProgress;
  59. if (err != null) {
  60. error = err;
  61. return finalCallback(err);
  62. }
  63. if (drained && handleResultsInProgress <= 0) {
  64. return finalCallback(null);
  65. }
  66. setTimeout(() => enqueue(fetch), 0);
  67. });
  68. });
  69. }
  70. }
  71. function handleNextResult(doc, i, callback) {
  72. const promise = fn(doc, i);
  73. if (promise && typeof promise.then === 'function') {
  74. promise.then(
  75. function() { callback(null); },
  76. function(error) { callback(error || new Error('`eachAsync()` promise rejected without error')); });
  77. } else {
  78. callback(null);
  79. }
  80. }
  81. };
  82. // `next()` can only execute one at a time, so make sure we always execute
  83. // `next()` in series, while still allowing multiple `fn()` instances to run
  84. // in parallel.
  85. function asyncQueue() {
  86. const _queue = [];
  87. let inProgress = null;
  88. let id = 0;
  89. return function enqueue(fn) {
  90. if (_queue.length === 0 && inProgress == null) {
  91. inProgress = id++;
  92. return fn(_step);
  93. }
  94. _queue.push(fn);
  95. };
  96. function _step() {
  97. inProgress = null;
  98. if (_queue.length > 0) {
  99. inProgress = id++;
  100. const fn = _queue.shift();
  101. fn(_step);
  102. }
  103. }
  104. }