windowToggle.js 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. import { Subject } from '../Subject';
  2. import { Subscription } from '../Subscription';
  3. import { tryCatch } from '../util/tryCatch';
  4. import { errorObject } from '../util/errorObject';
  5. import { OuterSubscriber } from '../OuterSubscriber';
  6. import { subscribeToResult } from '../util/subscribeToResult';
  7. /**
  8. * Branch out the source Observable values as a nested Observable starting from
  9. * an emission from `openings` and ending when the output of `closingSelector`
  10. * emits.
  11. *
  12. * <span class="informal">It's like {@link bufferToggle}, but emits a nested
  13. * Observable instead of an array.</span>
  14. *
  15. * <img src="./img/windowToggle.png" width="100%">
  16. *
  17. * Returns an Observable that emits windows of items it collects from the source
  18. * Observable. The output Observable emits windows that contain those items
  19. * emitted by the source Observable between the time when the `openings`
  20. * Observable emits an item and when the Observable returned by
  21. * `closingSelector` emits an item.
  22. *
  23. * @example <caption>Every other second, emit the click events from the next 500ms</caption>
  24. * var clicks = Rx.Observable.fromEvent(document, 'click');
  25. * var openings = Rx.Observable.interval(1000);
  26. * var result = clicks.windowToggle(openings, i =>
  27. * i % 2 ? Rx.Observable.interval(500) : Rx.Observable.empty()
  28. * ).mergeAll();
  29. * result.subscribe(x => console.log(x));
  30. *
  31. * @see {@link window}
  32. * @see {@link windowCount}
  33. * @see {@link windowTime}
  34. * @see {@link windowWhen}
  35. * @see {@link bufferToggle}
  36. *
  37. * @param {Observable<O>} openings An observable of notifications to start new
  38. * windows.
  39. * @param {function(value: O): Observable} closingSelector A function that takes
  40. * the value emitted by the `openings` observable and returns an Observable,
  41. * which, when it emits (either `next` or `complete`), signals that the
  42. * associated window should complete.
  43. * @return {Observable<Observable<T>>} An observable of windows, which in turn
  44. * are Observables.
  45. * @method windowToggle
  46. * @owner Observable
  47. */
  export function windowToggle(openings, closingSelector) {
    // The returned function is applied to a source Observable; every application
    // builds its own WindowToggleOperator and attaches it through `lift`.
    return (input) => {
      const operator = new WindowToggleOperator(openings, closingSelector);
      return input.lift(operator);
    };
  }
  /**
   * Operator object passed to `lift`. It stores the `openings` notifier and the
   * `closingSelector`, and when invoked through `call` it subscribes the source
   * with a new WindowToggleSubscriber that wraps the downstream subscriber.
   */
  class WindowToggleOperator {
    constructor(openings, closingSelector) {
      Object.assign(this, { openings, closingSelector });
    }
    call(subscriber, source) {
      const { openings, closingSelector } = this;
      const windowSubscriber = new WindowToggleSubscriber(subscriber, openings, closingSelector);
      return source.subscribe(windowSubscriber);
    }
  }
  /**
   * Subscriber that forwards every source value into all currently open windows.
   *
   * An emission from `openings` opens a new window (a Subject that is pushed to
   * the destination) and subscribes to the notifier returned by
   * `closingSelector`; the window is completed when that notifier emits or
   * completes. Each open window is tracked as a `{ window, subscription }`
   * context in `this.contexts`.
   *
   * We need this JSDoc comment for affecting ESDoc.
   * @ignore
   * @extends {Ignored}
   */
  class WindowToggleSubscriber extends OuterSubscriber {
    /**
     * @param {Subscriber} destination Downstream subscriber that receives the windows.
     * @param {Observable} openings Notifier whose emissions open new windows.
     * @param {function(value: any): Observable} closingSelector Maps an opening
     * value to the notifier that closes the window opened by that value.
     */
    constructor(destination, openings, closingSelector) {
      super(destination);
      this.openings = openings;
      this.closingSelector = closingSelector;
      // One context per open window; replaced by null once the subscriber is torn down.
      this.contexts = [];
      // `openings` is also passed as the outerValue so notifyNext can tell
      // opening emissions apart from closing-notifier emissions.
      this.add(this.openSubscription = subscribeToResult(this, openings, openings));
    }
    /**
     * Pushes a source value into every window that is open at the moment of the call.
     * @param {*} value Value emitted by the source.
     */
    _next(value) {
      const { contexts } = this;
      if (contexts) {
        // Length is captured up front: windows opened while delivering this value
        // do not receive it.
        const len = contexts.length;
        for (let i = 0; i < len; i++) {
          contexts[i].window.next(value);
        }
      }
    }
    /**
     * Errors every open window, releases its closing subscription, then forwards
     * the error through the base class.
     * @param {*} err Error raised by the source (or routed here via `error`).
     */
    _error(err) {
      const { contexts } = this;
      this.contexts = null;
      if (contexts) {
        for (const { window, subscription } of contexts) {
          window.error(err);
          subscription.unsubscribe();
        }
      }
      super._error(err);
    }
    /**
     * Completes every open window, releases its closing subscription, then
     * forwards completion through the base class.
     */
    _complete() {
      const { contexts } = this;
      this.contexts = null;
      if (contexts) {
        for (const { window, subscription } of contexts) {
          window.complete();
          subscription.unsubscribe();
        }
      }
      super._complete();
    }
    /**
     * Unsubscribes every open window together with its closing subscription.
     * @deprecated internal use only
     */
    _unsubscribe() {
      const { contexts } = this;
      this.contexts = null;
      if (contexts) {
        for (const { window, subscription } of contexts) {
          window.unsubscribe();
          subscription.unsubscribe();
        }
      }
    }
    /**
     * Handles an emission from either `openings` or one of the closing notifiers.
     *
     * - From `openings`: runs `closingSelector`, opens a window, subscribes to
     *   the returned notifier and emits the window downstream. A selector that
     *   throws is reported through `this.error`.
     * - From a closing notifier: `outerValue` is that window's context, and the
     *   matching window is closed.
     *
     * @param {*} outerValue `this.openings` for opening emissions, otherwise the
     * context object that was supplied when subscribing to the closing notifier.
     * @param {*} innerValue Value emitted by the inner Observable.
     */
    notifyNext(outerValue, innerValue, outerIndex, innerIndex, innerSub) {
      if (outerValue !== this.openings) {
        this.closeWindow(this.contexts.indexOf(outerValue));
        return;
      }
      const { closingSelector } = this;
      const closingNotifier = tryCatch(closingSelector)(innerValue);
      if (closingNotifier === errorObject) {
        return this.error(errorObject.e);
      }
      const window = new Subject();
      const subscription = new Subscription();
      const context = { window, subscription };
      this.contexts.push(context);
      const innerSubscription = subscribeToResult(this, closingNotifier, context);
      // NOTE(review): subscribeToResult is defined outside this file; a missing
      // (null/undefined) result is assumed to mean the notifier already finished
      // synchronously - confirm against its implementation.
      if (!innerSubscription || innerSubscription.closed) {
        // The notifier terminated while subscribing. Locate the window by identity:
        // a synchronous emission may already have removed it through the branch
        // above, and closing "the last entry" would then hit an unrelated window.
        this.closeWindow(this.contexts.indexOf(context));
      }
      else {
        // Lets notifyComplete map the inner subscription back to its window.
        innerSubscription.context = context;
        subscription.add(innerSubscription);
      }
      this.destination.next(window);
    }
    /**
     * Routes an error from `openings` or a closing notifier into `this.error`.
     * @param {*} err The inner error.
     */
    notifyError(err) {
      this.error(err);
    }
    /**
     * Closes the window tied to a closing notifier that completed. Completion of
     * the `openings` subscription itself is ignored.
     * @param {Subscription} inner The inner subscription that completed.
     */
    notifyComplete(inner) {
      if (inner !== this.openSubscription) {
        this.closeWindow(this.contexts.indexOf(inner.context));
      }
    }
    /**
     * Removes the context at `index`, completes its window and unsubscribes its
     * closing subscription. An index of -1 (context not found) is a no-op.
     * @param {number} index Position of the context in `this.contexts`.
     */
    closeWindow(index) {
      if (index === -1) {
        return;
      }
      const { contexts } = this;
      const context = contexts[index];
      const { window, subscription } = context;
      contexts.splice(index, 1);
      window.complete();
      subscription.unsubscribe();
    }
  }
  170. //# sourceMappingURL=windowToggle.js.map