repeatWhen.js 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. import { Subject } from '../Subject';
  2. import { SimpleOuterSubscriber, innerSubscribe, SimpleInnerSubscriber } from '../innerSubscribe';
  /**
   * Builds an operator function that lifts a `RepeatWhenOperator` onto a source.
   *
   * @param {Function} notifier Function stored on the operator and later invoked
   *   with the notifications Subject; its result is subscribed to in order to
   *   decide when the source is subscribed to again.
   * @returns {Function} A function taking `source` and returning
   *   `source.lift(...)` with a new `RepeatWhenOperator`.
   */
  export function repeatWhen(notifier) {
    return (source) => {
      const operator = new RepeatWhenOperator(notifier);
      return source.lift(operator);
    };
  }
  /**
   * Operator object passed to `lift`. It remembers the notifier and, when
   * `call` is invoked, subscribes the source with a `RepeatWhenSubscriber`.
   */
  class RepeatWhenOperator {
    /**
     * @param {Function} notifier Stored unchanged for later use by `call`.
     */
    constructor(notifier) {
      this.notifier = notifier;
    }

    /**
     * Subscribes `source` with a new `RepeatWhenSubscriber` wrapping `subscriber`.
     *
     * @param subscriber Downstream subscriber handed to the new subscriber.
     * @param source Object whose `subscribe` method is called; it is also passed
     *   to the new subscriber so it can be subscribed to again later.
     * @returns Whatever `source.subscribe(...)` returns.
     */
    call(subscriber, source) {
      const { notifier } = this;
      return source.subscribe(new RepeatWhenSubscriber(subscriber, notifier, source));
    }
  }
  /**
   * Subscriber that intercepts completion of the source and, instead of
   * completing downstream right away, pushes a value into a notifications
   * Subject. The stream returned by `notifier(notifications)` is subscribed to
   * through a `SimpleInnerSubscriber`; `notifyNext` / `notifyComplete` are
   * presumably invoked by that inner subscriber (confirm against
   * `../innerSubscribe`).
   */
  class RepeatWhenSubscriber extends SimpleOuterSubscriber {
    /**
     * @param destination Passed to the parent constructor.
     * @param {Function} notifier Invoked lazily with the notifications Subject.
     * @param source Object that is subscribed to again from `notifyNext`.
     */
    constructor(destination, notifier, source) {
      super(destination);
      this.notifier = notifier;
      this.source = source;
      // Starts out true; `complete` flips it to false, `notifyNext` back to true.
      this.sourceIsBeingSubscribedTo = true;
    }

    /**
     * Marks the source as being subscribed to and subscribes this instance to it.
     */
    notifyNext() {
      this.sourceIsBeingSubscribedTo = true;
      this.source.subscribe(this);
    }

    /**
     * Delegates to the parent's `complete` only when the flag is strictly
     * `false`, i.e. `complete` ran and no `notifyNext` has reset it since.
     */
    notifyComplete() {
      if (this.sourceIsBeingSubscribedTo !== false) {
        return undefined;
      }
      return super.complete();
    }

    /**
     * Handles completion of the source. While not stopped it makes sure the
     * notifier stream is subscribed, then either completes through the parent
     * (no live notifier subscription) or recycles this subscriber and emits
     * `undefined` on the notifications Subject.
     */
    complete() {
      this.sourceIsBeingSubscribedTo = false;
      if (this.isStopped) {
        return undefined;
      }
      // The notifier is only invoked once `retries` is unset.
      if (!this.retries) {
        this.subscribeToRetries();
      }
      const innerSubscription = this.retriesSubscription;
      if (!innerSubscription || innerSubscription.closed) {
        return super.complete();
      }
      this._unsubscribeAndRecycle();
      this.notifications.next(undefined);
      return undefined;
    }

    /**
     * Tears down the notifications Subject and the notifier subscription (when
     * present) and clears the stored references, including `retries`.
     */
    _unsubscribe() {
      // Both fields are read before either teardown runs, as in the original.
      const subject = this.notifications;
      const innerSubscription = this.retriesSubscription;
      if (subject) {
        subject.unsubscribe();
        this.notifications = undefined;
      }
      if (innerSubscription) {
        innerSubscription.unsubscribe();
        this.retriesSubscription = undefined;
      }
      this.retries = undefined;
    }

    /**
     * Runs the parent's `_unsubscribeAndRecycle` with `_unsubscribe` temporarily
     * set to `null`, then restores it. Presumably this keeps the recycle from
     * running the teardown above (confirm against the parent implementation).
     *
     * @returns {RepeatWhenSubscriber} This instance.
     */
    _unsubscribeAndRecycle() {
      const teardown = this._unsubscribe;
      this._unsubscribe = null;
      super._unsubscribeAndRecycle();
      this._unsubscribe = teardown;
      return this;
    }

    /**
     * Creates the notifications Subject, calls the notifier with it, and
     * subscribes to the result through a `SimpleInnerSubscriber`.
     */
    subscribeToRetries() {
      const subject = new Subject();
      this.notifications = subject;
      // Keep the call unbound: the notifier is invoked as a plain function, not
      // as a method of this subscriber.
      const notify = this.notifier;
      let retries;
      try {
        retries = notify(subject);
      }
      catch (e) {
        // NOTE(review): the thrown error is discarded and the parent's
        // `complete` is called instead; confirm this is intended rather than
        // forwarding the error downstream.
        return super.complete();
      }
      this.retries = retries;
      const inner = new SimpleInnerSubscriber(this);
      this.retriesSubscription = innerSubscribe(retries, inner);
      return undefined;
    }
  }
  76. //# sourceMappingURL=repeatWhen.js.map