window.js 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. import { Subject } from '../Subject';
  2. import { OuterSubscriber } from '../OuterSubscriber';
  3. import { subscribeToResult } from '../util/subscribeToResult';
  /**
   * Branch out the source Observable values as a nested Observable whenever
   * `windowBoundaries` emits.
   *
   * <span class="informal">It's like {@link buffer}, but emits a nested Observable
   * instead of an array.</span>
   *
   * <img src="./img/window.png" width="100%">
   *
   * Creates an operator function. Applied to a source Observable, it returns an
   * Observable that emits windows of items it collects from the source
   * Observable. The output Observable emits connected, non-overlapping
   * windows. It completes the current window and opens a new one whenever the
   * Observable `windowBoundaries` emits an item. Because each window is an
   * Observable, the output is a higher-order Observable.
   *
   * @example <caption>In every window of 1 second each, emit at most 2 click events</caption>
   * var clicks = Rx.Observable.fromEvent(document, 'click');
   * var interval = Rx.Observable.interval(1000);
   * var result = window(interval)(clicks) // apply the operator function to the source
   *   .map(win => win.take(2)) // each window has at most 2 emissions
   *   .mergeAll(); // flatten the Observable-of-Observables
   * result.subscribe(x => console.log(x));
   *
   * @see {@link windowCount}
   * @see {@link windowTime}
   * @see {@link windowToggle}
   * @see {@link windowWhen}
   * @see {@link buffer}
   *
   * @param {Observable<any>} windowBoundaries An Observable that completes the
   * previous window and starts a new window.
   * @return {function(source: Observable<T>): Observable<Observable<T>>} A
   * function that takes the source Observable and returns an Observable of
   * windows, which are Observables emitting values of the source Observable.
   * @method window
   * @owner Observable
   */
  export function window(windowBoundaries) {
    // The returned function defers all work to `source.lift`, handing it a
    // fresh WindowOperator bound to the given boundary notifier.
    return function windowOperatorFunction(source) {
      return source.lift(new WindowOperator(windowBoundaries));
    };
  }
  /**
   * Operator object handed to `source.lift` by `window`. It remembers the
   * boundary notifier and wires a WindowSubscriber between the source and the
   * downstream subscriber when `call` is invoked.
   */
  class WindowOperator {
    /**
     * @param {Observable<any>} windowBoundaries Notifier whose emissions close
     * the current window and open the next one.
     */
    constructor(windowBoundaries) {
      this.windowBoundaries = windowBoundaries;
    }

    /**
     * Subscribes a new WindowSubscriber to `source` and, when that subscription
     * is still open, also subscribes it to the boundary notifier.
     *
     * @param {Subscriber} subscriber Downstream subscriber that receives windows.
     * @param {Observable} source Upstream Observable being windowed.
     * @return {Subscription} The subscription returned by `source.subscribe`.
     */
    call(subscriber, source) {
      const windowSubscriber = new WindowSubscriber(subscriber);
      const sourceSubscription = source.subscribe(windowSubscriber);
      // If the source subscription is already closed there is nothing left to
      // window, so the boundary notifier is never subscribed.
      if (!sourceSubscription.closed) {
        // Attach the notifier subscription to the window subscriber via `add`.
        windowSubscriber.add(subscribeToResult(windowSubscriber, this.windowBoundaries));
      }
      return sourceSubscription;
    }
  }
  /**
   * We need this JSDoc comment for affecting ESDoc.
   *
   * Subscriber that forwards source values into the currently open window (a
   * Subject) and swaps in a new window each time the boundary notifier emits.
   * The first window is created and emitted to the destination on construction.
   * @ignore
   * @extends {Ignored}
   */
  class WindowSubscriber extends OuterSubscriber {
    /**
     * @param {Subscriber} destination Downstream subscriber that receives each
     * window as it is opened.
     */
    constructor(destination) {
      super(destination);
      // Open the first window immediately so source values have a target.
      this.window = new Subject();
      destination.next(this.window);
    }

    /** Boundary notifier emitted: close the current window and open a new one. */
    notifyNext(outerValue, innerValue, outerIndex, innerIndex, innerSub) {
      this.openWindow();
    }

    /** Boundary notifier errored: treat it exactly like a source error. */
    notifyError(error, innerSub) {
      this._error(error);
    }

    /** Boundary notifier completed: complete the current window and the output. */
    notifyComplete(innerSub) {
      this._complete();
    }

    /** Forwards a source value into the currently open window. */
    _next(value) {
      this.window.next(value);
    }

    /** Errors the open window first, then the destination. */
    _error(err) {
      this.window.error(err);
      this.destination.error(err);
    }

    /** Completes the open window first, then the destination. */
    _complete() {
      this.window.complete();
      this.destination.complete();
    }

    /** @deprecated internal use only */
    _unsubscribe() {
      // Drop the reference to the current window on teardown.
      this.window = null;
    }

    /**
     * Completes the current window, if there is one, then creates a new
     * Subject, stores it as the current window and emits it to the destination.
     */
    openWindow() {
      const prevWindow = this.window;
      // `window` is null after `_unsubscribe`, so only complete a live window.
      if (prevWindow) {
        prevWindow.complete();
      }
      const destination = this.destination;
      const newWindow = this.window = new Subject();
      destination.next(newWindow);
    }
  }
  102. //# sourceMappingURL=window.js.map