ForkJoinObservable.js 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. /** PURE_IMPORTS_START .._Observable,._EmptyObservable,.._util_isArray,.._util_subscribeToResult,.._OuterSubscriber PURE_IMPORTS_END */
  2. var __extends = (this && this.__extends) || function (d, b) {
  3. for (var p in b)
  4. if (b.hasOwnProperty(p))
  5. d[p] = b[p];
  6. function __() { this.constructor = d; }
  7. d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
  8. };
  9. import { Observable } from '../Observable';
  10. import { EmptyObservable } from './EmptyObservable';
  11. import { isArray } from '../util/isArray';
  12. import { subscribeToResult } from '../util/subscribeToResult';
  13. import { OuterSubscriber } from '../OuterSubscriber';
  14. /**
  15. * We need this JSDoc comment for affecting ESDoc.
  16. * @extends {Ignored}
  17. * @hide true
  18. */
  19. export var ForkJoinObservable = /*@__PURE__*/ (/*@__PURE__*/ function (_super) {
  20. __extends(ForkJoinObservable, _super);
  21. function ForkJoinObservable(sources, resultSelector) {
  22. _super.call(this);
  23. this.sources = sources;
  24. this.resultSelector = resultSelector;
  25. }
  26. /* tslint:enable:max-line-length */
  27. /**
  28. * Joins last values emitted by passed Observables.
  29. *
  30. * <span class="informal">Wait for Observables to complete and then combine last values they emitted.</span>
  31. *
  32. * <img src="./img/forkJoin.png" width="100%">
  33. *
  34. * `forkJoin` is an operator that takes any number of Observables which can be passed either as an array
  35. * or directly as arguments. If no input Observables are provided, resulting stream will complete
  36. * immediately.
  37. *
  38. * `forkJoin` will wait for all passed Observables to complete and then it will emit an array with last
  39. * values from corresponding Observables. So if you pass `n` Observables to the operator, resulting
  40. * array will have `n` values, where first value is the last thing emitted by the first Observable,
  41. * second value is the last thing emitted by the second Observable and so on. That means `forkJoin` will
  42. * not emit more than once and it will complete after that. If you need to emit combined values not only
  43. * at the end of lifecycle of passed Observables, but also throughout it, try out {@link combineLatest}
  44. * or {@link zip} instead.
  45. *
  46. * In order for resulting array to have the same length as the number of input Observables, whenever any of
  47. * that Observables completes without emitting any value, `forkJoin` will complete at that moment as well
  48. * and it will not emit anything either, even if it already has some last values from other Observables.
  49. * Conversely, if there is an Observable that never completes, `forkJoin` will never complete as well,
  50. * unless at any point some other Observable completes without emitting value, which brings us back to
  51. * the previous case. Overall, in order for `forkJoin` to emit a value, all Observables passed as arguments
  52. * have to emit something at least once and complete.
  53. *
  54. * If any input Observable errors at some point, `forkJoin` will error as well and all other Observables
  55. * will be immediately unsubscribed.
  56. *
  57. * Optionally `forkJoin` accepts project function, that will be called with values which normally
  58. * would land in emitted array. Whatever is returned by project function, will appear in output
  59. * Observable instead. This means that default project can be thought of as a function that takes
  60. * all its arguments and puts them into an array. Note that project function will be called only
  61. * when output Observable is supposed to emit a result.
  62. *
  63. * @example <caption>Use forkJoin with operator emitting immediately</caption>
  64. * const observable = Rx.Observable.forkJoin(
  65. * Rx.Observable.of(1, 2, 3, 4),
  66. * Rx.Observable.of(5, 6, 7, 8)
  67. * );
  68. * observable.subscribe(
  69. * value => console.log(value),
  70. * err => {},
  71. * () => console.log('This is how it ends!')
  72. * );
  73. *
  74. * // Logs:
  75. * // [4, 8]
  76. * // "This is how it ends!"
  77. *
  78. *
  79. * @example <caption>Use forkJoin with operator emitting after some time</caption>
  80. * const observable = Rx.Observable.forkJoin(
  81. * Rx.Observable.interval(1000).take(3), // emit 0, 1, 2 every second and complete
  82. * Rx.Observable.interval(500).take(4) // emit 0, 1, 2, 3 every half a second and complete
  83. * );
  84. * observable.subscribe(
  85. * value => console.log(value),
  86. * err => {},
  87. * () => console.log('This is how it ends!')
  88. * );
  89. *
  90. * // Logs:
  91. * // [2, 3] after 3 seconds
  92. * // "This is how it ends!" immediately after
  93. *
  94. *
  95. * @example <caption>Use forkJoin with project function</caption>
  96. * const observable = Rx.Observable.forkJoin(
  97. * Rx.Observable.interval(1000).take(3), // emit 0, 1, 2 every second and complete
  98. * Rx.Observable.interval(500).take(4), // emit 0, 1, 2, 3 every half a second and complete
  99. * (n, m) => n + m
  100. * );
  101. * observable.subscribe(
  102. * value => console.log(value),
  103. * err => {},
  104. * () => console.log('This is how it ends!')
  105. * );
  106. *
  107. * // Logs:
  108. * // 5 after 3 seconds
  109. * // "This is how it ends!" immediately after
  110. *
  111. * @see {@link combineLatest}
  112. * @see {@link zip}
  113. *
  114. * @param {...SubscribableOrPromise} sources Any number of Observables provided either as an array or as an arguments
  115. * passed directly to the operator.
  116. * @param {function} [project] Function that takes values emitted by input Observables and returns value
  117. * that will appear in resulting Observable instead of default array.
  118. * @return {Observable} Observable emitting either an array of last values emitted by passed Observables
  119. * or value from project function.
  120. * @static true
  121. * @name forkJoin
  122. * @owner Observable
  123. */
  124. ForkJoinObservable.create = function () {
  125. var sources = [];
  126. for (var _i = 0; _i < arguments.length; _i++) {
  127. sources[_i - 0] = arguments[_i];
  128. }
  129. if (sources === null || arguments.length === 0) {
  130. return new EmptyObservable();
  131. }
  132. var resultSelector = null;
  133. if (typeof sources[sources.length - 1] === 'function') {
  134. resultSelector = sources.pop();
  135. }
  136. // if the first and only other argument besides the resultSelector is an array
  137. // assume it's been called with `forkJoin([obs1, obs2, obs3], resultSelector)`
  138. if (sources.length === 1 && isArray(sources[0])) {
  139. sources = sources[0];
  140. }
  141. if (sources.length === 0) {
  142. return new EmptyObservable();
  143. }
  144. return new ForkJoinObservable(sources, resultSelector);
  145. };
  146. /** @deprecated internal use only */ ForkJoinObservable.prototype._subscribe = function (subscriber) {
  147. return new ForkJoinSubscriber(subscriber, this.sources, this.resultSelector);
  148. };
  149. return ForkJoinObservable;
  150. }(Observable));
  151. /**
  152. * We need this JSDoc comment for affecting ESDoc.
  153. * @ignore
  154. * @extends {Ignored}
  155. */
  156. var ForkJoinSubscriber = /*@__PURE__*/ (/*@__PURE__*/ function (_super) {
  157. __extends(ForkJoinSubscriber, _super);
  158. function ForkJoinSubscriber(destination, sources, resultSelector) {
  159. _super.call(this, destination);
  160. this.sources = sources;
  161. this.resultSelector = resultSelector;
  162. this.completed = 0;
  163. this.haveValues = 0;
  164. var len = sources.length;
  165. this.total = len;
  166. this.values = new Array(len);
  167. for (var i = 0; i < len; i++) {
  168. var source = sources[i];
  169. var innerSubscription = subscribeToResult(this, source, null, i);
  170. if (innerSubscription) {
  171. innerSubscription.outerIndex = i;
  172. this.add(innerSubscription);
  173. }
  174. }
  175. }
  176. ForkJoinSubscriber.prototype.notifyNext = function (outerValue, innerValue, outerIndex, innerIndex, innerSub) {
  177. this.values[outerIndex] = innerValue;
  178. if (!innerSub._hasValue) {
  179. innerSub._hasValue = true;
  180. this.haveValues++;
  181. }
  182. };
  183. ForkJoinSubscriber.prototype.notifyComplete = function (innerSub) {
  184. var destination = this.destination;
  185. var _a = this, haveValues = _a.haveValues, resultSelector = _a.resultSelector, values = _a.values;
  186. var len = values.length;
  187. if (!innerSub._hasValue) {
  188. destination.complete();
  189. return;
  190. }
  191. this.completed++;
  192. if (this.completed !== len) {
  193. return;
  194. }
  195. if (haveValues === len) {
  196. var value = resultSelector ? resultSelector.apply(this, values) : values;
  197. destination.next(value);
  198. }
  199. destination.complete();
  200. };
  201. return ForkJoinSubscriber;
  202. }(OuterSubscriber));
  203. //# sourceMappingURL=ForkJoinObservable.js.map