combineLatest.js 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. import { ArrayObservable } from '../observable/ArrayObservable';
  2. import { isArray } from '../util/isArray';
  3. import { OuterSubscriber } from '../OuterSubscriber';
  4. import { subscribeToResult } from '../util/subscribeToResult';
  5. const none = {};
  6. /* tslint:enable:max-line-length */
  7. /**
  8. * Combines multiple Observables to create an Observable whose values are
  9. * calculated from the latest values of each of its input Observables.
  10. *
  11. * <span class="informal">Whenever any input Observable emits a value, it
  12. * computes a formula using the latest values from all the inputs, then emits
  13. * the output of that formula.</span>
  14. *
  15. * <img src="./img/combineLatest.png" width="100%">
  16. *
  17. * `combineLatest` combines the values from this Observable with values from
  18. * Observables passed as arguments. This is done by subscribing to each
  19. * Observable, in order, and collecting an array of each of the most recent
  20. * values any time any of the input Observables emits, then either taking that
  21. * array and passing it as arguments to an optional `project` function and
  22. * emitting the return value of that, or just emitting the array of recent
  23. * values directly if there is no `project` function.
  24. *
  25. * @example <caption>Dynamically calculate the Body-Mass Index from an Observable of weight and one for height</caption>
  26. * var weight = Rx.Observable.of(70, 72, 76, 79, 75);
  27. * var height = Rx.Observable.of(1.76, 1.77, 1.78);
  28. * var bmi = weight.combineLatest(height, (w, h) => w / (h * h));
  29. * bmi.subscribe(x => console.log('BMI is ' + x));
  30. *
  31. * // With output to console:
  32. * // BMI is 24.212293388429753
  33. * // BMI is 23.93948099205209
  34. * // BMI is 23.671253629592222
  35. *
  36. * @see {@link combineAll}
  37. * @see {@link merge}
  38. * @see {@link withLatestFrom}
  39. *
  40. * @param {ObservableInput} other An input Observable to combine with the source
  41. * Observable. More than one input Observables may be given as argument.
  42. * @param {function} [project] An optional function to project the values from
  43. * the combined latest values into a new value on the output Observable.
  44. * @return {Observable} An Observable of projected values from the most recent
  45. * values from each input Observable, or an array of the most recent values from
  46. * each input Observable.
  47. * @method combineLatest
  48. * @owner Observable
  49. */
  50. export function combineLatest(...observables) {
  51. let project = null;
  52. if (typeof observables[observables.length - 1] === 'function') {
  53. project = observables.pop();
  54. }
  55. // if the first and only other argument besides the resultSelector is an array
  56. // assume it's been called with `combineLatest([obs1, obs2, obs3], project)`
  57. if (observables.length === 1 && isArray(observables[0])) {
  58. observables = observables[0].slice();
  59. }
  60. return (source) => source.lift.call(new ArrayObservable([source, ...observables]), new CombineLatestOperator(project));
  61. }
  62. export class CombineLatestOperator {
  63. constructor(project) {
  64. this.project = project;
  65. }
  66. call(subscriber, source) {
  67. return source.subscribe(new CombineLatestSubscriber(subscriber, this.project));
  68. }
  69. }
  70. /**
  71. * We need this JSDoc comment for affecting ESDoc.
  72. * @ignore
  73. * @extends {Ignored}
  74. */
  75. export class CombineLatestSubscriber extends OuterSubscriber {
  76. constructor(destination, project) {
  77. super(destination);
  78. this.project = project;
  79. this.active = 0;
  80. this.values = [];
  81. this.observables = [];
  82. }
  83. _next(observable) {
  84. this.values.push(none);
  85. this.observables.push(observable);
  86. }
  87. _complete() {
  88. const observables = this.observables;
  89. const len = observables.length;
  90. if (len === 0) {
  91. this.destination.complete();
  92. }
  93. else {
  94. this.active = len;
  95. this.toRespond = len;
  96. for (let i = 0; i < len; i++) {
  97. const observable = observables[i];
  98. this.add(subscribeToResult(this, observable, observable, i));
  99. }
  100. }
  101. }
  102. notifyComplete(unused) {
  103. if ((this.active -= 1) === 0) {
  104. this.destination.complete();
  105. }
  106. }
  107. notifyNext(outerValue, innerValue, outerIndex, innerIndex, innerSub) {
  108. const values = this.values;
  109. const oldVal = values[outerIndex];
  110. const toRespond = !this.toRespond
  111. ? 0
  112. : oldVal === none ? --this.toRespond : this.toRespond;
  113. values[outerIndex] = innerValue;
  114. if (toRespond === 0) {
  115. if (this.project) {
  116. this._tryProject(values);
  117. }
  118. else {
  119. this.destination.next(values.slice());
  120. }
  121. }
  122. }
  123. _tryProject(values) {
  124. let result;
  125. try {
  126. result = this.project.apply(this, values);
  127. }
  128. catch (err) {
  129. this.destination.error(err);
  130. return;
  131. }
  132. this.destination.next(result);
  133. }
  134. }
  135. //# sourceMappingURL=combineLatest.js.map