bufferWhen.js 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. /** PURE_IMPORTS_START .._Subscription,.._util_tryCatch,.._util_errorObject,.._OuterSubscriber,.._util_subscribeToResult PURE_IMPORTS_END */
  /**
   * ES5 inheritance helper (TypeScript-style `__extends`).
   *
   * Reuses an `__extends` already present on `this` when there is one;
   * otherwise installs a local implementation that:
   *  - copies every own enumerable property of the base constructor `b`
   *    onto the derived constructor `d` (static members), and
   *  - gives `d` a prototype object that delegates to `b.prototype` and
   *    whose `constructor` points back at `d`.
   *
   * @param {Function} d The derived constructor.
   * @param {Function|null} b The base constructor, or `null` for no base.
   */
  var __extends = (this && this.__extends) || function (d, b) {
      // Surrogate lets us build the prototype link without invoking `b` itself.
      function Surrogate() {
          this.constructor = d;
      }
      var key;
      for (key in b) {
          if (!b.hasOwnProperty(key)) {
              continue;
          }
          d[key] = b[key];
      }
      if (b === null) {
          // No base class: the derived prototype gets a null prototype.
          d.prototype = Object.create(b);
          return;
      }
      Surrogate.prototype = b.prototype;
      d.prototype = new Surrogate();
  };
  9. import { Subscription } from '../Subscription';
  10. import { tryCatch } from '../util/tryCatch';
  11. import { errorObject } from '../util/errorObject';
  12. import { OuterSubscriber } from '../OuterSubscriber';
  13. import { subscribeToResult } from '../util/subscribeToResult';
  /**
   * Collects values from the source Observable into arrays, using a factory
   * function of closing Observables to decide when each array is closed,
   * emitted, and replaced by a fresh one.
   *
   * <span class="informal">Gathers past values into an array. As soon as it
   * starts collecting, it calls a function that returns an Observable which
   * signals when to close the buffer and start collecting again.</span>
   *
   * <img src="./img/bufferWhen.png" width="100%">
   *
   * A buffer is opened immediately. It is closed when the Observable returned
   * by calling `closingSelector` emits a value; at that moment a new buffer is
   * opened right away and the cycle repeats.
   *
   * @example <caption>Emit an array of the last clicks every [1-5] random seconds</caption>
   * var clicks = Rx.Observable.fromEvent(document, 'click');
   * var buffered = clicks.bufferWhen(() =>
   *   Rx.Observable.interval(1000 + Math.random() * 4000)
   * );
   * buffered.subscribe(x => console.log(x));
   *
   * @see {@link buffer}
   * @see {@link bufferCount}
   * @see {@link bufferTime}
   * @see {@link bufferToggle}
   * @see {@link windowWhen}
   *
   * @param {function(): Observable} closingSelector A function that takes no
   * arguments and returns an Observable that signals buffer closure.
   * @return {function(source: Observable<T>): Observable<T[]>} A function that
   * takes the source Observable and returns the result of lifting it with a
   * new `BufferWhenOperator`, i.e. an Observable of arrays of buffered values.
   * @method bufferWhen
   * @owner Observable
   */
  export function bufferWhen(closingSelector) {
      return function (source) {
          // A fresh operator is created for every source the function is applied to.
          var operator = new BufferWhenOperator(closingSelector);
          return source.lift(operator);
      };
  }
  /**
   * Operator object handed to `source.lift(...)` by `bufferWhen`.
   * It only remembers the `closingSelector` and, when called, subscribes the
   * source with a new `BufferWhenSubscriber` wrapping the downstream subscriber.
   */
  var BufferWhenOperator = /*@__PURE__*/ (/*@__PURE__*/ function () {
      /**
       * @param {function(): Observable} closingSelector Factory for the
       * Observable that signals buffer closure.
       */
      function BufferWhenOperator(closingSelector) {
          this.closingSelector = closingSelector;
      }
      /**
       * @param {Subscriber} subscriber The downstream subscriber.
       * @param {Observable} source The source to subscribe to.
       * @return {*} Whatever `source.subscribe` returns.
       */
      BufferWhenOperator.prototype.call = function (subscriber, source) {
          var bufferingSubscriber = new BufferWhenSubscriber(subscriber, this.closingSelector);
          return source.subscribe(bufferingSubscriber);
      };
      return BufferWhenOperator;
  }());
  /**
   * Subscriber that implements the buffering for `bufferWhen`: it accumulates
   * source values in `this.buffer` and, each time a buffer is (re)opened, asks
   * `closingSelector` for a new closing notifier and subscribes to it.
   *
   * We need this JSDoc comment for affecting ESDoc.
   * @ignore
   * @extends {Ignored}
   */
  var BufferWhenSubscriber = /*@__PURE__*/ (/*@__PURE__*/ function (_super) {
      __extends(BufferWhenSubscriber, _super);
      /**
       * @param {Subscriber} destination Downstream subscriber that receives the buffers.
       * @param {function(): Observable} closingSelector Factory for closing notifiers.
       */
      function BufferWhenSubscriber(destination, closingSelector) {
          _super.call(this, destination);
          this.closingSelector = closingSelector;
          // True only while openBuffer() is inside subscribeToResult(); see notifyComplete().
          this.subscribing = false;
          // The first buffer is opened immediately on construction.
          this.openBuffer();
      }
      /** Appends a source value to the buffer that is currently open. */
      BufferWhenSubscriber.prototype._next = function (value) {
          this.buffer.push(value);
      };
      /** Flushes the open buffer (if any) downstream, then runs the base completion. */
      BufferWhenSubscriber.prototype._complete = function () {
          var buffer = this.buffer;
          if (buffer) {
              this.destination.next(buffer);
          }
          _super.prototype._complete.call(this);
      };
      /** @deprecated internal use only */ BufferWhenSubscriber.prototype._unsubscribe = function () {
          // Drop the buffer so it is neither emitted nor retained after teardown.
          this.buffer = null;
          this.subscribing = false;
      };
      /**
       * Notification hook; presumably invoked by the inner subscriber created in
       * `subscribeToResult` when the closing notifier emits. The arguments are
       * not used: any notification closes the current buffer and opens a new one.
       */
      BufferWhenSubscriber.prototype.notifyNext = function (outerValue, innerValue, outerIndex, innerIndex, innerSub) {
          this.openBuffer();
      };
      /**
       * Notification hook; presumably invoked when the closing notifier completes.
       * If that happens while we are still inside the subscribe call, the whole
       * subscriber is completed; otherwise the buffer is simply rolled over.
       */
      BufferWhenSubscriber.prototype.notifyComplete = function () {
          if (this.subscribing) {
              this.complete();
          }
          else {
              this.openBuffer();
          }
      };
      /**
       * Closes the current buffer (emitting it downstream), opens a new empty one
       * and subscribes to a fresh closing notifier obtained from `closingSelector`.
       * An exception thrown by `closingSelector` is forwarded through `this.error`.
       */
      BufferWhenSubscriber.prototype.openBuffer = function () {
          var closingSubscription = this.closingSubscription;
          if (closingSubscription) {
              // Detach and dispose the previous closing notifier before replacing it.
              this.remove(closingSubscription);
              closingSubscription.unsubscribe();
          }
          var buffer = this.buffer;
          if (buffer) {
              this.destination.next(buffer);
          }
          if (this.closed) {
              // Emitting can synchronously unsubscribe this subscriber (for example
              // when downstream stops after this buffer). Once torn down there is
              // nothing left to buffer, so do not resurrect `this.buffer`, call the
              // user's closingSelector, or subscribe to another notifier.
              return;
          }
          this.buffer = [];
          var closingNotifier = tryCatch(this.closingSelector)();
          if (closingNotifier === errorObject) {
              this.error(errorObject.e);
          }
          else {
              closingSubscription = new Subscription();
              this.closingSubscription = closingSubscription;
              this.add(closingSubscription);
              // Flag the synchronous part of the subscription for notifyComplete().
              this.subscribing = true;
              closingSubscription.add(subscribeToResult(this, closingNotifier));
              this.subscribing = false;
          }
      };
      return BufferWhenSubscriber;
  }(OuterSubscriber));
  125. //# sourceMappingURL=bufferWhen.js.map