// exhaust.js (3.4 KB)

  "use strict";
  /**
   * Inheritance helper: makes constructor `d` derive from constructor `b`.
   * An `__extends` already present on `this` is reused instead of the local
   * implementation.
   *
   * @param {Function} d The derived constructor; receives the base's own
   * enumerable members and a new prototype.
   * @param {Function|null} b The base constructor, or `null` for a prototype
   * with no parent.
   */
  var __extends = (this && this.__extends) || function (d, b) {
      // Copy the base's own enumerable (static) members. hasOwnProperty is taken
      // from Object.prototype so a base that shadows or lacks it still works.
      for (var p in b) if (Object.prototype.hasOwnProperty.call(b, p)) d[p] = b[p];
      // Surrogate constructor: lets d.prototype inherit from b.prototype without
      // running the base constructor, and restores the `constructor` back-link.
      function __() { this.constructor = d; }
      d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
  };
  // Project-local modules: the base class extended by SwitchFirstSubscriber and
  // the helper it uses to subscribe to each accepted inner value.
  var OuterSubscriber_1 = require('../OuterSubscriber');
  var subscribeToResult_1 = require('../util/subscribeToResult');
  9. /**
  10. * Converts a higher-order Observable into a first-order Observable by dropping
  11. * inner Observables while the previous inner Observable has not yet completed.
  12. *
  13. * <span class="informal">Flattens an Observable-of-Observables by dropping the
  14. * next inner Observables while the current inner is still executing.</span>
  15. *
  16. * <img src="./img/exhaust.png" width="100%">
  17. *
  18. * `exhaust` subscribes to an Observable that emits Observables, also known as a
  19. * higher-order Observable. Each time it observes one of these emitted inner
  20. * Observables, the output Observable begins emitting the items emitted by that
  21. * inner Observable. So far, it behaves like {@link mergeAll}. However,
  22. * `exhaust` ignores every new inner Observable if the previous Observable has
  23. * not yet completed. Once that one completes, it will accept and flatten the
  24. * next inner Observable and repeat this process.
  25. *
  26. * @example <caption>Run a finite timer for each click, only if there is no currently active timer</caption>
  27. * var clicks = Rx.Observable.fromEvent(document, 'click');
  28. * var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000).take(5));
  29. * var result = higherOrder.exhaust();
  30. * result.subscribe(x => console.log(x));
  31. *
  32. * @see {@link combineAll}
  33. * @see {@link concatAll}
  34. * @see {@link switch}
  35. * @see {@link mergeAll}
  36. * @see {@link exhaustMap}
  37. * @see {@link zipAll}
  38. *
  39. * @return {Observable} An Observable that takes a source of Observables and propagates the first observable
  40. * exclusively until it completes before subscribing to the next.
  41. * @method exhaust
  42. * @owner Observable
  43. */
  44. function exhaust() {
  45. return function (source) { return source.lift(new SwitchFirstOperator()); };
  46. }
  47. exports.exhaust = exhaust;
  /**
   * Operator object that `exhaust` passes to `source.lift()`.
   * It carries no state of its own.
   */
  var SwitchFirstOperator = (function () {
      function SwitchFirstOperator() {
      }
      /**
       * Subscribes to `source` with a new SwitchFirstSubscriber that wraps
       * `subscriber`.
       *
       * @param subscriber The downstream subscriber to wrap.
       * @param source The object whose `subscribe` method is invoked.
       * @return Whatever `source.subscribe` returns.
       */
      SwitchFirstOperator.prototype.call = function (subscriber, source) {
          return source.subscribe(new SwitchFirstSubscriber(subscriber));
      };
      return SwitchFirstOperator;
  }());
  /**
   * We need this JSDoc comment for affecting ESDoc.
   * @ignore
   * @extends {Ignored}
   */
  var SwitchFirstSubscriber = (function (_super) {
      __extends(SwitchFirstSubscriber, _super);
      /**
       * @param destination The downstream subscriber, forwarded to the base
       * constructor.
       */
      function SwitchFirstSubscriber(destination) {
          _super.call(this, destination);
          // Set once the outer source has completed.
          this.hasCompleted = false;
          // True while an accepted inner subscription has not yet completed.
          this.hasSubscription = false;
      }
      /**
       * Handles a value from the outer source. The value is subscribed to via
       * `subscribeToResult` only when no inner subscription is active;
       * otherwise it is dropped.
       */
      SwitchFirstSubscriber.prototype._next = function (value) {
          if (!this.hasSubscription) {
              // Raise the flag before subscribing so values arriving during the
              // subscribe call are dropped as well.
              this.hasSubscription = true;
              this.add(subscribeToResult_1.subscribeToResult(this, value));
          }
      };
      /**
       * Handles completion of the outer source. The destination is completed
       * right away only when no inner subscription is active; otherwise
       * completion is left to `notifyComplete`.
       */
      SwitchFirstSubscriber.prototype._complete = function () {
          this.hasCompleted = true;
          if (!this.hasSubscription) {
              this.destination.complete();
          }
      };
      /**
       * Handles completion of the active inner subscription: removes it, clears
       * the active flag so the next outer value is accepted, and completes the
       * destination if the outer source has already completed.
       *
       * @param innerSub The inner subscription that completed.
       */
      SwitchFirstSubscriber.prototype.notifyComplete = function (innerSub) {
          this.remove(innerSub);
          this.hasSubscription = false;
          if (this.hasCompleted) {
              this.destination.complete();
          }
      };
      return SwitchFirstSubscriber;
  }(OuterSubscriber_1.OuterSubscriber));
  //# sourceMappingURL=exhaust.js.map
  89. //# sourceMappingURL=exhaust.js.map