withLatestFrom.js 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. import { OuterSubscriber } from '../OuterSubscriber';
  2. import { subscribeToResult } from '../util/subscribeToResult';
  3. /* tslint:enable:max-line-length */
  4. /**
  5. * Combines the source Observable with other Observables to create an Observable
  6. * whose values are calculated from the latest values of each, only when the
  7. * source emits.
  8. *
  9. * <span class="informal">Whenever the source Observable emits a value, it
  10. * computes a formula using that value plus the latest values from other input
  11. * Observables, then emits the output of that formula.</span>
  12. *
  13. * <img src="./img/withLatestFrom.png" width="100%">
  14. *
  15. * `withLatestFrom` combines each value from the source Observable (the
  16. * instance) with the latest values from the other input Observables only when
  17. * the source emits a value, optionally using a `project` function to determine
  18. * the value to be emitted on the output Observable. All input Observables must
  19. * emit at least one value before the output Observable will emit a value.
  20. *
  21. * @example <caption>On every click event, emit an array with the latest timer event plus the click event</caption>
  22. * var clicks = Rx.Observable.fromEvent(document, 'click');
  23. * var timer = Rx.Observable.interval(1000);
  24. * var result = clicks.withLatestFrom(timer);
  25. * result.subscribe(x => console.log(x));
  26. *
  27. * @see {@link combineLatest}
  28. *
  29. * @param {ObservableInput} other An input Observable to combine with the source
  30. * Observable. More than one input Observables may be given as argument.
  31. * @param {Function} [project] Projection function for combining values
  32. * together. Receives all values in order of the Observables passed, where the
  33. * first parameter is a value from the source Observable. (e.g.
  34. * `a.withLatestFrom(b, c, (a1, b1, c1) => a1 + b1 + c1)`). If this is not
  35. * passed, arrays will be emitted on the output Observable.
  36. * @return {Observable} An Observable of projected values from the most recent
  37. * values from each input Observable, or an array of the most recent values from
  38. * each input Observable.
  39. * @method withLatestFrom
  40. * @owner Observable
  41. */
  42. export function withLatestFrom(...args) {
  43. return (source) => {
  44. let project;
  45. if (typeof args[args.length - 1] === 'function') {
  46. project = args.pop();
  47. }
  48. const observables = args;
  49. return source.lift(new WithLatestFromOperator(observables, project));
  50. };
  51. }
  52. class WithLatestFromOperator {
  53. constructor(observables, project) {
  54. this.observables = observables;
  55. this.project = project;
  56. }
  57. call(subscriber, source) {
  58. return source.subscribe(new WithLatestFromSubscriber(subscriber, this.observables, this.project));
  59. }
  60. }
  61. /**
  62. * We need this JSDoc comment for affecting ESDoc.
  63. * @ignore
  64. * @extends {Ignored}
  65. */
  66. class WithLatestFromSubscriber extends OuterSubscriber {
  67. constructor(destination, observables, project) {
  68. super(destination);
  69. this.observables = observables;
  70. this.project = project;
  71. this.toRespond = [];
  72. const len = observables.length;
  73. this.values = new Array(len);
  74. for (let i = 0; i < len; i++) {
  75. this.toRespond.push(i);
  76. }
  77. for (let i = 0; i < len; i++) {
  78. let observable = observables[i];
  79. this.add(subscribeToResult(this, observable, observable, i));
  80. }
  81. }
  82. notifyNext(outerValue, innerValue, outerIndex, innerIndex, innerSub) {
  83. this.values[outerIndex] = innerValue;
  84. const toRespond = this.toRespond;
  85. if (toRespond.length > 0) {
  86. const found = toRespond.indexOf(outerIndex);
  87. if (found !== -1) {
  88. toRespond.splice(found, 1);
  89. }
  90. }
  91. }
  92. notifyComplete() {
  93. // noop
  94. }
  95. _next(value) {
  96. if (this.toRespond.length === 0) {
  97. const args = [value, ...this.values];
  98. if (this.project) {
  99. this._tryProject(args);
  100. }
  101. else {
  102. this.destination.next(args);
  103. }
  104. }
  105. }
  106. _tryProject(args) {
  107. let result;
  108. try {
  109. result = this.project.apply(this, args);
  110. }
  111. catch (err) {
  112. this.destination.error(err);
  113. return;
  114. }
  115. this.destination.next(result);
  116. }
  117. }
  118. //# sourceMappingURL=withLatestFrom.js.map