repeatWhen.js 3.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. import { Subject } from '../Subject';
  2. import { tryCatch } from '../util/tryCatch';
  3. import { errorObject } from '../util/errorObject';
  4. import { OuterSubscriber } from '../OuterSubscriber';
  5. import { subscribeToResult } from '../util/subscribeToResult';
  6. /**
  7. * Returns an Observable that mirrors the source Observable with the exception of a `complete`. If the source
  8. * Observable calls `complete`, this method will emit to the Observable returned from `notifier`. If that Observable
  9. * calls `complete` or `error`, then this method will call `complete` or `error` on the child subscription. Otherwise
  10. * this method will resubscribe to the source Observable.
  11. *
  12. * <img src="./img/repeatWhen.png" width="100%">
  13. *
  14. * @param {function(notifications: Observable): Observable} notifier - Receives an Observable of notifications with
  15. * which a user can `complete` or `error`, aborting the repetition.
  16. * @return {Observable} The source Observable modified with repeat logic.
  17. * @method repeatWhen
  18. * @owner Observable
  19. */
  20. export function repeatWhen(notifier) {
  21. return (source) => source.lift(new RepeatWhenOperator(notifier));
  22. }
  23. class RepeatWhenOperator {
  24. constructor(notifier) {
  25. this.notifier = notifier;
  26. }
  27. call(subscriber, source) {
  28. return source.subscribe(new RepeatWhenSubscriber(subscriber, this.notifier, source));
  29. }
  30. }
  31. /**
  32. * We need this JSDoc comment for affecting ESDoc.
  33. * @ignore
  34. * @extends {Ignored}
  35. */
  36. class RepeatWhenSubscriber extends OuterSubscriber {
  37. constructor(destination, notifier, source) {
  38. super(destination);
  39. this.notifier = notifier;
  40. this.source = source;
  41. this.sourceIsBeingSubscribedTo = true;
  42. }
  43. notifyNext(outerValue, innerValue, outerIndex, innerIndex, innerSub) {
  44. this.sourceIsBeingSubscribedTo = true;
  45. this.source.subscribe(this);
  46. }
  47. notifyComplete(innerSub) {
  48. if (this.sourceIsBeingSubscribedTo === false) {
  49. return super.complete();
  50. }
  51. }
  52. complete() {
  53. this.sourceIsBeingSubscribedTo = false;
  54. if (!this.isStopped) {
  55. if (!this.retries) {
  56. this.subscribeToRetries();
  57. }
  58. if (!this.retriesSubscription || this.retriesSubscription.closed) {
  59. return super.complete();
  60. }
  61. this._unsubscribeAndRecycle();
  62. this.notifications.next();
  63. }
  64. }
  65. /** @deprecated internal use only */ _unsubscribe() {
  66. const { notifications, retriesSubscription } = this;
  67. if (notifications) {
  68. notifications.unsubscribe();
  69. this.notifications = null;
  70. }
  71. if (retriesSubscription) {
  72. retriesSubscription.unsubscribe();
  73. this.retriesSubscription = null;
  74. }
  75. this.retries = null;
  76. }
  77. /** @deprecated internal use only */ _unsubscribeAndRecycle() {
  78. const { notifications, retries, retriesSubscription } = this;
  79. this.notifications = null;
  80. this.retries = null;
  81. this.retriesSubscription = null;
  82. super._unsubscribeAndRecycle();
  83. this.notifications = notifications;
  84. this.retries = retries;
  85. this.retriesSubscription = retriesSubscription;
  86. return this;
  87. }
  88. subscribeToRetries() {
  89. this.notifications = new Subject();
  90. const retries = tryCatch(this.notifier)(this.notifications);
  91. if (retries === errorObject) {
  92. return super.complete();
  93. }
  94. this.retries = retries;
  95. this.retriesSubscription = subscribeToResult(this, retries);
  96. }
  97. }
  98. //# sourceMappingURL=repeatWhen.js.map