// mergeScan.js (2.8 KB)
  1. import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe';
  /**
   * Builds an operator function that lifts a `MergeScanOperator` onto a source.
   *
   * @param {Function} accumulator - Invoked by the subscriber as
   *   `accumulator(acc, value, index)`; its result is subscribed to as an
   *   inner source and each inner emission becomes the new accumulated value.
   * @param {*} seed - Initial accumulated value.
   * @param {number} [concurrent=Number.POSITIVE_INFINITY] - Maximum number of
   *   inner subscriptions active at once; further source values are buffered.
   * @returns {Function} A function taking `source` and returning
   *   `source.lift(...)` with a freshly created `MergeScanOperator`.
   */
  export function mergeScan(accumulator, seed, concurrent = Number.POSITIVE_INFINITY) {
    // A new operator instance is created on every application to a source.
    return (source) => source.lift(new MergeScanOperator(accumulator, seed, concurrent));
  }
  /**
   * Operator object handed to `source.lift(...)` by `mergeScan`. It only
   * stores the configuration and, when called, subscribes a new
   * `MergeScanSubscriber` to the source.
   */
  export class MergeScanOperator {
    /**
     * @param {Function} accumulator - Accumulator forwarded to the subscriber.
     * @param {*} seed - Initial accumulated value forwarded to the subscriber.
     * @param {number} concurrent - Concurrency limit forwarded to the subscriber.
     */
    constructor(accumulator, seed, concurrent) {
      this.accumulator = accumulator;
      this.seed = seed;
      this.concurrent = concurrent;
    }

    /**
     * Subscribes a fresh `MergeScanSubscriber` (wrapping `subscriber`) to `source`.
     *
     * @param {*} subscriber - Downstream destination for the new subscriber.
     * @param {*} source - Object exposing `subscribe(...)`.
     * @returns {*} Whatever `source.subscribe(...)` returns.
     */
    call(subscriber, source) {
      const { accumulator, seed, concurrent } = this;
      return source.subscribe(new MergeScanSubscriber(subscriber, accumulator, seed, concurrent));
    }
  }
  /**
   * Subscriber that feeds each source value through `accumulator`, subscribes
   * to the result as an inner source, and forwards every inner emission
   * downstream while remembering it as the latest accumulated value.
   *
   * At most `concurrent` inner subscriptions are active at a time; source
   * values arriving beyond that limit are queued in `buffer` and processed as
   * inner sources complete.
   */
  export class MergeScanSubscriber extends SimpleOuterSubscriber {
    /**
     * @param {*} destination - Downstream subscriber.
     * @param {Function} accumulator - Called as `accumulator(acc, value, index)`.
     * @param {*} acc - Initial accumulated value (the seed).
     * @param {number} concurrent - Maximum number of active inner subscriptions.
     */
    constructor(destination, accumulator, acc, concurrent) {
      super(destination);
      this.accumulator = accumulator;
      this.acc = acc;
      this.concurrent = concurrent;
      // True once any inner source has emitted a value.
      this.hasValue = false;
      // True once the outer source has completed.
      this.hasCompleted = false;
      // Source values waiting for a free concurrency slot.
      this.buffer = [];
      // Number of inner subscriptions currently counted as active.
      this.active = 0;
      // Counter passed to `accumulator` as its third argument.
      this.index = 0;
    }

    /**
     * Handles one source value: runs the accumulator and subscribes to its
     * result when below the concurrency limit, otherwise queues the value.
     *
     * @param {*} value - Value emitted by the outer source.
     * @returns {*} The result of `destination.error(e)` if the accumulator
     *   throws; otherwise `undefined`.
     */
    _next(value) {
      if (this.active < this.concurrent) {
        const index = this.index++;
        const { destination } = this;
        let ish;
        try {
          // The accumulator is invoked as a plain function, not as a method.
          const { accumulator } = this;
          ish = accumulator(this.acc, value, index);
        } catch (e) {
          return destination.error(e);
        }
        // Count the inner source as active before subscribing, so that a
        // completion notified during subscription sees a consistent count.
        this.active++;
        this._innerSub(ish);
      } else {
        this.buffer.push(value);
      }
      return undefined;
    }

    /**
     * Subscribes to an accumulator result and attaches the resulting
     * subscription(s) to the destination via `destination.add(...)`.
     *
     * @param {*} ish - Value returned by the accumulator, passed to
     *   `innerSubscribe` (presumably any observable-like input; see that helper).
     */
    _innerSub(ish) {
      const innerSubscriber = new SimpleInnerSubscriber(this);
      const { destination } = this;
      destination.add(innerSubscriber);
      const innerSubscription = innerSubscribe(ish, innerSubscriber);
      // Only add the returned subscription when it is a distinct object.
      if (innerSubscription !== innerSubscriber) {
        destination.add(innerSubscription);
      }
    }

    /**
     * Handles completion of the outer source. Downstream is completed right
     * away only when nothing is active or buffered; otherwise completion is
     * deferred to `notifyComplete`. The outer subscriber is unsubscribed in
     * either case.
     */
    _complete() {
      this.hasCompleted = true;
      if (this.active === 0 && this.buffer.length === 0) {
        this._emitSeedIfEmptyAndComplete();
      }
      this.unsubscribe();
    }

    /**
     * Receives a value from an inner source: stores it as the new
     * accumulated value and forwards it downstream.
     *
     * @param {*} innerValue - Value emitted by an inner source.
     */
    notifyNext(innerValue) {
      const { destination } = this;
      this.acc = innerValue;
      this.hasValue = true;
      destination.next(innerValue);
    }

    /**
     * Called when an inner source completes: frees its concurrency slot,
     * then either processes the next buffered value or, if the outer source
     * has completed and nothing is active, completes downstream.
     */
    notifyComplete() {
      const pending = this.buffer;
      this.active--;
      if (pending.length > 0) {
        this._next(pending.shift());
      } else if (this.active === 0 && this.hasCompleted) {
        this._emitSeedIfEmptyAndComplete();
      }
    }

    /**
     * Shared completion step: if no inner value was ever emitted, sends the
     * current accumulated value (still the seed) downstream, then completes
     * the destination.
     */
    _emitSeedIfEmptyAndComplete() {
      if (this.hasValue === false) {
        this.destination.next(this.acc);
      }
      this.destination.complete();
    }
  }
  85. //# sourceMappingURL=mergeScan.js.map