eachAsync.js 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. 'use strict';
  2. /*!
  3. * Module dependencies.
  4. */
  5. const immediate = require('../immediate');
  6. const promiseOrCallback = require('../promiseOrCallback');
  7. /**
  8. * Execute `fn` for every document in the cursor. If `fn` returns a promise,
  9. * will wait for the promise to resolve before iterating on to the next one.
  10. * Returns a promise that resolves when done.
  11. *
  12. * @param {Function} next the thunk to call to get the next document
  13. * @param {Function} fn
  14. * @param {Object} options
  15. * @param {Function} [callback] executed when all docs have been processed
  16. * @return {Promise}
  17. * @api public
  18. * @method eachAsync
  19. */
  20. module.exports = function eachAsync(next, fn, options, callback) {
  21. const parallel = options.parallel || 1;
  22. const batchSize = options.batchSize;
  23. const enqueue = asyncQueue();
  24. return promiseOrCallback(callback, cb => {
  25. if (batchSize != null) {
  26. if (typeof batchSize !== 'number') {
  27. throw new TypeError('batchSize must be a number');
  28. }
  29. if (batchSize < 1) {
  30. throw new TypeError('batchSize must be at least 1');
  31. }
  32. if (batchSize !== Math.floor(batchSize)) {
  33. throw new TypeError('batchSize must be a positive integer');
  34. }
  35. }
  36. iterate(cb);
  37. });
  38. function iterate(finalCallback) {
  39. let drained = false;
  40. let handleResultsInProgress = 0;
  41. let currentDocumentIndex = 0;
  42. let documentsBatch = [];
  43. let error = null;
  44. for (let i = 0; i < parallel; ++i) {
  45. enqueue(fetch);
  46. }
  47. function fetch(done) {
  48. if (drained || error) {
  49. return done();
  50. }
  51. next(function(err, doc) {
  52. if (drained || error != null) {
  53. return done();
  54. }
  55. if (err != null) {
  56. error = err;
  57. finalCallback(err);
  58. return done();
  59. }
  60. if (doc == null) {
  61. drained = true;
  62. if (handleResultsInProgress <= 0) {
  63. finalCallback(null);
  64. } else if (batchSize != null && documentsBatch.length) {
  65. handleNextResult(documentsBatch, currentDocumentIndex++, handleNextResultCallBack);
  66. }
  67. return done();
  68. }
  69. ++handleResultsInProgress;
  70. // Kick off the subsequent `next()` before handling the result, but
  71. // make sure we know that we still have a result to handle re: #8422
  72. immediate(() => done());
  73. if (batchSize != null) {
  74. documentsBatch.push(doc);
  75. }
  76. // If the current documents size is less than the provided patch size don't process the documents yet
  77. if (batchSize != null && documentsBatch.length !== batchSize) {
  78. setTimeout(() => enqueue(fetch), 0);
  79. return;
  80. }
  81. const docsToProcess = batchSize != null ? documentsBatch : doc;
  82. function handleNextResultCallBack(err) {
  83. if (batchSize != null) {
  84. handleResultsInProgress -= documentsBatch.length;
  85. documentsBatch = [];
  86. } else {
  87. --handleResultsInProgress;
  88. }
  89. if (err != null) {
  90. error = err;
  91. return finalCallback(err);
  92. }
  93. if (drained && handleResultsInProgress <= 0) {
  94. return finalCallback(null);
  95. }
  96. setTimeout(() => enqueue(fetch), 0);
  97. }
  98. handleNextResult(docsToProcess, currentDocumentIndex++, handleNextResultCallBack);
  99. });
  100. }
  101. }
  102. function handleNextResult(doc, i, callback) {
  103. const promise = fn(doc, i);
  104. if (promise && typeof promise.then === 'function') {
  105. promise.then(
  106. function() { callback(null); },
  107. function(error) { callback(error || new Error('`eachAsync()` promise rejected without error')); });
  108. } else {
  109. callback(null);
  110. }
  111. }
  112. };
  113. // `next()` can only execute one at a time, so make sure we always execute
  114. // `next()` in series, while still allowing multiple `fn()` instances to run
  115. // in parallel.
  116. function asyncQueue() {
  117. const _queue = [];
  118. let inProgress = null;
  119. let id = 0;
  120. return function enqueue(fn) {
  121. if (_queue.length === 0 && inProgress == null) {
  122. inProgress = id++;
  123. return fn(_step);
  124. }
  125. _queue.push(fn);
  126. };
  127. function _step() {
  128. inProgress = null;
  129. if (_queue.length > 0) {
  130. inProgress = id++;
  131. const fn = _queue.shift();
  132. fn(_step);
  133. }
  134. }
  135. }