windowWhen.js 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. "use strict";
  // Prototype-inheritance helper: reuses an `__extends` already present on
  // `this` when there is one, otherwise falls back to the local definition.
  var __extends = (this && this.__extends) || function (d, b) {
      // Copy the own enumerable properties of `b` (its "statics") onto `d`.
      for (var key in b) {
          if (b.hasOwnProperty(key)) {
              d[key] = b[key];
          }
      }
      // Surrogate constructor: lets us build d.prototype from b.prototype
      // without running `b` itself, while pointing `constructor` back at `d`.
      function Surrogate() { this.constructor = d; }
      if (b === null) {
          // Extending null: the prototype object gets no parent at all.
          d.prototype = Object.create(b);
      }
      else {
          Surrogate.prototype = b.prototype;
          d.prototype = new Surrogate();
      }
  };
  7. var Subject_1 = require('../Subject');
  8. var tryCatch_1 = require('../util/tryCatch');
  9. var errorObject_1 = require('../util/errorObject');
  10. var OuterSubscriber_1 = require('../OuterSubscriber');
  11. var subscribeToResult_1 = require('../util/subscribeToResult');
  12. /**
  13. * Branch out the source Observable values as a nested Observable using a
  14. * factory function of closing Observables to determine when to start a new
  15. * window.
  16. *
  17. * <span class="informal">It's like {@link bufferWhen}, but emits a nested
  18. * Observable instead of an array.</span>
  19. *
  20. * <img src="./img/windowWhen.png" width="100%">
  21. *
  22. * Returns an Observable that emits windows of items it collects from the source
  23. * Observable. The output Observable emits connected, non-overlapping windows.
  24. * It emits the current window and opens a new one whenever the Observable
  25. * produced by the specified `closingSelector` function emits an item. The first
  26. * window is opened immediately when subscribing to the output Observable.
  27. *
  28. * @example <caption>Emit only the first two click events in every window of [1-5] random seconds</caption>
  29. * var clicks = Rx.Observable.fromEvent(document, 'click');
  30. * var result = clicks
  31. * .windowWhen(() => Rx.Observable.interval(1000 + Math.random() * 4000))
  32. * .map(win => win.take(2)) // each window has at most 2 emissions
  33. * .mergeAll(); // flatten the Observable-of-Observables
  34. * result.subscribe(x => console.log(x));
  35. *
  36. * @see {@link window}
  37. * @see {@link windowCount}
  38. * @see {@link windowTime}
  39. * @see {@link windowToggle}
  40. * @see {@link bufferWhen}
  41. *
  42. * @param {function(): Observable} closingSelector A function that takes no
  43. * arguments and returns an Observable that signals (on either `next` or
  44. * `complete`) when to close the previous window and start a new one.
  45. * @return {Observable<Observable<T>>} An observable of windows, which in turn
  46. * are Observables.
  47. * @method windowWhen
  48. * @owner Observable
  49. */
  function windowWhen(closingSelector) {
      // The returned function captures `closingSelector` and, given a source,
      // lifts it with a fresh WindowOperator built around that selector.
      function windowWhenOperatorFunction(source) {
          return source.lift(new WindowOperator(closingSelector));
      }
      return windowWhenOperatorFunction;
  }
  55. exports.windowWhen = windowWhen;
  var WindowOperator = (function () {
      /**
       * Operator object that remembers the closing selector until a
       * subscription is made through `call`.
       */
      function WindowOperator(closingSelector) {
          this.closingSelector = closingSelector;
      }
      var proto = WindowOperator.prototype;
      // Wraps `subscriber` in a WindowSubscriber configured with the stored
      // selector and subscribes it to `source`, returning what subscribe returns.
      proto.call = function (subscriber, source) {
          var selector = this.closingSelector;
          return source.subscribe(new WindowSubscriber(subscriber, selector));
      };
      return WindowOperator;
  }());
  65. /**
  66. * We need this JSDoc comment for affecting ESDoc.
  67. * @ignore
  68. * @extends {Ignored}
  69. */
  var WindowSubscriber = (function (_super) {
      __extends(WindowSubscriber, _super);
      /**
       * Subscriber that forwards source values into the current window Subject
       * and swaps in a new window each time the closing notifier fires.
       * The first window is opened during construction.
       */
      function WindowSubscriber(destination, closingSelector) {
          _super.call(this, destination);
          this.destination = destination;
          this.closingSelector = closingSelector;
          this.openWindow();
      }
      var proto = WindowSubscriber.prototype;
      // A value from the closing notifier ends the current window.
      proto.notifyNext = function (outerValue, innerValue, outerIndex, innerIndex, innerSub) {
          this.openWindow(innerSub);
      };
      // An error from the closing notifier is handled exactly like a source error.
      proto.notifyError = function (error, innerSub) {
          this._error(error);
      };
      // Completion of the closing notifier also ends the current window.
      proto.notifyComplete = function (innerSub) {
          this.openWindow(innerSub);
      };
      // Source values go to whichever window is current.
      proto._next = function (value) {
          var currentWindow = this.window;
          currentWindow.next(value);
      };
      // Source error: window first, then the outer destination, then drop
      // the closing-notifier subscription.
      proto._error = function (err) {
          this.window.error(err);
          this.destination.error(err);
          this.unsubscribeClosingNotification();
      };
      // Source completion: same ordering as _error.
      proto._complete = function () {
          this.window.complete();
          this.destination.complete();
          this.unsubscribeClosingNotification();
      };
      // Unsubscribes the stored closing-notifier subscription, if there is one.
      proto.unsubscribeClosingNotification = function () {
          var notification = this.closingNotification;
          if (notification) {
              notification.unsubscribe();
          }
      };
      /**
       * Completes the previous window (if any), emits a new window Subject to
       * the destination and subscribes to a new closing notifier obtained
       * from `closingSelector`.
       *
       * @param innerSub Optional subscription of the notifier that triggered
       * this call; when given it is removed from this subscriber and
       * unsubscribed before the new window is opened.
       */
      proto.openWindow = function (innerSub) {
          if (innerSub === void 0) { innerSub = null; }
          if (innerSub) {
              this.remove(innerSub);
              innerSub.unsubscribe();
          }
          var previous = this.window;
          if (previous) {
              previous.complete();
          }
          var freshWindow = new Subject_1.Subject();
          this.window = freshWindow;
          this.destination.next(freshWindow);
          // tryCatch returns the errorObject sentinel instead of throwing.
          var guardedSelector = tryCatch_1.tryCatch(this.closingSelector);
          var closingNotifier = guardedSelector();
          if (closingNotifier === errorObject_1.errorObject) {
              // Selector failed: destination is notified first, then the window.
              var failure = errorObject_1.errorObject.e;
              this.destination.error(failure);
              this.window.error(failure);
              return;
          }
          var notifierSubscription = subscribeToResult_1.subscribeToResult(this, closingNotifier);
          this.closingNotification = notifierSubscription;
          this.add(notifierSubscription);
      };
      return WindowSubscriber;
  }(OuterSubscriber_1.OuterSubscriber));
  129. //# sourceMappingURL=windowWhen.js.map