// eachAsync.js
  'use strict';

  /*!
   * Module dependencies.
   */

  const EachAsyncMultiError = require('../../error/eachAsyncMultiError');
  const immediate = require('../immediate');
  const promiseOrCallback = require('../promiseOrCallback');
  8. /**
  9. * Execute `fn` for every document in the cursor. If `fn` returns a promise,
  10. * will wait for the promise to resolve before iterating on to the next one.
  11. * Returns a promise that resolves when done.
  12. *
  13. * @param {Function} next the thunk to call to get the next document
  14. * @param {Function} fn
  15. * @param {Object} options
  16. * @param {Function} [callback] executed when all docs have been processed
  17. * @return {Promise}
  18. * @api public
  19. * @method eachAsync
  20. */
  21. module.exports = function eachAsync(next, fn, options, callback) {
  22. const parallel = options.parallel || 1;
  23. const batchSize = options.batchSize;
  24. const continueOnError = options.continueOnError;
  25. const aggregatedErrors = [];
  26. const enqueue = asyncQueue();
  27. return promiseOrCallback(callback, cb => {
  28. if (batchSize != null) {
  29. if (typeof batchSize !== 'number') {
  30. throw new TypeError('batchSize must be a number');
  31. } else if (!Number.isInteger(batchSize)) {
  32. throw new TypeError('batchSize must be an integer');
  33. } else if (batchSize < 1) {
  34. throw new TypeError('batchSize must be at least 1');
  35. }
  36. }
  37. iterate(cb);
  38. });
  39. function iterate(finalCallback) {
  40. let drained = false;
  41. let handleResultsInProgress = 0;
  42. let currentDocumentIndex = 0;
  43. let documentsBatch = [];
  44. let error = null;
  45. for (let i = 0; i < parallel; ++i) {
  46. enqueue(fetch);
  47. }
  48. function fetch(done) {
  49. if (drained || error) {
  50. return done();
  51. }
  52. next(function(err, doc) {
  53. if (drained || error != null) {
  54. return done();
  55. }
  56. if (err != null) {
  57. if (continueOnError) {
  58. aggregatedErrors.push(err);
  59. } else {
  60. error = err;
  61. finalCallback(err);
  62. return done();
  63. }
  64. }
  65. if (doc == null) {
  66. drained = true;
  67. if (handleResultsInProgress <= 0) {
  68. const finalErr = continueOnError ?
  69. createEachAsyncMultiError(aggregatedErrors) :
  70. error;
  71. finalCallback(finalErr);
  72. } else if (batchSize && documentsBatch.length) {
  73. handleNextResult(documentsBatch, currentDocumentIndex++, handleNextResultCallBack);
  74. }
  75. return done();
  76. }
  77. ++handleResultsInProgress;
  78. // Kick off the subsequent `next()` before handling the result, but
  79. // make sure we know that we still have a result to handle re: #8422
  80. immediate(() => done());
  81. if (batchSize) {
  82. documentsBatch.push(doc);
  83. }
  84. // If the current documents size is less than the provided patch size don't process the documents yet
  85. if (batchSize && documentsBatch.length !== batchSize) {
  86. immediate(() => enqueue(fetch));
  87. return;
  88. }
  89. const docsToProcess = batchSize ? documentsBatch : doc;
  90. function handleNextResultCallBack(err) {
  91. if (batchSize) {
  92. handleResultsInProgress -= documentsBatch.length;
  93. documentsBatch = [];
  94. } else {
  95. --handleResultsInProgress;
  96. }
  97. if (err != null) {
  98. if (continueOnError) {
  99. aggregatedErrors.push(err);
  100. } else {
  101. error = err;
  102. return finalCallback(err);
  103. }
  104. }
  105. if (drained && handleResultsInProgress <= 0) {
  106. const finalErr = continueOnError ?
  107. createEachAsyncMultiError(aggregatedErrors) :
  108. error;
  109. return finalCallback(finalErr);
  110. }
  111. immediate(() => enqueue(fetch));
  112. }
  113. handleNextResult(docsToProcess, currentDocumentIndex++, handleNextResultCallBack);
  114. });
  115. }
  116. }
  117. function handleNextResult(doc, i, callback) {
  118. let maybePromise;
  119. try {
  120. maybePromise = fn(doc, i);
  121. } catch (err) {
  122. return callback(err);
  123. }
  124. if (maybePromise && typeof maybePromise.then === 'function') {
  125. maybePromise.then(
  126. function() { callback(null); },
  127. function(error) {
  128. callback(error || new Error('`eachAsync()` promise rejected without error'));
  129. });
  130. } else {
  131. callback(null);
  132. }
  133. }
  134. };
  // `next()` can only execute one at a time, so make sure we always execute
  // `next()` in series, while still allowing multiple `fn()` instances to run
  // in parallel.
  //
  // Returns an `enqueue(task)` function. Each task is called with a `step`
  // callback and must call it to release the queue; the next waiting task (if
  // any) then starts. When the queue is idle, `enqueue()` runs the task
  // synchronously and returns the task's return value; otherwise the task is
  // stored and `enqueue()` returns `undefined`.

  function asyncQueue() {
    const pending = [];
    // True from the moment a task starts until `step` finds nothing waiting.
    let busy = false;

    function step() {
      if (pending.length === 0) {
        busy = false;
        return;
      }
      busy = true;
      const task = pending.shift();
      task(step);
    }

    return function enqueue(fn) {
      if (!busy && pending.length === 0) {
        busy = true;
        return fn(step);
      }
      pending.push(fn);
    };
  }
  /**
   * Wrap the errors collected in `continueOnError` mode into a single error.
   *
   * @param {Array} aggregatedErrors errors collected while iterating
   * @return {EachAsyncMultiError|null} `null` when nothing was collected,
   *   otherwise a new `EachAsyncMultiError` built from the array
   */

  function createEachAsyncMultiError(aggregatedErrors) {
    if (aggregatedErrors.length === 0) {
      return null;
    }

    return new EachAsyncMultiError(aggregatedErrors);
  }
  167. }