mergeMap.js 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. import { subscribeToResult } from '../util/subscribeToResult';
  2. import { OuterSubscriber } from '../OuterSubscriber';
  3. import { InnerSubscriber } from '../InnerSubscriber';
  4. import { map } from './map';
  5. import { from } from '../observable/from';
  /**
   * Creates an operator function that runs every source value through `project`
   * and delegates the merging of the projected results to `MergeMapOperator`.
   *
   * @param {Function} project Invoked as `project(value, index)`.
   * @param {Function|number} [resultSelector] When a function, every inner value
   *   is additionally mapped through
   *   `resultSelector(outerValue, innerValue, outerIndex, innerIndex)`.
   *   When a number, it takes the place of `concurrent`.
   * @param {number} [concurrent=Number.POSITIVE_INFINITY] Limit forwarded to
   *   `MergeMapOperator`.
   * @returns {Function} A function that accepts a source and returns the result
   *   of `source.pipe(...)` (selector form) or `source.lift(...)`.
   */
  export function mergeMap(project, resultSelector, concurrent = Number.POSITIVE_INFINITY) {
    if (typeof resultSelector === 'function') {
      // Selector form: wrap `project` so each inner value is combined with the
      // outer value/indices, then re-enter mergeMap without a selector.
      const projectWithSelector = (outerValue, outerIndex) => {
        const combine = (innerValue, innerIndex) => resultSelector(outerValue, innerValue, outerIndex, innerIndex);
        return from(project(outerValue, outerIndex)).pipe(map(combine));
      };
      return (source) => source.pipe(mergeMap(projectWithSelector, concurrent));
    }
    // A numeric second argument is the concurrency limit in disguise.
    const limit = typeof resultSelector === 'number' ? resultSelector : concurrent;
    return (source) => source.lift(new MergeMapOperator(project, limit));
  }
  /**
   * Operator object passed to `source.lift(...)` by `mergeMap`. It only stores
   * its configuration and, when called, subscribes a new `MergeMapSubscriber`
   * to the given source.
   */
  export class MergeMapOperator {
    /**
     * @param {Function} project Projection handed on to the subscriber.
     * @param {number} [concurrent=Number.POSITIVE_INFINITY] Limit handed on to the subscriber.
     */
    constructor(project, concurrent = Number.POSITIVE_INFINITY) {
      Object.assign(this, { project, concurrent });
    }

    /**
     * @param {*} observer Destination passed to the new `MergeMapSubscriber`.
     * @param {*} source Object whose `subscribe` method receives that subscriber.
     * @returns {*} Whatever `source.subscribe(...)` returns.
     */
    call(observer, source) {
      const { project, concurrent } = this;
      const subscriber = new MergeMapSubscriber(observer, project, concurrent);
      return source.subscribe(subscriber);
    }
  }
  /**
   * Subscriber that projects each incoming value with `project`, subscribes to
   * the projected result, and forwards every inner value to `destination`.
   * While `active` is not below `concurrent`, incoming values wait in `buffer`.
   */
  export class MergeMapSubscriber extends OuterSubscriber {
    /**
     * @param {*} destination Passed to the `OuterSubscriber` constructor.
     * @param {Function} project Invoked as `project(value, index)`.
     * @param {number} [concurrent=Number.POSITIVE_INFINITY] Upper bound compared against `active`.
     */
    constructor(destination, project, concurrent = Number.POSITIVE_INFINITY) {
      super(destination);
      this.project = project;
      this.concurrent = concurrent;
      // Set once `_complete` has run.
      this.hasCompleted = false;
      // Values received while no slot was free.
      this.buffer = [];
      // Number of inner subscriptions started and not yet reported complete.
      this.active = 0;
      // Index handed to `project`; incremented per projected value.
      this.index = 0;
    }

    /** Projects `value` right away when a slot is free, otherwise queues it. */
    _next(value) {
      const hasFreeSlot = this.active < this.concurrent;
      if (!hasFreeSlot) {
        this.buffer.push(value);
        return;
      }
      this._tryNext(value);
    }

    /**
     * Runs `project` for `value`. A thrown error is routed to
     * `destination.error` and nothing is subscribed; otherwise `active` is
     * bumped and the projected result is subscribed via `_innerSub`.
     */
    _tryNext(value) {
      const index = this.index++;
      let projected;
      try {
        projected = this.project(value, index);
      }
      catch (err) {
        this.destination.error(err);
        return;
      }
      this.active++;
      this._innerSub(projected, value, index);
    }

    /**
     * Subscribes to `ish` through `subscribeToResult` using a fresh
     * `InnerSubscriber`, registering the subscriber (and the returned
     * subscription, when it is a different object) on `destination`.
     */
    _innerSub(ish, value, index) {
      const innerSubscriber = new InnerSubscriber(this, value, index);
      const { destination } = this;
      destination.add(innerSubscriber);
      const innerSubscription = subscribeToResult(this, ish, undefined, undefined, innerSubscriber);
      const isSameObject = innerSubscription === innerSubscriber;
      if (!isSameObject) {
        destination.add(innerSubscription);
      }
    }

    /**
     * Marks this subscriber as completed, completes `destination` only when
     * nothing is active or queued, and then calls `unsubscribe()`.
     */
    _complete() {
      this.hasCompleted = true;
      const nothingPending = this.active === 0 && this.buffer.length === 0;
      if (nothingPending) {
        this.destination.complete();
      }
      this.unsubscribe();
    }

    /** Forwards the inner value to `destination`; the other arguments are unused. */
    notifyNext(outerValue, innerValue, outerIndex, innerIndex, innerSub) {
      this.destination.next(innerValue);
    }

    /**
     * Releases the slot held by `innerSub`. A queued value, if any, is fed back
     * through `_next`; otherwise `destination` is completed once no inner
     * subscription is active and `_complete` has already run.
     */
    notifyComplete(innerSub) {
      const queued = this.buffer;
      this.remove(innerSub);
      this.active--;
      if (queued.length > 0) {
        this._next(queued.shift());
        return;
      }
      if (this.active === 0 && this.hasCompleted) {
        this.destination.complete();
      }
    }
  }
  /** Alias: `flatMap` is bound to the very same function object as `mergeMap`. */
  export const flatMap = mergeMap;
  87. //# sourceMappingURL=mergeMap.js.map