bufferTime.js 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. "use strict";
  2. var __extends = (this && this.__extends) || function (d, b) {
  3. for (var p in b) if (b.hasOwnProperty(p)) d[p] = b[p];
  4. function __() { this.constructor = d; }
  5. d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
  6. };
  7. var async_1 = require('../scheduler/async');
  8. var Subscriber_1 = require('../Subscriber');
  9. var isScheduler_1 = require('../util/isScheduler');
  10. /* tslint:enable:max-line-length */
  11. /**
  12. * Buffers the source Observable values for a specific time period.
  13. *
  14. * <span class="informal">Collects values from the past as an array, and emits
  15. * those arrays periodically in time.</span>
  16. *
  17. * <img src="./img/bufferTime.png" width="100%">
  18. *
  19. * Buffers values from the source for a specific time duration `bufferTimeSpan`.
  20. * Unless the optional argument `bufferCreationInterval` is given, it emits and
  21. * resets the buffer every `bufferTimeSpan` milliseconds. If
  22. * `bufferCreationInterval` is given, this operator opens the buffer every
  23. * `bufferCreationInterval` milliseconds and closes (emits and resets) the
  24. * buffer every `bufferTimeSpan` milliseconds. When the optional argument
  25. * `maxBufferSize` is specified, the buffer will be closed either after
  26. * `bufferTimeSpan` milliseconds or when it contains `maxBufferSize` elements.
  27. *
  28. * @example <caption>Every second, emit an array of the recent click events</caption>
  29. * var clicks = Rx.Observable.fromEvent(document, 'click');
  30. * var buffered = clicks.bufferTime(1000);
  31. * buffered.subscribe(x => console.log(x));
  32. *
  33. * @example <caption>Every 5 seconds, emit the click events from the next 2 seconds</caption>
  34. * var clicks = Rx.Observable.fromEvent(document, 'click');
  35. * var buffered = clicks.bufferTime(2000, 5000);
  36. * buffered.subscribe(x => console.log(x));
  37. *
  38. * @see {@link buffer}
  39. * @see {@link bufferCount}
  40. * @see {@link bufferToggle}
  41. * @see {@link bufferWhen}
  42. * @see {@link windowTime}
  43. *
  44. * @param {number} bufferTimeSpan The amount of time to fill each buffer array.
  45. * @param {number} [bufferCreationInterval] The interval at which to start new
  46. * buffers.
  47. * @param {number} [maxBufferSize] The maximum buffer size.
  48. * @param {Scheduler} [scheduler=async] The scheduler on which to schedule the
  49. * intervals that determine buffer boundaries.
  50. * @return {Observable<T[]>} An observable of arrays of buffered values.
  51. * @method bufferTime
  52. * @owner Observable
  53. */
  54. function bufferTime(bufferTimeSpan) {
  55. var length = arguments.length;
  56. var scheduler = async_1.async;
  57. if (isScheduler_1.isScheduler(arguments[arguments.length - 1])) {
  58. scheduler = arguments[arguments.length - 1];
  59. length--;
  60. }
  61. var bufferCreationInterval = null;
  62. if (length >= 2) {
  63. bufferCreationInterval = arguments[1];
  64. }
  65. var maxBufferSize = Number.POSITIVE_INFINITY;
  66. if (length >= 3) {
  67. maxBufferSize = arguments[2];
  68. }
  69. return function bufferTimeOperatorFunction(source) {
  70. return source.lift(new BufferTimeOperator(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler));
  71. };
  72. }
  73. exports.bufferTime = bufferTime;
  74. var BufferTimeOperator = (function () {
  75. function BufferTimeOperator(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler) {
  76. this.bufferTimeSpan = bufferTimeSpan;
  77. this.bufferCreationInterval = bufferCreationInterval;
  78. this.maxBufferSize = maxBufferSize;
  79. this.scheduler = scheduler;
  80. }
  81. BufferTimeOperator.prototype.call = function (subscriber, source) {
  82. return source.subscribe(new BufferTimeSubscriber(subscriber, this.bufferTimeSpan, this.bufferCreationInterval, this.maxBufferSize, this.scheduler));
  83. };
  84. return BufferTimeOperator;
  85. }());
  86. var Context = (function () {
  87. function Context() {
  88. this.buffer = [];
  89. }
  90. return Context;
  91. }());
  92. /**
  93. * We need this JSDoc comment for affecting ESDoc.
  94. * @ignore
  95. * @extends {Ignored}
  96. */
  97. var BufferTimeSubscriber = (function (_super) {
  98. __extends(BufferTimeSubscriber, _super);
  99. function BufferTimeSubscriber(destination, bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler) {
  100. _super.call(this, destination);
  101. this.bufferTimeSpan = bufferTimeSpan;
  102. this.bufferCreationInterval = bufferCreationInterval;
  103. this.maxBufferSize = maxBufferSize;
  104. this.scheduler = scheduler;
  105. this.contexts = [];
  106. var context = this.openContext();
  107. this.timespanOnly = bufferCreationInterval == null || bufferCreationInterval < 0;
  108. if (this.timespanOnly) {
  109. var timeSpanOnlyState = { subscriber: this, context: context, bufferTimeSpan: bufferTimeSpan };
  110. this.add(context.closeAction = scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
  111. }
  112. else {
  113. var closeState = { subscriber: this, context: context };
  114. var creationState = { bufferTimeSpan: bufferTimeSpan, bufferCreationInterval: bufferCreationInterval, subscriber: this, scheduler: scheduler };
  115. this.add(context.closeAction = scheduler.schedule(dispatchBufferClose, bufferTimeSpan, closeState));
  116. this.add(scheduler.schedule(dispatchBufferCreation, bufferCreationInterval, creationState));
  117. }
  118. }
  119. BufferTimeSubscriber.prototype._next = function (value) {
  120. var contexts = this.contexts;
  121. var len = contexts.length;
  122. var filledBufferContext;
  123. for (var i = 0; i < len; i++) {
  124. var context = contexts[i];
  125. var buffer = context.buffer;
  126. buffer.push(value);
  127. if (buffer.length == this.maxBufferSize) {
  128. filledBufferContext = context;
  129. }
  130. }
  131. if (filledBufferContext) {
  132. this.onBufferFull(filledBufferContext);
  133. }
  134. };
  135. BufferTimeSubscriber.prototype._error = function (err) {
  136. this.contexts.length = 0;
  137. _super.prototype._error.call(this, err);
  138. };
  139. BufferTimeSubscriber.prototype._complete = function () {
  140. var _a = this, contexts = _a.contexts, destination = _a.destination;
  141. while (contexts.length > 0) {
  142. var context = contexts.shift();
  143. destination.next(context.buffer);
  144. }
  145. _super.prototype._complete.call(this);
  146. };
  147. /** @deprecated internal use only */ BufferTimeSubscriber.prototype._unsubscribe = function () {
  148. this.contexts = null;
  149. };
  150. BufferTimeSubscriber.prototype.onBufferFull = function (context) {
  151. this.closeContext(context);
  152. var closeAction = context.closeAction;
  153. closeAction.unsubscribe();
  154. this.remove(closeAction);
  155. if (!this.closed && this.timespanOnly) {
  156. context = this.openContext();
  157. var bufferTimeSpan = this.bufferTimeSpan;
  158. var timeSpanOnlyState = { subscriber: this, context: context, bufferTimeSpan: bufferTimeSpan };
  159. this.add(context.closeAction = this.scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
  160. }
  161. };
  162. BufferTimeSubscriber.prototype.openContext = function () {
  163. var context = new Context();
  164. this.contexts.push(context);
  165. return context;
  166. };
  167. BufferTimeSubscriber.prototype.closeContext = function (context) {
  168. this.destination.next(context.buffer);
  169. var contexts = this.contexts;
  170. var spliceIndex = contexts ? contexts.indexOf(context) : -1;
  171. if (spliceIndex >= 0) {
  172. contexts.splice(contexts.indexOf(context), 1);
  173. }
  174. };
  175. return BufferTimeSubscriber;
  176. }(Subscriber_1.Subscriber));
  177. function dispatchBufferTimeSpanOnly(state) {
  178. var subscriber = state.subscriber;
  179. var prevContext = state.context;
  180. if (prevContext) {
  181. subscriber.closeContext(prevContext);
  182. }
  183. if (!subscriber.closed) {
  184. state.context = subscriber.openContext();
  185. state.context.closeAction = this.schedule(state, state.bufferTimeSpan);
  186. }
  187. }
  188. function dispatchBufferCreation(state) {
  189. var bufferCreationInterval = state.bufferCreationInterval, bufferTimeSpan = state.bufferTimeSpan, subscriber = state.subscriber, scheduler = state.scheduler;
  190. var context = subscriber.openContext();
  191. var action = this;
  192. if (!subscriber.closed) {
  193. subscriber.add(context.closeAction = scheduler.schedule(dispatchBufferClose, bufferTimeSpan, { subscriber: subscriber, context: context }));
  194. action.schedule(state, bufferCreationInterval);
  195. }
  196. }
  197. function dispatchBufferClose(arg) {
  198. var subscriber = arg.subscriber, context = arg.context;
  199. subscriber.closeContext(context);
  200. }
  201. //# sourceMappingURL=bufferTime.js.map