index.js 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. 'use strict';
  2. const AggregateError = require('aggregate-error');
  3. module.exports = async (
  4. iterable,
  5. mapper,
  6. {
  7. concurrency = Infinity,
  8. stopOnError = true
  9. } = {}
  10. ) => {
  11. return new Promise((resolve, reject) => {
  12. if (typeof mapper !== 'function') {
  13. throw new TypeError('Mapper function is required');
  14. }
  15. if (!((Number.isSafeInteger(concurrency) || concurrency === Infinity) && concurrency >= 1)) {
  16. throw new TypeError(`Expected \`concurrency\` to be an integer from 1 and up or \`Infinity\`, got \`${concurrency}\` (${typeof concurrency})`);
  17. }
  18. const result = [];
  19. const errors = [];
  20. const iterator = iterable[Symbol.iterator]();
  21. let isRejected = false;
  22. let isIterableDone = false;
  23. let resolvingCount = 0;
  24. let currentIndex = 0;
  25. const next = () => {
  26. if (isRejected) {
  27. return;
  28. }
  29. const nextItem = iterator.next();
  30. const index = currentIndex;
  31. currentIndex++;
  32. if (nextItem.done) {
  33. isIterableDone = true;
  34. if (resolvingCount === 0) {
  35. if (!stopOnError && errors.length !== 0) {
  36. reject(new AggregateError(errors));
  37. } else {
  38. resolve(result);
  39. }
  40. }
  41. return;
  42. }
  43. resolvingCount++;
  44. (async () => {
  45. try {
  46. const element = await nextItem.value;
  47. result[index] = await mapper(element, index);
  48. resolvingCount--;
  49. next();
  50. } catch (error) {
  51. if (stopOnError) {
  52. isRejected = true;
  53. reject(error);
  54. } else {
  55. errors.push(error);
  56. resolvingCount--;
  57. next();
  58. }
  59. }
  60. })();
  61. };
  62. for (let i = 0; i < concurrency; i++) {
  63. next();
  64. if (isIterableDone) {
  65. break;
  66. }
  67. }
  68. });
  69. };