123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- import { Subject } from '../Subject';
- import { Subscription } from '../Subscription';
- import { tryCatch } from '../util/tryCatch';
- import { errorObject } from '../util/errorObject';
- import { OuterSubscriber } from '../OuterSubscriber';
- import { subscribeToResult } from '../util/subscribeToResult';
- /**
- * Branch out the source Observable values as a nested Observable starting from
- * an emission from `openings` and ending when the output of `closingSelector`
- * emits.
- *
- * <span class="informal">It's like {@link bufferToggle}, but emits a nested
- * Observable instead of an array.</span>
- *
- * <img src="./img/windowToggle.png" width="100%">
- *
- * Returns an Observable that emits windows of items it collects from the source
- * Observable. The output Observable emits windows that contain those items
- * emitted by the source Observable between the time when the `openings`
- * Observable emits an item and when the Observable returned by
- * `closingSelector` emits an item.
- *
- * @example <caption>Every other second, emit the click events from the next 500ms</caption>
- * var clicks = Rx.Observable.fromEvent(document, 'click');
- * var openings = Rx.Observable.interval(1000);
- * var result = clicks.windowToggle(openings, i =>
- * i % 2 ? Rx.Observable.interval(500) : Rx.Observable.empty()
- * ).mergeAll();
- * result.subscribe(x => console.log(x));
- *
- * @see {@link window}
- * @see {@link windowCount}
- * @see {@link windowTime}
- * @see {@link windowWhen}
- * @see {@link bufferToggle}
- *
- * @param {Observable<O>} openings An observable of notifications to start new
- * windows.
- * @param {function(value: O): Observable} closingSelector A function that takes
- * the value emitted by the `openings` observable and returns an Observable,
- * which, when it emits (either `next` or `complete`), signals that the
- * associated window should complete.
- * @return {Observable<Observable<T>>} An observable of windows, which in turn
- * are Observables.
- * @method windowToggle
- * @owner Observable
- */
- export function windowToggle(openings, closingSelector) {
- return (source) => source.lift(new WindowToggleOperator(openings, closingSelector));
- }
- class WindowToggleOperator {
- constructor(openings, closingSelector) {
- this.openings = openings;
- this.closingSelector = closingSelector;
- }
- call(subscriber, source) {
- return source.subscribe(new WindowToggleSubscriber(subscriber, this.openings, this.closingSelector));
- }
- }
- /**
- * We need this JSDoc comment for affecting ESDoc.
- * @ignore
- * @extends {Ignored}
- */
- class WindowToggleSubscriber extends OuterSubscriber {
- constructor(destination, openings, closingSelector) {
- super(destination);
- this.openings = openings;
- this.closingSelector = closingSelector;
- this.contexts = [];
- this.add(this.openSubscription = subscribeToResult(this, openings, openings));
- }
- _next(value) {
- const { contexts } = this;
- if (contexts) {
- const len = contexts.length;
- for (let i = 0; i < len; i++) {
- contexts[i].window.next(value);
- }
- }
- }
- _error(err) {
- const { contexts } = this;
- this.contexts = null;
- if (contexts) {
- const len = contexts.length;
- let index = -1;
- while (++index < len) {
- const context = contexts[index];
- context.window.error(err);
- context.subscription.unsubscribe();
- }
- }
- super._error(err);
- }
- _complete() {
- const { contexts } = this;
- this.contexts = null;
- if (contexts) {
- const len = contexts.length;
- let index = -1;
- while (++index < len) {
- const context = contexts[index];
- context.window.complete();
- context.subscription.unsubscribe();
- }
- }
- super._complete();
- }
- /** @deprecated internal use only */ _unsubscribe() {
- const { contexts } = this;
- this.contexts = null;
- if (contexts) {
- const len = contexts.length;
- let index = -1;
- while (++index < len) {
- const context = contexts[index];
- context.window.unsubscribe();
- context.subscription.unsubscribe();
- }
- }
- }
- notifyNext(outerValue, innerValue, outerIndex, innerIndex, innerSub) {
- if (outerValue === this.openings) {
- const { closingSelector } = this;
- const closingNotifier = tryCatch(closingSelector)(innerValue);
- if (closingNotifier === errorObject) {
- return this.error(errorObject.e);
- }
- else {
- const window = new Subject();
- const subscription = new Subscription();
- const context = { window, subscription };
- this.contexts.push(context);
- const innerSubscription = subscribeToResult(this, closingNotifier, context);
- if (innerSubscription.closed) {
- this.closeWindow(this.contexts.length - 1);
- }
- else {
- innerSubscription.context = context;
- subscription.add(innerSubscription);
- }
- this.destination.next(window);
- }
- }
- else {
- this.closeWindow(this.contexts.indexOf(outerValue));
- }
- }
- notifyError(err) {
- this.error(err);
- }
- notifyComplete(inner) {
- if (inner !== this.openSubscription) {
- this.closeWindow(this.contexts.indexOf(inner.context));
- }
- }
- closeWindow(index) {
- if (index === -1) {
- return;
- }
- const { contexts } = this;
- const context = contexts[index];
- const { window, subscription } = context;
- contexts.splice(index, 1);
- window.complete();
- subscription.unsubscribe();
- }
- }
- //# sourceMappingURL=windowToggle.js.map
|