bufferWhen.js 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. "use strict";
  /**
   * Inheritance helper: reuses an `__extends` already present on `this`,
   * otherwise falls back to the local implementation below.
   *
   * The fallback copies the own enumerable properties of `b` onto `d`, then
   * replaces `d.prototype` with an object that inherits from `b.prototype`
   * and whose `constructor` property is `d`.
   *
   * @param {Function} d The derived constructor.
   * @param {Function|null} b The base constructor, or `null`.
   */
  var __extends = (this && this.__extends) || function (d, b) {
      // Copy the base's own properties onto the derived constructor.
      for (var key in b) {
          if (b.hasOwnProperty(key)) {
              d[key] = b[key];
          }
      }
      // Surrogate constructor: links the prototype chain without running `b`.
      function Surrogate() { this.constructor = d; }
      if (b === null) {
          d.prototype = Object.create(b);
      }
      else {
          Surrogate.prototype = b.prototype;
          d.prototype = new Surrogate();
      }
  };
  7. var Subscription_1 = require('../Subscription');
  8. var tryCatch_1 = require('../util/tryCatch');
  9. var errorObject_1 = require('../util/errorObject');
  10. var OuterSubscriber_1 = require('../OuterSubscriber');
  11. var subscribeToResult_1 = require('../util/subscribeToResult');
  /**
   * Buffers the source Observable values, using a factory function of closing
   * Observables to decide when to close, emit, and reset the buffer.
   *
   * <span class="informal">Collects past values into an array. Whenever it
   * starts collecting, it calls a function that returns an Observable which
   * signals when to close the buffer and start collecting again.</span>
   *
   * <img src="./img/bufferWhen.png" width="100%">
   *
   * A buffer is opened immediately and is closed when the Observable returned
   * by calling `closingSelector` emits a value. On closing, a new buffer is
   * opened right away and the process repeats.
   *
   * @example <caption>Emit an array of the last clicks every [1-5] random seconds</caption>
   * var clicks = Rx.Observable.fromEvent(document, 'click');
   * var buffered = clicks.bufferWhen(() =>
   *   Rx.Observable.interval(1000 + Math.random() * 4000)
   * );
   * buffered.subscribe(x => console.log(x));
   *
   * @see {@link buffer}
   * @see {@link bufferCount}
   * @see {@link bufferTime}
   * @see {@link bufferToggle}
   * @see {@link windowWhen}
   *
   * @param {function(): Observable} closingSelector A function that takes no
   * arguments and returns an Observable that signals buffer closure.
   * @return {function(source: Observable<T>): Observable<T[]>} A function that
   * takes the source Observable and returns the result of lifting it with a
   * `BufferWhenOperator`: an observable of arrays of buffered values.
   * @method bufferWhen
   * @owner Observable
   */
  function bufferWhen(closingSelector) {
      return function (source) {
          // A new operator instance is created for each source it is applied to.
          var operator = new BufferWhenOperator(closingSelector);
          return source.lift(operator);
      };
  }
  50. exports.bufferWhen = bufferWhen;
  /**
   * Operator object handed to `source.lift` by `bufferWhen`. It stores the
   * `closingSelector` and, when called, subscribes a new
   * `BufferWhenSubscriber` to the source.
   */
  var BufferWhenOperator = (function () {
      /**
       * @param {function(): Observable} closingSelector Factory passed on to
       * every `BufferWhenSubscriber` this operator creates.
       */
      function BufferWhenOperator(closingSelector) {
          this.closingSelector = closingSelector;
      }
      /**
       * Subscribes to `source` with a `BufferWhenSubscriber` wrapping
       * `subscriber`, and returns whatever `source.subscribe` returns.
       */
      BufferWhenOperator.prototype.call = function (subscriber, source) {
          var selector = this.closingSelector;
          return source.subscribe(new BufferWhenSubscriber(subscriber, selector));
      };
      return BufferWhenOperator;
  }());
  /**
   * We need this JSDoc comment for affecting ESDoc.
   *
   * Subscriber that collects source values into `this.buffer` and uses the
   * notifier returned by `closingSelector` to decide when to flush the buffer
   * to the destination and start a new one.
   * @ignore
   * @extends {Ignored}
   */
  var BufferWhenSubscriber = (function (_super) {
      __extends(BufferWhenSubscriber, _super);
      /**
       * @param destination Passed to the parent constructor; receives the
       * buffered arrays via `this.destination.next`.
       * @param {function(): Observable} closingSelector Called every time a
       * buffer is opened to obtain the closing notifier.
       */
      function BufferWhenSubscriber(destination, closingSelector) {
          _super.call(this, destination);
          this.closingSelector = closingSelector;
          // True only while openBuffer() is subscribing to a closing notifier.
          this.subscribing = false;
          // The first buffer is opened at construction time.
          this.openBuffer();
      }
      /** Appends a source value to the current buffer. */
      BufferWhenSubscriber.prototype._next = function (value) {
          this.buffer.push(value);
      };
      /** Emits the current buffer (if there is one), then completes via the parent. */
      BufferWhenSubscriber.prototype._complete = function () {
          var remaining = this.buffer;
          if (remaining) {
              this.destination.next(remaining);
          }
          _super.prototype._complete.call(this);
      };
      /** @deprecated internal use only */ BufferWhenSubscriber.prototype._unsubscribe = function () {
          // Drop the buffer reference and reset the subscribing flag.
          this.buffer = null;
          this.subscribing = false;
      };
      /** A value from the closing notifier closes the current buffer and opens a new one. */
      BufferWhenSubscriber.prototype.notifyNext = function (outerValue, innerValue, outerIndex, innerIndex, innerSub) {
          this.openBuffer();
      };
      /**
       * Called when the closing notifier completes. If that happens while
       * openBuffer() is still subscribing to it, this subscriber completes;
       * otherwise a new buffer is opened.
       */
      BufferWhenSubscriber.prototype.notifyComplete = function () {
          if (!this.subscribing) {
              this.openBuffer();
              return;
          }
          this.complete();
      };
      /**
       * Tears down the previous closing subscription, emits the current buffer
       * if one exists, starts a new empty buffer, and subscribes to a new
       * closing notifier obtained from `closingSelector`. If the selector call
       * yields the shared error object, the captured error is forwarded through
       * `this.error` instead.
       */
      BufferWhenSubscriber.prototype.openBuffer = function () {
          var previousClosing = this.closingSubscription;
          if (previousClosing) {
              this.remove(previousClosing);
              previousClosing.unsubscribe();
          }
          var pending = this.buffer;
          if (pending) {
              this.destination.next(pending);
          }
          this.buffer = [];
          var notifier = tryCatch_1.tryCatch(this.closingSelector)();
          if (notifier === errorObject_1.errorObject) {
              this.error(errorObject_1.errorObject.e);
              return;
          }
          var nextClosing = new Subscription_1.Subscription();
          this.closingSubscription = nextClosing;
          this.add(nextClosing);
          // Flag the subscribe call so notifyComplete() can tell a completion
          // that happens during it from a later one.
          this.subscribing = true;
          nextClosing.add(subscribeToResult_1.subscribeToResult(this, notifier));
          this.subscribing = false;
      };
      return BufferWhenSubscriber;
  }(OuterSubscriber_1.OuterSubscriber));
  124. //# sourceMappingURL=bufferWhen.js.map