delayWhen.js 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. import { Subscriber } from '../Subscriber';
  2. import { Observable } from '../Observable';
  3. import { OuterSubscriber } from '../OuterSubscriber';
  4. import { subscribeToResult } from '../util/subscribeToResult';
  /**
   * Time-shifts every value of the source Observable by a span that is decided,
   * per value, by another Observable.
   *
   * <span class="informal">It's like {@link delay}, but the time span of the
   * delay duration is determined by a second Observable.</span>
   *
   * <img src="./img/delayWhen.png" width="100%">
   *
   * For each value emitted by the source, `delayDurationSelector` is invoked with
   * that value and should return an Observable (the "duration" Observable). The
   * source value is forwarded to the output Observable only once its duration
   * Observable emits a value or completes.
   *
   * An optional second argument, `subscriptionDelay`, postpones the subscription
   * to the source itself: the source is subscribed to when `subscriptionDelay`
   * emits its first value or completes. Without it, the source is subscribed to
   * as soon as the output Observable is subscribed.
   *
   * @example <caption>Delay each click by a random amount of time, between 0 and 5 seconds</caption>
   * var clicks = Rx.Observable.fromEvent(document, 'click');
   * var delayedClicks = clicks.delayWhen(event =>
   *   Rx.Observable.interval(Math.random() * 5000)
   * );
   * delayedClicks.subscribe(x => console.log(x));
   *
   * @see {@link debounce}
   * @see {@link delay}
   *
   * @param {function(value: T): Observable} delayDurationSelector Called once per
   * source value; the Observable it returns holds that value back until it emits
   * or completes.
   * @param {Observable} subscriptionDelay Optional Observable whose first emission
   * or completion triggers the subscription to the source Observable.
   * @return {Observable} An Observable that delays the emissions of the source
   * Observable by an amount of time specified by the Observable returned by
   * `delayDurationSelector`.
   * @method delayWhen
   * @owner Observable
   */
  export function delayWhen(delayDurationSelector, subscriptionDelay) {
    return (source) => {
      // Only wrap the source when the caller asked for a delayed subscription.
      const upstream = subscriptionDelay
        ? new SubscriptionDelayObservable(source, subscriptionDelay)
        : source;
      return upstream.lift(new DelayWhenOperator(delayDurationSelector));
    };
  }
  /**
   * Operator object handed to `lift`: remembers the duration selector and, when
   * called, subscribes a new DelayWhenSubscriber to the source.
   */
  class DelayWhenOperator {
    /**
     * @param {function(value: T): Observable} delayDurationSelector Selector
     * passed on unchanged to every DelayWhenSubscriber this operator creates.
     */
    constructor(delayDurationSelector) {
      this.delayDurationSelector = delayDurationSelector;
    }
    /**
     * @param subscriber The downstream subscriber to deliver delayed values to.
     * @param source The Observable to subscribe to.
     * @return Whatever `source.subscribe` returns.
     */
    call(subscriber, source) {
      const { delayDurationSelector } = this;
      const delaySubscriber = new DelayWhenSubscriber(subscriber, delayDurationSelector);
      return source.subscribe(delaySubscriber);
    }
  }
  /**
   * Subscriber that holds each source value back until the "duration" notifier
   * produced for it by `delayDurationSelector` emits or completes.
   *
   * Notifier subscriptions that are still open are tracked in
   * `delayNotifierSubscriptions`; the source value each one is holding back is
   * stored at the same index in `values`. The two arrays are always pushed to
   * and spliced from together so the indices stay aligned.
   *
   * We need this JSDoc comment for affecting ESDoc.
   * @ignore
   * @extends {Ignored}
   */
  class DelayWhenSubscriber extends OuterSubscriber {
    /**
     * @param destination Downstream subscriber that receives the delayed values.
     * @param {function(value: T): Observable} delayDurationSelector Returns the
     * notifier for a given source value.
     */
    constructor(destination, delayDurationSelector) {
      super(destination);
      this.delayDurationSelector = delayDurationSelector;
      // Set once the source has completed; completion is forwarded only after
      // every tracked notifier has finished as well.
      this.completed = false;
      this.delayNotifierSubscriptions = [];
      this.values = [];
    }
    /**
     * A notifier emitted: release the source value it was guarding, stop
     * tracking the notifier, and complete if nothing else is pending.
     */
    notifyNext(outerValue, innerValue, outerIndex, innerIndex, innerSub) {
      this.destination.next(outerValue);
      this.removeSubscription(innerSub);
      this.tryComplete();
    }
    /** A notifier errored: route the error through this subscriber's `_error`. */
    notifyError(error, innerSub) {
      this._error(error);
    }
    /**
     * A notifier completed without having emitted: release its source value.
     *
     * The decision to emit is based on whether the notifier was being tracked,
     * not on the truthiness of the value, so falsy source values (`0`, `''`,
     * `false`, `null`) are delivered too.
     *
     * NOTE(review): a notifier that completes while `subscribeToResult` is still
     * running is not tracked yet when this is called, so its value is not
     * emitted here -- confirm whether that case needs handling.
     */
    notifyComplete(innerSub) {
      const wasPending = this.delayNotifierSubscriptions.indexOf(innerSub) !== -1;
      const value = this.removeSubscription(innerSub);
      if (wasPending) {
        this.destination.next(value);
      }
      this.tryComplete();
    }
    /**
     * Source emitted a value: ask the selector for its notifier and start
     * waiting on it. A falsy selector result means the value is not forwarded.
     * Anything thrown by the selector or while subscribing to the notifier is
     * sent to the destination as an error.
     */
    _next(value) {
      try {
        const delayNotifier = this.delayDurationSelector(value);
        if (delayNotifier) {
          this.tryDelay(delayNotifier, value);
        }
      }
      catch (err) {
        this.destination.error(err);
      }
    }
    /** Source completed: remember it and complete once no notifier is pending. */
    _complete() {
      this.completed = true;
      this.tryComplete();
    }
    /**
     * Unsubscribes `subscription` and, if it was tracked, drops it together with
     * its stored value.
     *
     * @return The source value that was stored for the subscription, or `null`
     * when the subscription was not tracked.
     */
    removeSubscription(subscription) {
      subscription.unsubscribe();
      const subscriptionIdx = this.delayNotifierSubscriptions.indexOf(subscription);
      let value = null;
      if (subscriptionIdx !== -1) {
        value = this.values[subscriptionIdx];
        this.delayNotifierSubscriptions.splice(subscriptionIdx, 1);
        this.values.splice(subscriptionIdx, 1);
      }
      return value;
    }
    /**
     * Subscribes to `delayNotifier` on behalf of `value` and tracks the
     * subscription while it is still open.
     */
    tryDelay(delayNotifier, value) {
      const notifierSubscription = subscribeToResult(this, delayNotifier, value);
      if (notifierSubscription && !notifierSubscription.closed) {
        this.add(notifierSubscription);
        this.delayNotifierSubscriptions.push(notifierSubscription);
        // Store the value only when the subscription is tracked. Pushing it
        // unconditionally would shift `values` out of step with
        // `delayNotifierSubscriptions` whenever a notifier is already closed
        // here, and the orphaned entries would never be removed.
        this.values.push(value);
      }
    }
    /** Completes the destination once the source is done and nothing is pending. */
    tryComplete() {
      if (this.completed && this.delayNotifierSubscriptions.length === 0) {
        this.destination.complete();
      }
    }
  }
  /**
   * Observable wrapper that postpones subscribing to `source`: on subscription
   * it subscribes to `subscriptionDelay` instead, through a
   * SubscriptionDelaySubscriber that is given the real subscriber and the
   * source.
   *
   * We need this JSDoc comment for affecting ESDoc.
   * @ignore
   * @extends {Ignored}
   */
  class SubscriptionDelayObservable extends Observable {
    /**
     * @param source The Observable whose subscription is being postponed.
     * @param subscriptionDelay The Observable that gates the subscription.
     */
    constructor(/** @deprecated internal use only */ source, subscriptionDelay) {
      super();
      this.source = source;
      this.subscriptionDelay = subscriptionDelay;
    }
    /**
     * Subscribes to `subscriptionDelay` and returns that subscription, so the
     * caller can tear the pending delay down when the downstream subscriber
     * unsubscribes before the delay has fired. Previously the subscription was
     * discarded and could not be cancelled.
     *
     * @param subscriber The downstream subscriber that should eventually be
     * subscribed to `source`.
     * @return The subscription to `subscriptionDelay`.
     */
    /** @deprecated internal use only */ _subscribe(subscriber) {
      return this.subscriptionDelay.subscribe(new SubscriptionDelaySubscriber(subscriber, this.source));
    }
  }
  /**
   * Subscriber attached to the `subscriptionDelay` Observable. The first value
   * or the completion it receives causes `parent` to be subscribed to `source`,
   * exactly once; an error is passed straight to `parent`.
   *
   * We need this JSDoc comment for affecting ESDoc.
   * @ignore
   * @extends {Ignored}
   */
  class SubscriptionDelaySubscriber extends Subscriber {
    /**
     * @param parent The subscriber that will be subscribed to `source`.
     * @param source The Observable whose subscription is being postponed.
     */
    constructor(parent, source) {
      // Constructed without a destination: notifications are handled here and
      // forwarded to `parent` explicitly.
      super();
      this.parent = parent;
      this.source = source;
      // Guards against subscribing to the source more than once.
      this.sourceSubscribed = false;
    }
    /** Any value from the delay Observable triggers the source subscription. */
    _next(unused) {
      this.subscribeToSource();
    }
    /** Unsubscribes this subscriber, then reports the error to `parent`. */
    _error(err) {
      this.unsubscribe();
      this.parent.error(err);
    }
    /** Completion of the delay Observable also triggers the source subscription. */
    _complete() {
      this.subscribeToSource();
    }
    /**
     * Subscribes `parent` to `source` the first time it is called; later calls
     * do nothing. This subscriber is unsubscribed before the source is
     * subscribed to.
     */
    subscribeToSource() {
      if (this.sourceSubscribed) {
        return;
      }
      this.sourceSubscribed = true;
      this.unsubscribe();
      this.source.subscribe(this.parent);
    }
  }
  178. //# sourceMappingURL=delayWhen.js.map