expand.js 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. /** PURE_IMPORTS_START .._util_tryCatch,.._util_errorObject,.._OuterSubscriber,.._util_subscribeToResult PURE_IMPORTS_END */
  2. var __extends = (this && this.__extends) || function (d, b) {
  3. for (var p in b)
  4. if (b.hasOwnProperty(p))
  5. d[p] = b[p];
  6. function __() { this.constructor = d; }
  7. d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
  8. };
  9. import { tryCatch } from '../util/tryCatch';
  10. import { errorObject } from '../util/errorObject';
  11. import { OuterSubscriber } from '../OuterSubscriber';
  12. import { subscribeToResult } from '../util/subscribeToResult';
  13. /* tslint:enable:max-line-length */
  14. /**
  15. * Recursively projects each source value to an Observable which is merged in
  16. * the output Observable.
  17. *
  18. * <span class="informal">It's similar to {@link mergeMap}, but applies the
  19. * projection function to every source value as well as every output value.
  20. * It's recursive.</span>
  21. *
  22. * <img src="./img/expand.png" width="100%">
  23. *
  24. * Returns an Observable that emits items based on applying a function that you
  25. * supply to each item emitted by the source Observable, where that function
  26. * returns an Observable, and then merging those resulting Observables and
  27. * emitting the results of this merger. *Expand* will re-emit on the output
  28. * Observable every source value. Then, each output value is given to the
  29. * `project` function which returns an inner Observable to be merged on the
  30. * output Observable. Those output values resulting from the projection are also
  31. * given to the `project` function to produce new output values. This is how
  32. * *expand* behaves recursively.
  33. *
  34. * @example <caption>Start emitting the powers of two on every click, at most 10 of them</caption>
  35. * var clicks = Rx.Observable.fromEvent(document, 'click');
  36. * var powersOfTwo = clicks
  37. * .mapTo(1)
  38. * .expand(x => Rx.Observable.of(2 * x).delay(1000))
  39. * .take(10);
  40. * powersOfTwo.subscribe(x => console.log(x));
  41. *
  42. * @see {@link mergeMap}
  43. * @see {@link mergeScan}
  44. *
  45. * @param {function(value: T, index: number) => Observable} project A function
  46. * that, when applied to an item emitted by the source or the output Observable,
  47. * returns an Observable.
  48. * @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of input
  49. * Observables being subscribed to concurrently.
  50. * @param {Scheduler} [scheduler=null] The IScheduler to use for subscribing to
  51. * each projected inner Observable.
  52. * @return {Observable} An Observable that emits the source values and also
  53. * the result of applying the projection function to each value emitted on the
  54. * output Observable and merging the results of the Observables obtained
  55. * from this transformation.
  56. * @method expand
  57. * @owner Observable
  58. */
  59. export function expand(project, concurrent, scheduler) {
  60. if (concurrent === void 0) {
  61. concurrent = Number.POSITIVE_INFINITY;
  62. }
  63. if (scheduler === void 0) {
  64. scheduler = undefined;
  65. }
  66. concurrent = (concurrent || 0) < 1 ? Number.POSITIVE_INFINITY : concurrent;
  67. return function (source) { return source.lift(new ExpandOperator(project, concurrent, scheduler)); };
  68. }
  69. export var ExpandOperator = /*@__PURE__*/ (/*@__PURE__*/ function () {
  70. function ExpandOperator(project, concurrent, scheduler) {
  71. this.project = project;
  72. this.concurrent = concurrent;
  73. this.scheduler = scheduler;
  74. }
  75. ExpandOperator.prototype.call = function (subscriber, source) {
  76. return source.subscribe(new ExpandSubscriber(subscriber, this.project, this.concurrent, this.scheduler));
  77. };
  78. return ExpandOperator;
  79. }());
  80. /**
  81. * We need this JSDoc comment for affecting ESDoc.
  82. * @ignore
  83. * @extends {Ignored}
  84. */
  85. export var ExpandSubscriber = /*@__PURE__*/ (/*@__PURE__*/ function (_super) {
  86. __extends(ExpandSubscriber, _super);
  87. function ExpandSubscriber(destination, project, concurrent, scheduler) {
  88. _super.call(this, destination);
  89. this.project = project;
  90. this.concurrent = concurrent;
  91. this.scheduler = scheduler;
  92. this.index = 0;
  93. this.active = 0;
  94. this.hasCompleted = false;
  95. if (concurrent < Number.POSITIVE_INFINITY) {
  96. this.buffer = [];
  97. }
  98. }
  99. ExpandSubscriber.dispatch = function (arg) {
  100. var subscriber = arg.subscriber, result = arg.result, value = arg.value, index = arg.index;
  101. subscriber.subscribeToProjection(result, value, index);
  102. };
  103. ExpandSubscriber.prototype._next = function (value) {
  104. var destination = this.destination;
  105. if (destination.closed) {
  106. this._complete();
  107. return;
  108. }
  109. var index = this.index++;
  110. if (this.active < this.concurrent) {
  111. destination.next(value);
  112. var result = tryCatch(this.project)(value, index);
  113. if (result === errorObject) {
  114. destination.error(errorObject.e);
  115. }
  116. else if (!this.scheduler) {
  117. this.subscribeToProjection(result, value, index);
  118. }
  119. else {
  120. var state = { subscriber: this, result: result, value: value, index: index };
  121. this.add(this.scheduler.schedule(ExpandSubscriber.dispatch, 0, state));
  122. }
  123. }
  124. else {
  125. this.buffer.push(value);
  126. }
  127. };
  128. ExpandSubscriber.prototype.subscribeToProjection = function (result, value, index) {
  129. this.active++;
  130. this.add(subscribeToResult(this, result, value, index));
  131. };
  132. ExpandSubscriber.prototype._complete = function () {
  133. this.hasCompleted = true;
  134. if (this.hasCompleted && this.active === 0) {
  135. this.destination.complete();
  136. }
  137. };
  138. ExpandSubscriber.prototype.notifyNext = function (outerValue, innerValue, outerIndex, innerIndex, innerSub) {
  139. this._next(innerValue);
  140. };
  141. ExpandSubscriber.prototype.notifyComplete = function (innerSub) {
  142. var buffer = this.buffer;
  143. this.remove(innerSub);
  144. this.active--;
  145. if (buffer && buffer.length > 0) {
  146. this._next(buffer.shift());
  147. }
  148. if (this.hasCompleted && this.active === 0) {
  149. this.destination.complete();
  150. }
  151. };
  152. return ExpandSubscriber;
  153. }(OuterSubscriber));
  154. //# sourceMappingURL=expand.js.map