// ForkJoinObservable.js
import { Observable } from '../Observable';
import { OuterSubscriber } from '../OuterSubscriber';
import { isArray } from '../util/isArray';
import { subscribeToResult } from '../util/subscribeToResult';
import { EmptyObservable } from './EmptyObservable';
  6. /**
  7. * We need this JSDoc comment for affecting ESDoc.
  8. * @extends {Ignored}
  9. * @hide true
  10. */
  11. export class ForkJoinObservable extends Observable {
  12. constructor(sources, resultSelector) {
  13. super();
  14. this.sources = sources;
  15. this.resultSelector = resultSelector;
  16. }
  17. /* tslint:enable:max-line-length */
  18. /**
  19. * Joins last values emitted by passed Observables.
  20. *
  21. * <span class="informal">Wait for Observables to complete and then combine last values they emitted.</span>
  22. *
  23. * <img src="./img/forkJoin.png" width="100%">
  24. *
  25. * `forkJoin` is an operator that takes any number of Observables which can be passed either as an array
  26. * or directly as arguments. If no input Observables are provided, resulting stream will complete
  27. * immediately.
  28. *
  29. * `forkJoin` will wait for all passed Observables to complete and then it will emit an array with last
  30. * values from corresponding Observables. So if you pass `n` Observables to the operator, resulting
  31. * array will have `n` values, where first value is the last thing emitted by the first Observable,
  32. * second value is the last thing emitted by the second Observable and so on. That means `forkJoin` will
  33. * not emit more than once and it will complete after that. If you need to emit combined values not only
  34. * at the end of lifecycle of passed Observables, but also throughout it, try out {@link combineLatest}
  35. * or {@link zip} instead.
  36. *
  37. * In order for resulting array to have the same length as the number of input Observables, whenever any of
  38. * that Observables completes without emitting any value, `forkJoin` will complete at that moment as well
  39. * and it will not emit anything either, even if it already has some last values from other Observables.
  40. * Conversely, if there is an Observable that never completes, `forkJoin` will never complete as well,
  41. * unless at any point some other Observable completes without emitting value, which brings us back to
  42. * the previous case. Overall, in order for `forkJoin` to emit a value, all Observables passed as arguments
  43. * have to emit something at least once and complete.
  44. *
  45. * If any input Observable errors at some point, `forkJoin` will error as well and all other Observables
  46. * will be immediately unsubscribed.
  47. *
  48. * Optionally `forkJoin` accepts project function, that will be called with values which normally
  49. * would land in emitted array. Whatever is returned by project function, will appear in output
  50. * Observable instead. This means that default project can be thought of as a function that takes
  51. * all its arguments and puts them into an array. Note that project function will be called only
  52. * when output Observable is supposed to emit a result.
  53. *
  54. * @example <caption>Use forkJoin with operator emitting immediately</caption>
  55. * const observable = Rx.Observable.forkJoin(
  56. * Rx.Observable.of(1, 2, 3, 4),
  57. * Rx.Observable.of(5, 6, 7, 8)
  58. * );
  59. * observable.subscribe(
  60. * value => console.log(value),
  61. * err => {},
  62. * () => console.log('This is how it ends!')
  63. * );
  64. *
  65. * // Logs:
  66. * // [4, 8]
  67. * // "This is how it ends!"
  68. *
  69. *
  70. * @example <caption>Use forkJoin with operator emitting after some time</caption>
  71. * const observable = Rx.Observable.forkJoin(
  72. * Rx.Observable.interval(1000).take(3), // emit 0, 1, 2 every second and complete
  73. * Rx.Observable.interval(500).take(4) // emit 0, 1, 2, 3 every half a second and complete
  74. * );
  75. * observable.subscribe(
  76. * value => console.log(value),
  77. * err => {},
  78. * () => console.log('This is how it ends!')
  79. * );
  80. *
  81. * // Logs:
  82. * // [2, 3] after 3 seconds
  83. * // "This is how it ends!" immediately after
  84. *
  85. *
  86. * @example <caption>Use forkJoin with project function</caption>
  87. * const observable = Rx.Observable.forkJoin(
  88. * Rx.Observable.interval(1000).take(3), // emit 0, 1, 2 every second and complete
  89. * Rx.Observable.interval(500).take(4), // emit 0, 1, 2, 3 every half a second and complete
  90. * (n, m) => n + m
  91. * );
  92. * observable.subscribe(
  93. * value => console.log(value),
  94. * err => {},
  95. * () => console.log('This is how it ends!')
  96. * );
  97. *
  98. * // Logs:
  99. * // 5 after 3 seconds
  100. * // "This is how it ends!" immediately after
  101. *
  102. * @see {@link combineLatest}
  103. * @see {@link zip}
  104. *
  105. * @param {...SubscribableOrPromise} sources Any number of Observables provided either as an array or as an arguments
  106. * passed directly to the operator.
  107. * @param {function} [project] Function that takes values emitted by input Observables and returns value
  108. * that will appear in resulting Observable instead of default array.
  109. * @return {Observable} Observable emitting either an array of last values emitted by passed Observables
  110. * or value from project function.
  111. * @static true
  112. * @name forkJoin
  113. * @owner Observable
  114. */
  115. static create(...sources) {
  116. if (sources === null || arguments.length === 0) {
  117. return new EmptyObservable();
  118. }
  119. let resultSelector = null;
  120. if (typeof sources[sources.length - 1] === 'function') {
  121. resultSelector = sources.pop();
  122. }
  123. // if the first and only other argument besides the resultSelector is an array
  124. // assume it's been called with `forkJoin([obs1, obs2, obs3], resultSelector)`
  125. if (sources.length === 1 && isArray(sources[0])) {
  126. sources = sources[0];
  127. }
  128. if (sources.length === 0) {
  129. return new EmptyObservable();
  130. }
  131. return new ForkJoinObservable(sources, resultSelector);
  132. }
  133. /** @deprecated internal use only */ _subscribe(subscriber) {
  134. return new ForkJoinSubscriber(subscriber, this.sources, this.resultSelector);
  135. }
  136. }
  137. /**
  138. * We need this JSDoc comment for affecting ESDoc.
  139. * @ignore
  140. * @extends {Ignored}
  141. */
  142. class ForkJoinSubscriber extends OuterSubscriber {
  143. constructor(destination, sources, resultSelector) {
  144. super(destination);
  145. this.sources = sources;
  146. this.resultSelector = resultSelector;
  147. this.completed = 0;
  148. this.haveValues = 0;
  149. const len = sources.length;
  150. this.total = len;
  151. this.values = new Array(len);
  152. for (let i = 0; i < len; i++) {
  153. const source = sources[i];
  154. const innerSubscription = subscribeToResult(this, source, null, i);
  155. if (innerSubscription) {
  156. innerSubscription.outerIndex = i;
  157. this.add(innerSubscription);
  158. }
  159. }
  160. }
  161. notifyNext(outerValue, innerValue, outerIndex, innerIndex, innerSub) {
  162. this.values[outerIndex] = innerValue;
  163. if (!innerSub._hasValue) {
  164. innerSub._hasValue = true;
  165. this.haveValues++;
  166. }
  167. }
  168. notifyComplete(innerSub) {
  169. const destination = this.destination;
  170. const { haveValues, resultSelector, values } = this;
  171. const len = values.length;
  172. if (!innerSub._hasValue) {
  173. destination.complete();
  174. return;
  175. }
  176. this.completed++;
  177. if (this.completed !== len) {
  178. return;
  179. }
  180. if (haveValues === len) {
  181. const value = resultSelector ? resultSelector.apply(this, values) : values;
  182. destination.next(value);
  183. }
  184. destination.complete();
  185. }
  186. }
//# sourceMappingURL=ForkJoinObservable.js.map