eachAsync.js 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. 'use strict';
  2. /*!
  3. * Module dependencies.
  4. */
  5. const promiseOrCallback = require('../promiseOrCallback');
  6. /**
  7. * Execute `fn` for every document in the cursor. If `fn` returns a promise,
  8. * will wait for the promise to resolve before iterating on to the next one.
  9. * Returns a promise that resolves when done.
  10. *
  11. * @param {Function} next the thunk to call to get the next document
  12. * @param {Function} fn
  13. * @param {Object} options
  14. * @param {Function} [callback] executed when all docs have been processed
  15. * @return {Promise}
  16. * @api public
  17. * @method eachAsync
  18. */
  19. module.exports = function eachAsync(next, fn, options, callback) {
  20. const parallel = options.parallel || 1;
  21. const batchSize = options.batchSize;
  22. const enqueue = asyncQueue();
  23. return promiseOrCallback(callback, cb => {
  24. if (batchSize != null) {
  25. if (typeof batchSize !== 'number') {
  26. throw new TypeError('batchSize must be a number');
  27. }
  28. if (batchSize < 1) {
  29. throw new TypeError('batchSize must be at least 1');
  30. }
  31. if (batchSize !== Math.floor(batchSize)) {
  32. throw new TypeError('batchSize must be a positive integer');
  33. }
  34. }
  35. iterate(cb);
  36. });
  37. function iterate(finalCallback) {
  38. let drained = false;
  39. let handleResultsInProgress = 0;
  40. let currentDocumentIndex = 0;
  41. let documentsBatch = [];
  42. let error = null;
  43. for (let i = 0; i < parallel; ++i) {
  44. enqueue(fetch);
  45. }
  46. function fetch(done) {
  47. if (drained || error) {
  48. return done();
  49. }
  50. next(function(err, doc) {
  51. if (drained || error != null) {
  52. return done();
  53. }
  54. if (err != null) {
  55. error = err;
  56. finalCallback(err);
  57. return done();
  58. }
  59. if (doc == null) {
  60. drained = true;
  61. if (handleResultsInProgress <= 0) {
  62. finalCallback(null);
  63. } else if (batchSize != null && documentsBatch.length) {
  64. handleNextResult(documentsBatch, currentDocumentIndex++, handleNextResultCallBack);
  65. }
  66. return done();
  67. }
  68. ++handleResultsInProgress;
  69. // Kick off the subsequent `next()` before handling the result, but
  70. // make sure we know that we still have a result to handle re: #8422
  71. process.nextTick(() => done());
  72. if (batchSize != null) {
  73. documentsBatch.push(doc);
  74. }
  75. // If the current documents size is less than the provided patch size don't process the documents yet
  76. if (batchSize != null && documentsBatch.length !== batchSize) {
  77. setTimeout(() => enqueue(fetch), 0);
  78. return;
  79. }
  80. const docsToProcess = batchSize != null ? documentsBatch : doc;
  81. function handleNextResultCallBack(err) {
  82. if (batchSize != null) {
  83. handleResultsInProgress -= documentsBatch.length;
  84. documentsBatch = [];
  85. } else {
  86. --handleResultsInProgress;
  87. }
  88. if (err != null) {
  89. error = err;
  90. return finalCallback(err);
  91. }
  92. if (drained && handleResultsInProgress <= 0) {
  93. return finalCallback(null);
  94. }
  95. setTimeout(() => enqueue(fetch), 0);
  96. }
  97. handleNextResult(docsToProcess, currentDocumentIndex++, handleNextResultCallBack);
  98. });
  99. }
  100. }
  101. function handleNextResult(doc, i, callback) {
  102. const promise = fn(doc, i);
  103. if (promise && typeof promise.then === 'function') {
  104. promise.then(
  105. function() { callback(null); },
  106. function(error) { callback(error || new Error('`eachAsync()` promise rejected without error')); });
  107. } else {
  108. callback(null);
  109. }
  110. }
  111. };
  112. // `next()` can only execute one at a time, so make sure we always execute
  113. // `next()` in series, while still allowing multiple `fn()` instances to run
  114. // in parallel.
  115. function asyncQueue() {
  116. const _queue = [];
  117. let inProgress = null;
  118. let id = 0;
  119. return function enqueue(fn) {
  120. if (_queue.length === 0 && inProgress == null) {
  121. inProgress = id++;
  122. return fn(_step);
  123. }
  124. _queue.push(fn);
  125. };
  126. function _step() {
  127. inProgress = null;
  128. if (_queue.length > 0) {
  129. inProgress = id++;
  130. const fn = _queue.shift();
  131. fn(_step);
  132. }
  133. }
  134. }