bufferToggle.js 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. import { Subscription } from '../Subscription';
  2. import { subscribeToResult } from '../util/subscribeToResult';
  3. import { OuterSubscriber } from '../OuterSubscriber';
  /**
   * Collects source values into arrays. A new array is started each time
   * `openings` emits, and it is delivered once the notifier produced by
   * `closingSelector` for that opening signals.
   *
   * <span class="informal">Gathers past values into an array. Collection begins
   * only when `openings` emits; `closingSelector` is then invoked to obtain the
   * Observable that decides when that buffer is closed.</span>
   *
   * <img src="./img/bufferToggle.png" width="100%">
   *
   * Each emission of the Observable given as `openings` opens a buffer for the
   * source values. The buffer is closed and sent downstream when the
   * Subscribable or Promise returned by `closingSelector` emits.
   *
   * @example <caption>Every other second, emit the click events from the next 500ms</caption>
   * var clicks = Rx.Observable.fromEvent(document, 'click');
   * var openings = Rx.Observable.interval(1000);
   * var buffered = clicks.bufferToggle(openings, i =>
   *   i % 2 ? Rx.Observable.interval(500) : Rx.Observable.empty()
   * );
   * buffered.subscribe(x => console.log(x));
   *
   * @see {@link buffer}
   * @see {@link bufferCount}
   * @see {@link bufferTime}
   * @see {@link bufferWhen}
   * @see {@link windowToggle}
   *
   * @param {SubscribableOrPromise<O>} openings A Subscribable or Promise whose
   * notifications each start a new buffer.
   * @param {function(value: O): SubscribableOrPromise} closingSelector Called with
   * the value emitted by `openings`; returns a Subscribable or Promise whose
   * emission signals that the associated buffer should be emitted and cleared.
   * @return {Observable<T[]>} An observable of arrays of buffered values.
   * @method bufferToggle
   * @owner Observable
   */
  export function bufferToggle(openings, closingSelector) {
    // The function keeps its explicit name so `name` and stack traces are unchanged.
    const operatorFunction = function bufferToggleOperatorFunction(source) {
      // A fresh operator instance is built for every source it is applied to.
      const operator = new BufferToggleOperator(openings, closingSelector);
      return source.lift(operator);
    };
    return operatorFunction;
  }
  /**
   * Operator object handed to `lift`: it stores the `bufferToggle` arguments and,
   * when called, subscribes a new BufferToggleSubscriber to the source.
   */
  class BufferToggleOperator {
    /**
     * @param openings Notifier whose emissions open buffers.
     * @param closingSelector Function producing the closing notifier for an opening value.
     */
    constructor(openings, closingSelector) {
      Object.assign(this, { openings, closingSelector });
    }
    /**
     * @param subscriber Downstream subscriber that will receive the buffers.
     * @param source Object to subscribe to; only its `subscribe` method is used.
     * @return Whatever `source.subscribe` returns.
     */
    call(subscriber, source) {
      const { openings, closingSelector } = this;
      const toggleSubscriber = new BufferToggleSubscriber(subscriber, openings, closingSelector);
      return source.subscribe(toggleSubscriber);
    }
  }
  /**
   * Subscriber behind `bufferToggle`. It keeps one `{ buffer, subscription }`
   * context per open buffer in `this.contexts`, appends every source value to
   * all open buffers, and emits a buffer when its closing notifier signals.
   *
   * We need this JSDoc comment for affecting ESDoc.
   * @ignore
   * @extends {Ignored}
   */
  class BufferToggleSubscriber extends OuterSubscriber {
    /**
     * @param destination Downstream subscriber that receives the buffers.
     * @param openings Notifier whose emissions open new buffers.
     * @param closingSelector Function called with an opening value; returns the
     * closing notifier for the buffer opened by that value.
     */
    constructor(destination, openings, closingSelector) {
      super(destination);
      this.openings = openings;
      this.closingSelector = closingSelector;
      // Contexts of the currently open buffers, in opening order.
      this.contexts = [];
      // Subscribed without an outer value, which is how notifyNext tells
      // `openings` emissions apart from closing-notifier emissions.
      this.add(subscribeToResult(this, openings));
    }
    /**
     * Appends a source value to every buffer that is currently open.
     */
    _next(value) {
      for (const context of this.contexts) {
        context.buffer.push(value);
      }
    }
    /**
     * Discards all open buffers without emitting them, then forwards the error.
     */
    _error(err) {
      const contexts = this.contexts;
      while (contexts.length > 0) {
        const context = contexts.shift();
        context.subscription.unsubscribe();
        context.buffer = null;
        context.subscription = null;
      }
      this.contexts = null;
      super._error(err);
    }
    /**
     * Emits every still-open buffer in opening order, then completes.
     */
    _complete() {
      const contexts = this.contexts;
      while (contexts.length > 0) {
        const context = contexts.shift();
        this.destination.next(context.buffer);
        context.subscription.unsubscribe();
        context.buffer = null;
        context.subscription = null;
      }
      this.contexts = null;
      super._complete();
    }
    /**
     * Handles an emission from an inner subscription. A truthy `outerValue` is
     * the buffer context passed to `subscribeToResult` in `trySubscribe`, so that
     * buffer is closed. Otherwise the emission is treated as coming from
     * `openings`, and `innerValue` opens a new buffer.
     */
    notifyNext(outerValue, innerValue, outerIndex, innerIndex, innerSub) {
      if (outerValue) {
        this.closeBuffer(outerValue);
      }
      else {
        this.openBuffer(innerValue);
      }
    }
    /**
     * Handles completion of an inner subscription. `innerSub.context` is only
     * assigned in `trySubscribe` for closing notifiers that are still open; when
     * it is missing, `closeBuffer` does nothing.
     */
    notifyComplete(innerSub) {
      this.closeBuffer(innerSub.context);
    }
    /**
     * Calls `closingSelector` with the opening value and, if it returns a truthy
     * notifier, opens a buffer tied to it. Anything thrown inside the `try` is
     * routed to `_error`.
     */
    openBuffer(value) {
      try {
        const closingSelector = this.closingSelector;
        const closingNotifier = closingSelector.call(this, value);
        if (closingNotifier) {
          this.trySubscribe(closingNotifier);
        }
      }
      catch (err) {
        this._error(err);
      }
    }
    /**
     * Emits the buffer of `context`, removes the context from the open list and
     * tears down its closing subscription. Does nothing when the subscriber has
     * already finished, when no context is given, or when the context is no
     * longer open.
     */
    closeBuffer(context) {
      const contexts = this.contexts;
      if (!contexts || !context) {
        return;
      }
      // A closing notifier that emits and then ends synchronously reaches this
      // method twice for the same context (from notifyNext and from the closed
      // check in trySubscribe). Without this guard the buffer was emitted twice
      // and `splice(-1, 1)` removed the last, unrelated open buffer.
      if (!contexts.includes(context)) {
        return;
      }
      const { buffer, subscription } = context;
      this.destination.next(buffer);
      // Look the position up again after emitting: the downstream handler may
      // have re-entered and changed the list.
      const index = contexts.indexOf(context);
      if (index !== -1) {
        contexts.splice(index, 1);
      }
      this.remove(subscription);
      subscription.unsubscribe();
    }
    /**
     * Registers a new open buffer and subscribes to its closing notifier. If no
     * subscription comes back, or it is already closed, the buffer is closed
     * right away. Otherwise the inner subscription is tagged with the context
     * and tracked for teardown.
     */
    trySubscribe(closingNotifier) {
      const contexts = this.contexts;
      const buffer = [];
      const subscription = new Subscription();
      const context = { buffer, subscription };
      contexts.push(context);
      const innerSubscription = subscribeToResult(this, closingNotifier, context);
      if (!innerSubscription || innerSubscription.closed) {
        this.closeBuffer(context);
      }
      else {
        // Lets notifyComplete find the buffer this notifier belongs to.
        innerSubscription.context = context;
        this.add(innerSubscription);
        subscription.add(innerSubscription);
      }
    }
  }
  144. //# sourceMappingURL=bufferToggle.js.map