bufferTime.js 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. /** PURE_IMPORTS_START .._scheduler_async,.._Subscriber,.._util_isScheduler PURE_IMPORTS_END */
  2. var __extends = (this && this.__extends) || function (d, b) {
  3. for (var p in b)
  4. if (b.hasOwnProperty(p))
  5. d[p] = b[p];
  6. function __() { this.constructor = d; }
  7. d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
  8. };
  9. import { async } from '../scheduler/async';
  10. import { Subscriber } from '../Subscriber';
  11. import { isScheduler } from '../util/isScheduler';
  12. /* tslint:enable:max-line-length */
  13. /**
  14. * Buffers the source Observable values for a specific time period.
  15. *
  16. * <span class="informal">Collects values from the past as an array, and emits
  17. * those arrays periodically in time.</span>
  18. *
  19. * <img src="./img/bufferTime.png" width="100%">
  20. *
  21. * Buffers values from the source for a specific time duration `bufferTimeSpan`.
  22. * Unless the optional argument `bufferCreationInterval` is given, it emits and
  23. * resets the buffer every `bufferTimeSpan` milliseconds. If
  24. * `bufferCreationInterval` is given, this operator opens the buffer every
  25. * `bufferCreationInterval` milliseconds and closes (emits and resets) the
  26. * buffer every `bufferTimeSpan` milliseconds. When the optional argument
  27. * `maxBufferSize` is specified, the buffer will be closed either after
  28. * `bufferTimeSpan` milliseconds or when it contains `maxBufferSize` elements.
  29. *
  30. * @example <caption>Every second, emit an array of the recent click events</caption>
  31. * var clicks = Rx.Observable.fromEvent(document, 'click');
  32. * var buffered = clicks.bufferTime(1000);
  33. * buffered.subscribe(x => console.log(x));
  34. *
  35. * @example <caption>Every 5 seconds, emit the click events from the next 2 seconds</caption>
  36. * var clicks = Rx.Observable.fromEvent(document, 'click');
  37. * var buffered = clicks.bufferTime(2000, 5000);
  38. * buffered.subscribe(x => console.log(x));
  39. *
  40. * @see {@link buffer}
  41. * @see {@link bufferCount}
  42. * @see {@link bufferToggle}
  43. * @see {@link bufferWhen}
  44. * @see {@link windowTime}
  45. *
  46. * @param {number} bufferTimeSpan The amount of time to fill each buffer array.
  47. * @param {number} [bufferCreationInterval] The interval at which to start new
  48. * buffers.
  49. * @param {number} [maxBufferSize] The maximum buffer size.
  50. * @param {Scheduler} [scheduler=async] The scheduler on which to schedule the
  51. * intervals that determine buffer boundaries.
  52. * @return {Observable<T[]>} An observable of arrays of buffered values.
  53. * @method bufferTime
  54. * @owner Observable
  55. */
  56. export function bufferTime(bufferTimeSpan) {
  57. var length = arguments.length;
  58. var scheduler = async;
  59. if (isScheduler(arguments[arguments.length - 1])) {
  60. scheduler = arguments[arguments.length - 1];
  61. length--;
  62. }
  63. var bufferCreationInterval = null;
  64. if (length >= 2) {
  65. bufferCreationInterval = arguments[1];
  66. }
  67. var maxBufferSize = Number.POSITIVE_INFINITY;
  68. if (length >= 3) {
  69. maxBufferSize = arguments[2];
  70. }
  71. return function bufferTimeOperatorFunction(source) {
  72. return source.lift(new BufferTimeOperator(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler));
  73. };
  74. }
  75. var BufferTimeOperator = /*@__PURE__*/ (/*@__PURE__*/ function () {
  76. function BufferTimeOperator(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler) {
  77. this.bufferTimeSpan = bufferTimeSpan;
  78. this.bufferCreationInterval = bufferCreationInterval;
  79. this.maxBufferSize = maxBufferSize;
  80. this.scheduler = scheduler;
  81. }
  82. BufferTimeOperator.prototype.call = function (subscriber, source) {
  83. return source.subscribe(new BufferTimeSubscriber(subscriber, this.bufferTimeSpan, this.bufferCreationInterval, this.maxBufferSize, this.scheduler));
  84. };
  85. return BufferTimeOperator;
  86. }());
  87. var Context = /*@__PURE__*/ (/*@__PURE__*/ function () {
  88. function Context() {
  89. this.buffer = [];
  90. }
  91. return Context;
  92. }());
  93. /**
  94. * We need this JSDoc comment for affecting ESDoc.
  95. * @ignore
  96. * @extends {Ignored}
  97. */
  98. var BufferTimeSubscriber = /*@__PURE__*/ (/*@__PURE__*/ function (_super) {
  99. __extends(BufferTimeSubscriber, _super);
  100. function BufferTimeSubscriber(destination, bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler) {
  101. _super.call(this, destination);
  102. this.bufferTimeSpan = bufferTimeSpan;
  103. this.bufferCreationInterval = bufferCreationInterval;
  104. this.maxBufferSize = maxBufferSize;
  105. this.scheduler = scheduler;
  106. this.contexts = [];
  107. var context = this.openContext();
  108. this.timespanOnly = bufferCreationInterval == null || bufferCreationInterval < 0;
  109. if (this.timespanOnly) {
  110. var timeSpanOnlyState = { subscriber: this, context: context, bufferTimeSpan: bufferTimeSpan };
  111. this.add(context.closeAction = scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
  112. }
  113. else {
  114. var closeState = { subscriber: this, context: context };
  115. var creationState = { bufferTimeSpan: bufferTimeSpan, bufferCreationInterval: bufferCreationInterval, subscriber: this, scheduler: scheduler };
  116. this.add(context.closeAction = scheduler.schedule(dispatchBufferClose, bufferTimeSpan, closeState));
  117. this.add(scheduler.schedule(dispatchBufferCreation, bufferCreationInterval, creationState));
  118. }
  119. }
  120. BufferTimeSubscriber.prototype._next = function (value) {
  121. var contexts = this.contexts;
  122. var len = contexts.length;
  123. var filledBufferContext;
  124. for (var i = 0; i < len; i++) {
  125. var context = contexts[i];
  126. var buffer = context.buffer;
  127. buffer.push(value);
  128. if (buffer.length == this.maxBufferSize) {
  129. filledBufferContext = context;
  130. }
  131. }
  132. if (filledBufferContext) {
  133. this.onBufferFull(filledBufferContext);
  134. }
  135. };
  136. BufferTimeSubscriber.prototype._error = function (err) {
  137. this.contexts.length = 0;
  138. _super.prototype._error.call(this, err);
  139. };
  140. BufferTimeSubscriber.prototype._complete = function () {
  141. var _a = this, contexts = _a.contexts, destination = _a.destination;
  142. while (contexts.length > 0) {
  143. var context = contexts.shift();
  144. destination.next(context.buffer);
  145. }
  146. _super.prototype._complete.call(this);
  147. };
  148. /** @deprecated internal use only */ BufferTimeSubscriber.prototype._unsubscribe = function () {
  149. this.contexts = null;
  150. };
  151. BufferTimeSubscriber.prototype.onBufferFull = function (context) {
  152. this.closeContext(context);
  153. var closeAction = context.closeAction;
  154. closeAction.unsubscribe();
  155. this.remove(closeAction);
  156. if (!this.closed && this.timespanOnly) {
  157. context = this.openContext();
  158. var bufferTimeSpan = this.bufferTimeSpan;
  159. var timeSpanOnlyState = { subscriber: this, context: context, bufferTimeSpan: bufferTimeSpan };
  160. this.add(context.closeAction = this.scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
  161. }
  162. };
  163. BufferTimeSubscriber.prototype.openContext = function () {
  164. var context = new Context();
  165. this.contexts.push(context);
  166. return context;
  167. };
  168. BufferTimeSubscriber.prototype.closeContext = function (context) {
  169. this.destination.next(context.buffer);
  170. var contexts = this.contexts;
  171. var spliceIndex = contexts ? contexts.indexOf(context) : -1;
  172. if (spliceIndex >= 0) {
  173. contexts.splice(contexts.indexOf(context), 1);
  174. }
  175. };
  176. return BufferTimeSubscriber;
  177. }(Subscriber));
  178. function dispatchBufferTimeSpanOnly(state) {
  179. var subscriber = state.subscriber;
  180. var prevContext = state.context;
  181. if (prevContext) {
  182. subscriber.closeContext(prevContext);
  183. }
  184. if (!subscriber.closed) {
  185. state.context = subscriber.openContext();
  186. state.context.closeAction = this.schedule(state, state.bufferTimeSpan);
  187. }
  188. }
  189. function dispatchBufferCreation(state) {
  190. var bufferCreationInterval = state.bufferCreationInterval, bufferTimeSpan = state.bufferTimeSpan, subscriber = state.subscriber, scheduler = state.scheduler;
  191. var context = subscriber.openContext();
  192. var action = this;
  193. if (!subscriber.closed) {
  194. subscriber.add(context.closeAction = scheduler.schedule(dispatchBufferClose, bufferTimeSpan, { subscriber: subscriber, context: context }));
  195. action.schedule(state, bufferCreationInterval);
  196. }
  197. }
  198. function dispatchBufferClose(arg) {
  199. var subscriber = arg.subscriber, context = arg.context;
  200. subscriber.closeContext(context);
  201. }
  202. //# sourceMappingURL=bufferTime.js.map