delayWhen.js 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. import { Subscriber } from '../Subscriber';
  2. import { Observable } from '../Observable';
  3. import { OuterSubscriber } from '../OuterSubscriber';
  4. import { subscribeToResult } from '../util/subscribeToResult';
  /**
   * Creates an operator function that lifts a source with a `DelayWhenOperator`
   * built from `delayDurationSelector`.
   *
   * @param {Function} delayDurationSelector Passed through unchanged to
   *   `DelayWhenOperator`.
   * @param {Object} [subscriptionDelay] When truthy, the source is first wrapped
   *   in a `SubscriptionDelayObservable` together with this value.
   * @returns {Function} A function taking a source and returning the lifted result.
   */
  export function delayWhen(delayDurationSelector, subscriptionDelay) {
    return (source) => {
      // Only wrap the source when a subscription delay was supplied.
      const upstream = subscriptionDelay
        ? new SubscriptionDelayObservable(source, subscriptionDelay)
        : source;
      return upstream.lift(new DelayWhenOperator(delayDurationSelector));
    };
  }
  /**
   * Operator object that connects a source to a `DelayWhenSubscriber`.
   */
  class DelayWhenOperator {
    /**
     * @param {Function} delayDurationSelector Stored and handed to every
     *   `DelayWhenSubscriber` created by `call`.
     */
    constructor(delayDurationSelector) {
      this.delayDurationSelector = delayDurationSelector;
    }

    /**
     * Subscribes a new `DelayWhenSubscriber` (wrapping `subscriber`) to `source`.
     *
     * @returns Whatever `source.subscribe` returns.
     */
    call(subscriber, source) {
      const { delayDurationSelector } = this;
      const delayingSubscriber = new DelayWhenSubscriber(subscriber, delayDurationSelector);
      return source.subscribe(delayingSubscriber);
    }
  }
  /**
   * Subscriber that holds each incoming value back until the notifier produced
   * for it by `delayDurationSelector` reports a value or a completion, and then
   * forwards the held value to `destination`.
   */
  class DelayWhenSubscriber extends OuterSubscriber {
    /**
     * @param destination Receiver of the delayed values.
     * @param {Function} delayDurationSelector Called with `(value, index)` for
     *   every incoming value; a truthy result is used as the delay notifier.
     */
    constructor(destination, delayDurationSelector) {
      super(destination);
      this.delayDurationSelector = delayDurationSelector;
      // Set once `_complete` has run.
      this.completed = false;
      // Notifier subscriptions that are still pending.
      this.delayNotifierSubscriptions = [];
      // Running index handed to the selector.
      this.index = 0;
    }

    /**
     * Notifier produced a value: release the held value (`outerValue`), drop the
     * notifier subscription and check whether completion can be forwarded.
     */
    notifyNext(outerValue, innerValue, outerIndex, innerIndex, innerSub) {
      this.destination.next(outerValue);
      this.removeSubscription(innerSub);
      this.tryComplete();
    }

    /** Notifier failed: route the error through this subscriber's `_error`. */
    notifyError(error, innerSub) {
      this._error(error);
    }

    /**
     * Notifier completed: release the value it was holding back.
     *
     * The value is emitted unconditionally. A truthiness check here would
     * silently drop legitimate falsy values such as `0`, `''` or `false`.
     */
    notifyComplete(innerSub) {
      const value = this.removeSubscription(innerSub);
      this.destination.next(value);
      this.tryComplete();
    }

    /**
     * Asks the selector for a notifier for `value` and, if it returns a truthy
     * one, starts waiting on it. Anything thrown by the selector or while
     * subscribing to the notifier is sent to `destination.error`.
     */
    _next(value) {
      const index = this.index++;
      try {
        const delayNotifier = this.delayDurationSelector(value, index);
        if (delayNotifier) {
          this.tryDelay(delayNotifier, value);
        }
      } catch (err) {
        this.destination.error(err);
      }
    }

    /** Marks completion, forwards it if nothing is pending, then unsubscribes. */
    _complete() {
      this.completed = true;
      this.tryComplete();
      this.unsubscribe();
    }

    /**
     * Unsubscribes `subscription` and removes it from the pending list when it
     * is present there.
     *
     * @returns The `outerValue` carried by `subscription`.
     */
    removeSubscription(subscription) {
      subscription.unsubscribe();
      const subscriptionIdx = this.delayNotifierSubscriptions.indexOf(subscription);
      if (subscriptionIdx !== -1) {
        this.delayNotifierSubscriptions.splice(subscriptionIdx, 1);
      }
      return subscription.outerValue;
    }

    /**
     * Subscribes to `delayNotifier` via `subscribeToResult`, passing `value`
     * along. A subscription that is still open afterwards is added to
     * `destination` and recorded as pending.
     */
    tryDelay(delayNotifier, value) {
      const notifierSubscription = subscribeToResult(this, delayNotifier, value);
      if (notifierSubscription && !notifierSubscription.closed) {
        const destination = this.destination;
        destination.add(notifierSubscription);
        this.delayNotifierSubscriptions.push(notifierSubscription);
      }
    }

    /** Forwards completion once `completed` is set and no notifier is pending. */
    tryComplete() {
      if (this.completed && this.delayNotifierSubscriptions.length === 0) {
        this.destination.complete();
      }
    }
  }
  /**
   * Observable wrapper that subscribes to `subscriptionDelay` with a
   * `SubscriptionDelaySubscriber` instead of subscribing to `source` directly.
   */
  class SubscriptionDelayObservable extends Observable {
    /**
     * @param source The source handed to each `SubscriptionDelaySubscriber`.
     * @param subscriptionDelay Object whose `subscribe` is called in `_subscribe`.
     */
    constructor(source, subscriptionDelay) {
      super();
      this.source = source;
      this.subscriptionDelay = subscriptionDelay;
    }

    /**
     * Subscribes a new `SubscriptionDelaySubscriber` (wrapping `subscriber` and
     * `source`) to `subscriptionDelay`.
     *
     * @returns The subscription produced by `subscriptionDelay.subscribe`, so the
     *   caller can tear down a still-pending delay. Previously nothing was
     *   returned and the pending delay could not be cancelled.
     *   NOTE(review): assumes the base `Observable.subscribe` registers the value
     *   returned from `_subscribe` as teardown — confirm against `../Observable`.
     */
    _subscribe(subscriber) {
      return this.subscriptionDelay.subscribe(
        new SubscriptionDelaySubscriber(subscriber, this.source),
      );
    }
  }
  /**
   * Subscriber that, on its first `next` or on `complete`, subscribes `parent`
   * to `source`; on `error` it passes the error on to `parent`.
   */
  class SubscriptionDelaySubscriber extends Subscriber {
    /**
     * @param parent Subscriber that is eventually subscribed to `source`.
     * @param source Object whose `subscribe` is called with `parent`.
     */
    constructor(parent, source) {
      super();
      this.parent = parent;
      this.source = source;
      // Ensures `source` is subscribed at most once.
      this.sourceSubscribed = false;
    }

    /** Any value triggers the hand-over; the value itself is ignored. */
    _next(unused) {
      this.subscribeToSource();
    }

    /** Unsubscribes this subscriber, then forwards the error to `parent`. */
    _error(err) {
      const { parent } = this;
      this.unsubscribe();
      parent.error(err);
    }

    /** Completing without a value also triggers the hand-over. */
    _complete() {
      this.unsubscribe();
      this.subscribeToSource();
    }

    /** Subscribes `parent` to `source` unless that has already happened. */
    subscribeToSource() {
      if (this.sourceSubscribed) {
        return;
      }
      this.sourceSubscribed = true;
      this.unsubscribe();
      this.source.subscribe(this.parent);
    }
  }
  118. //# sourceMappingURL=delayWhen.js.map