123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- import { Subject } from '../Subject';
- import { async } from '../scheduler/async';
- import { Subscriber } from '../Subscriber';
- import { isNumeric } from '../util/isNumeric';
- import { isScheduler } from '../util/isScheduler';
/**
 * Builds an operator function that lifts a `WindowTimeOperator` onto a source.
 *
 * Only `windowTimeSpan` is a named parameter; up to three further arguments
 * are read positionally from `arguments`:
 *   - position 1: a scheduler, or a numeric `windowCreationInterval`
 *   - position 2: a scheduler, or a numeric `maxWindowSize`
 *   - position 3: a scheduler
 *
 * Positions are inspected from last to first, so a scheduler found at a
 * lower position replaces one found at a higher position.
 *
 * Defaults: the imported `async` scheduler, a `null` creation interval and
 * `Number.POSITIVE_INFINITY` as the maximum window size.
 *
 * @param windowTimeSpan Passed through unchanged to `WindowTimeOperator`.
 * @returns A function taking `source` and returning `source.lift(...)`.
 */
export function windowTime(windowTimeSpan) {
    const secondArg = arguments[1];
    const thirdArg = arguments[2];
    const fourthArg = arguments[3];

    let scheduler = async;
    let maxWindowSize = Number.POSITIVE_INFINITY;
    let windowCreationInterval = null;

    if (isScheduler(fourthArg)) {
        scheduler = fourthArg;
    }

    if (isScheduler(thirdArg)) {
        scheduler = thirdArg;
    } else if (isNumeric(thirdArg)) {
        maxWindowSize = thirdArg;
    }

    if (isScheduler(secondArg)) {
        scheduler = secondArg;
    } else if (isNumeric(secondArg)) {
        windowCreationInterval = secondArg;
    }

    return function windowTimeOperatorFunction(source) {
        // A fresh operator is created for every source the function is applied to.
        return source.lift(
            new WindowTimeOperator(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler)
        );
    };
}
/**
 * Holds the windowing configuration and, when called, subscribes a new
 * `WindowTimeSubscriber` built from that configuration to the source.
 */
class WindowTimeOperator {
    /**
     * Stores the four configuration values on the instance, unchanged.
     */
    constructor(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler) {
        Object.assign(this, {
            windowTimeSpan,
            windowCreationInterval,
            maxWindowSize,
            scheduler,
        });
    }

    /**
     * @param subscriber Passed as the destination of the new `WindowTimeSubscriber`.
     * @param source Object whose `subscribe` method receives the new subscriber.
     * @returns Whatever `source.subscribe(...)` returns.
     */
    call(subscriber, source) {
        const { windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler } = this;
        return source.subscribe(
            new WindowTimeSubscriber(subscriber, windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler)
        );
    }
}
/**
 * A `Subject` that counts how many times `next` has been called on it.
 */
class CountedSubject extends Subject {
    constructor(...args) {
        super(...args);
        // Number of `next` calls seen so far; exposed read-only through the getter.
        this._numberOfNextedValues = 0;
    }

    /**
     * Bumps the counter first, then delegates to `Subject`'s `next`.
     */
    next(value) {
        ++this._numberOfNextedValues;
        super.next(value);
    }

    /**
     * @returns The current value of the `next` call counter.
     */
    get numberOfNextedValues() {
        return this._numberOfNextedValues;
    }
}
/**
 * Subscriber that pushes each incoming value into every window it currently
 * tracks, and opens/closes those windows from actions scheduled on
 * `scheduler`.
 *
 * We need this JSDoc comment for affecting ESDoc.
 * @ignore
 * @extends {Ignored}
 */
class WindowTimeSubscriber extends Subscriber {
    /**
     * Opens the first window immediately and schedules the timer work.
     *
     * When `windowCreationInterval` is non-null and >= 0, two actions are
     * scheduled: one closing the first window after `windowTimeSpan`, and one
     * running `dispatchWindowCreation` after `windowCreationInterval`.
     * Otherwise a single `dispatchWindowTimeSpanOnly` action is scheduled
     * after `windowTimeSpan`.
     *
     * @param destination Receives each newly opened window via `next`.
     * @param windowTimeSpan Delay used when scheduling window closing.
     * @param windowCreationInterval Delay between window creations, or null.
     * @param maxWindowSize A window is closed once it has been nexted this many values.
     * @param scheduler Object whose `schedule(work, delay, state)` is used for timers.
     */
    constructor(destination, windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler) {
        super(destination);
        this.destination = destination;
        this.windowTimeSpan = windowTimeSpan;
        this.windowCreationInterval = windowCreationInterval;
        this.maxWindowSize = maxWindowSize;
        this.scheduler = scheduler;
        this.windows = [];
        const window = this.openWindow();
        if (windowCreationInterval !== null && windowCreationInterval >= 0) {
            const closeState = { subscriber: this, window, context: null };
            const creationState = { windowTimeSpan, windowCreationInterval, subscriber: this, scheduler };
            this.add(scheduler.schedule(dispatchWindowClose, windowTimeSpan, closeState));
            this.add(scheduler.schedule(dispatchWindowCreation, windowCreationInterval, creationState));
        }
        else {
            const timeSpanOnlyState = { subscriber: this, window, windowTimeSpan };
            this.add(scheduler.schedule(dispatchWindowTimeSpanOnly, windowTimeSpan, timeSpanOnlyState));
        }
    }

