bufferTime.js 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. import { async } from '../scheduler/async';
  2. import { Subscriber } from '../Subscriber';
  3. import { isScheduler } from '../util/isScheduler';
  4. /* tslint:enable:max-line-length */
  5. /**
  6. * Buffers the source Observable values for a specific time period.
  7. *
  8. * <span class="informal">Collects values from the past as an array, and emits
  9. * those arrays periodically in time.</span>
  10. *
  11. * <img src="./img/bufferTime.png" width="100%">
  12. *
  13. * Buffers values from the source for a specific time duration `bufferTimeSpan`.
  14. * Unless the optional argument `bufferCreationInterval` is given, it emits and
  15. * resets the buffer every `bufferTimeSpan` milliseconds. If
  16. * `bufferCreationInterval` is given, this operator opens the buffer every
  17. * `bufferCreationInterval` milliseconds and closes (emits and resets) the
  18. * buffer every `bufferTimeSpan` milliseconds. When the optional argument
  19. * `maxBufferSize` is specified, the buffer will be closed either after
  20. * `bufferTimeSpan` milliseconds or when it contains `maxBufferSize` elements.
  21. *
  22. * @example <caption>Every second, emit an array of the recent click events</caption>
  23. * var clicks = Rx.Observable.fromEvent(document, 'click');
  24. * var buffered = clicks.bufferTime(1000);
  25. * buffered.subscribe(x => console.log(x));
  26. *
  27. * @example <caption>Every 5 seconds, emit the click events from the next 2 seconds</caption>
  28. * var clicks = Rx.Observable.fromEvent(document, 'click');
  29. * var buffered = clicks.bufferTime(2000, 5000);
  30. * buffered.subscribe(x => console.log(x));
  31. *
  32. * @see {@link buffer}
  33. * @see {@link bufferCount}
  34. * @see {@link bufferToggle}
  35. * @see {@link bufferWhen}
  36. * @see {@link windowTime}
  37. *
  38. * @param {number} bufferTimeSpan The amount of time to fill each buffer array.
  39. * @param {number} [bufferCreationInterval] The interval at which to start new
  40. * buffers.
  41. * @param {number} [maxBufferSize] The maximum buffer size.
  42. * @param {Scheduler} [scheduler=async] The scheduler on which to schedule the
  43. * intervals that determine buffer boundaries.
  44. * @return {Observable<T[]>} An observable of arrays of buffered values.
  45. * @method bufferTime
  46. * @owner Observable
  47. */
  48. export function bufferTime(bufferTimeSpan) {
  49. let length = arguments.length;
  50. let scheduler = async;
  51. if (isScheduler(arguments[arguments.length - 1])) {
  52. scheduler = arguments[arguments.length - 1];
  53. length--;
  54. }
  55. let bufferCreationInterval = null;
  56. if (length >= 2) {
  57. bufferCreationInterval = arguments[1];
  58. }
  59. let maxBufferSize = Number.POSITIVE_INFINITY;
  60. if (length >= 3) {
  61. maxBufferSize = arguments[2];
  62. }
  63. return function bufferTimeOperatorFunction(source) {
  64. return source.lift(new BufferTimeOperator(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler));
  65. };
  66. }
  67. class BufferTimeOperator {
  68. constructor(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler) {
  69. this.bufferTimeSpan = bufferTimeSpan;
  70. this.bufferCreationInterval = bufferCreationInterval;
  71. this.maxBufferSize = maxBufferSize;
  72. this.scheduler = scheduler;
  73. }
  74. call(subscriber, source) {
  75. return source.subscribe(new BufferTimeSubscriber(subscriber, this.bufferTimeSpan, this.bufferCreationInterval, this.maxBufferSize, this.scheduler));
  76. }
  77. }
  78. class Context {
  79. constructor() {
  80. this.buffer = [];
  81. }
  82. }
  83. /**
  84. * We need this JSDoc comment for affecting ESDoc.
  85. * @ignore
  86. * @extends {Ignored}
  87. */
  88. class BufferTimeSubscriber extends Subscriber {
  89. constructor(destination, bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler) {
  90. super(destination);
  91. this.bufferTimeSpan = bufferTimeSpan;
  92. this.bufferCreationInterval = bufferCreationInterval;
  93. this.maxBufferSize = maxBufferSize;
  94. this.scheduler = scheduler;
  95. this.contexts = [];
  96. const context = this.openContext();
  97. this.timespanOnly = bufferCreationInterval == null || bufferCreationInterval < 0;
  98. if (this.timespanOnly) {
  99. const timeSpanOnlyState = { subscriber: this, context, bufferTimeSpan };
  100. this.add(context.closeAction = scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
  101. }
  102. else {
  103. const closeState = { subscriber: this, context };
  104. const creationState = { bufferTimeSpan, bufferCreationInterval, subscriber: this, scheduler };
  105. this.add(context.closeAction = scheduler.schedule(dispatchBufferClose, bufferTimeSpan, closeState));
  106. this.add(scheduler.schedule(dispatchBufferCreation, bufferCreationInterval, creationState));
  107. }
  108. }
  109. _next(value) {
  110. const contexts = this.contexts;
  111. const len = contexts.length;
  112. let filledBufferContext;
  113. for (let i = 0; i < len; i++) {
  114. const context = contexts[i];
  115. const buffer = context.buffer;
  116. buffer.push(value);
  117. if (buffer.length == this.maxBufferSize) {
  118. filledBufferContext = context;
  119. }
  120. }
  121. if (filledBufferContext) {
  122. this.onBufferFull(filledBufferContext);
  123. }
  124. }
  125. _error(err) {
  126. this.contexts.length = 0;
  127. super._error(err);
  128. }
  129. _complete() {
  130. const { contexts, destination } = this;
  131. while (contexts.length > 0) {
  132. const context = contexts.shift();
  133. destination.next(context.buffer);
  134. }
  135. super._complete();
  136. }
  137. /** @deprecated internal use only */ _unsubscribe() {
  138. this.contexts = null;
  139. }
  140. onBufferFull(context) {
  141. this.closeContext(context);
  142. const closeAction = context.closeAction;
  143. closeAction.unsubscribe();
  144. this.remove(closeAction);
  145. if (!this.closed && this.timespanOnly) {
  146. context = this.openContext();
  147. const bufferTimeSpan = this.bufferTimeSpan;
  148. const timeSpanOnlyState = { subscriber: this, context, bufferTimeSpan };
  149. this.add(context.closeAction = this.scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
  150. }
  151. }
  152. openContext() {
  153. const context = new Context();
  154. this.contexts.push(context);
  155. return context;
  156. }
  157. closeContext(context) {
  158. this.destination.next(context.buffer);
  159. const contexts = this.contexts;
  160. const spliceIndex = contexts ? contexts.indexOf(context) : -1;
  161. if (spliceIndex >= 0) {
  162. contexts.splice(contexts.indexOf(context), 1);
  163. }
  164. }
  165. }
  166. function dispatchBufferTimeSpanOnly(state) {
  167. const subscriber = state.subscriber;
  168. const prevContext = state.context;
  169. if (prevContext) {
  170. subscriber.closeContext(prevContext);
  171. }
  172. if (!subscriber.closed) {
  173. state.context = subscriber.openContext();
  174. state.context.closeAction = this.schedule(state, state.bufferTimeSpan);
  175. }
  176. }
  177. function dispatchBufferCreation(state) {
  178. const { bufferCreationInterval, bufferTimeSpan, subscriber, scheduler } = state;
  179. const context = subscriber.openContext();
  180. const action = this;
  181. if (!subscriber.closed) {
  182. subscriber.add(context.closeAction = scheduler.schedule(dispatchBufferClose, bufferTimeSpan, { subscriber, context }));
  183. action.schedule(state, bufferCreationInterval);
  184. }
  185. }
  186. function dispatchBufferClose(arg) {
  187. const { subscriber, context } = arg;
  188. subscriber.closeContext(context);
  189. }
  190. //# sourceMappingURL=bufferTime.js.map