    /**
     * Forwards `value` to every tracked window and closes any window whose
     * nexted-value count has reached `maxWindowSize`.
     */
    _next(value) {
        // Iterate over a snapshot: closeWindow() splices `this.windows`, and
        // walking the live array by cached length would skip the element after
        // a removed window and then read `undefined` past the new end.
        const snapshot = this.windows.slice();
        for (const window of snapshot) {
            if (!window.closed) {
                window.next(value);
                if (window.numberOfNextedValues >= this.maxWindowSize) {
                    this.closeWindow(window);
                }
            }
        }
    }

    /**
     * Errors every tracked window (emptying the list), then the destination.
     */
    _error(err) {
        const windows = this.windows;
        while (windows.length > 0) {
            windows.shift().error(err);
        }
        this.destination.error(err);
    }

    /**
     * Completes every tracked window that is not `closed` (emptying the
     * list), then completes the destination.
     */
    _complete() {
        const windows = this.windows;
        while (windows.length > 0) {
            const window = windows.shift();
            if (!window.closed) {
                window.complete();
            }
        }
        this.destination.complete();
    }

    /**
     * Creates a new `CountedSubject`, tracks it, and emits it to the
     * destination.
     *
     * @returns The newly created window.
     */
    openWindow() {
        const window = new CountedSubject();
        this.windows.push(window);
        const destination = this.destination;
        destination.next(window);
        return window;
    }

    /**
     * Completes `window` and stops tracking it.
     *
     * A window can be closed twice (for example once by the `maxWindowSize`
     * check in `_next` and again when its scheduled close action runs). On the
     * second call it is no longer in the list, so the removal is skipped:
     * `splice(-1, 1)` would otherwise drop the last, unrelated window.
     */
    closeWindow(window) {
        window.complete();
        const windows = this.windows;
        const index = windows.indexOf(window);
        if (index !== -1) {
            windows.splice(index, 1);
        }
    }
}
/**
 * Scheduled work for the "time span only" mode: swaps the current window for
 * a new one and reschedules itself.
 *
 * Called with `this` bound to the action running it; `this.schedule` is used
 * to run the same work again after `windowTimeSpan`.
 *
 * @param state Object with `subscriber`, `windowTimeSpan` and `window`;
 *   `state.window` is overwritten with the newly opened window.
 */
function dispatchWindowTimeSpanOnly(state) {
    const owner = state.subscriber;
    const delay = state.windowTimeSpan;
    const previousWindow = state.window;

    // Only close when there is a previous window to close.
    if (previousWindow) {
        owner.closeWindow(previousWindow);
    }

    state.window = owner.openWindow();
    this.schedule(state, delay);
}
/**
 * Scheduled work that opens a new window, schedules that window's closing
 * after `windowTimeSpan`, and reschedules itself after
 * `windowCreationInterval`.
 *
 * Called with `this` bound to the action running it. The close subscription
 * is added to that action, and is shared with `dispatchWindowClose` through
 * a `context` object so the close handler can remove it again.
 *
 * @param state Object with `windowTimeSpan`, `subscriber`, `scheduler` and
 *   `windowCreationInterval`.
 */
function dispatchWindowCreation(state) {
    const closeDelay = state.windowTimeSpan;
    const owner = state.subscriber;
    const timer = state.scheduler;
    const creationDelay = state.windowCreationInterval;

    const openedWindow = owner.openWindow();
    const action = this;

    // `subscription` stays null until schedule() returns, so the close
    // handler sees null if it runs before the assignment below.
    const context = { action, subscription: null };
    const closeState = { subscriber: owner, window: openedWindow, context };

    const closeSubscription = timer.schedule(dispatchWindowClose, closeDelay, closeState);
    context.subscription = closeSubscription;

    action.add(context.subscription);
    action.schedule(state, creationDelay);
}
/**
 * Scheduled work that closes one window.
 *
 * If `state.context` carries both an `action` and a `subscription`, the
 * subscription is first removed from that action; the window is then closed
 * through `subscriber.closeWindow` in every case.
 *
 * @param state Object with `subscriber`, `window` and `context` (may be null).
 */
function dispatchWindowClose(state) {
    const owner = state.subscriber;
    const closingWindow = state.window;
    const context = state.context;

    const hasTrackedSubscription = Boolean(context && context.action && context.subscription);
    if (hasTrackedSubscription) {
        context.action.remove(context.subscription);
    }

    owner.closeWindow(closingWindow);
}
- //# sourceMappingURL=windowTime.js.map
|