audit.js 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. import { tryCatch } from '../util/tryCatch';
  2. import { errorObject } from '../util/errorObject';
  3. import { OuterSubscriber } from '../OuterSubscriber';
  4. import { subscribeToResult } from '../util/subscribeToResult';
  /**
   * Silences the source for a duration decided by a second Observable, then
   * emits the most recent source value seen during that silence, and repeats
   * the cycle for the next source value.
   *
   * <span class="informal">It's like {@link auditTime}, but the silencing
   * duration is determined by a second Observable.</span>
   *
   * <img src="./img/audit.png" width="100%">
   *
   * `audit` resembles `throttle`, except that it emits the last value of the
   * silenced time window rather than the first one. While its internal timer is
   * enabled, source values are ignored (only the latest is remembered); as soon
   * as the timer becomes disabled, the most recent source value is emitted on
   * the output Observable. The timer starts out disabled. When the first source
   * value arrives, the timer is enabled by calling `durationSelector` with that
   * value; the call returns the "duration" Observable. Once the duration
   * Observable emits a value or completes, the timer is disabled, the most
   * recent source value is emitted, and the process starts over with the next
   * source value.
   *
   * @example <caption>Emit clicks at a rate of at most one click per second</caption>
   * var clicks = Rx.Observable.fromEvent(document, 'click');
   * var result = clicks.audit(ev => Rx.Observable.interval(1000));
   * result.subscribe(x => console.log(x));
   *
   * @see {@link auditTime}
   * @see {@link debounce}
   * @see {@link delayWhen}
   * @see {@link sample}
   * @see {@link throttle}
   *
   * @param {function(value: T): SubscribableOrPromise} durationSelector A function
   * that is given a source value and returns the silencing duration, expressed
   * as an Observable or a Promise.
   * @return {Observable<T>} An Observable that performs rate-limiting of
   * emissions from the source Observable.
   * @method audit
   * @owner Observable
   */
  export function audit(durationSelector) {
    // Named declaration so the operator function keeps the name
    // `auditOperatorFunction` in stack traces.
    function auditOperatorFunction(source) {
      const operator = new AuditOperator(durationSelector);
      return source.lift(operator);
    }
    return auditOperatorFunction;
  }
  /**
   * Operator object passed to `source.lift` by `audit`. It carries the
   * `durationSelector` and, when invoked through `call`, subscribes a new
   * `AuditSubscriber` to the source.
   */
  class AuditOperator {
    /**
     * @param {function} durationSelector Function handed on to every
     * `AuditSubscriber` this operator creates.
     */
    constructor(durationSelector) {
      this.durationSelector = durationSelector;
    }

    /**
     * Wraps `subscriber` in an `AuditSubscriber` and subscribes it to `source`.
     *
     * @param subscriber The downstream subscriber receiving audited values.
     * @param source The object whose `subscribe` method is called.
     * @return Whatever `source.subscribe` returns.
     */
    call(subscriber, source) {
      const { durationSelector } = this;
      const auditSubscriber = new AuditSubscriber(subscriber, durationSelector);
      return source.subscribe(auditSubscriber);
    }
  }
  /**
   * Subscriber that implements the `audit` behaviour: it remembers the latest
   * source value while a "duration" subscription is active and forwards that
   * value to the destination once the duration notifies or completes.
   *
   * We need this JSDoc comment for affecting ESDoc.
   * @ignore
   * @extends {Ignored}
   */
  class AuditSubscriber extends OuterSubscriber {
    /**
     * @param destination Downstream subscriber that receives audited values.
     * @param {function} durationSelector Called with a source value to obtain
     * the duration to subscribe to.
     */
    constructor(destination, durationSelector) {
      super(destination);
      this.durationSelector = durationSelector;
      // Declare every field up front so none is read before being assigned.
      this.value = null;
      this.hasValue = false;
      this.throttled = null;
    }

    /**
     * Records `value` as the latest source value and, if no duration is
     * currently being awaited, starts a new one via `durationSelector`.
     *
     * @param value The value received from the source.
     */
    _next(value) {
      this.value = value;
      this.hasValue = true;

      // A duration is already running: the value is only remembered.
      if (this.throttled) {
        return;
      }

      const duration = tryCatch(this.durationSelector)(value);
      if (duration === errorObject) {
        // The selector failed; forward the captured error downstream.
        this.destination.error(errorObject.e);
        return;
      }

      const innerSubscription = subscribeToResult(this, duration);
      // Guard against `subscribeToResult` handing back no subscription at all
      // (NOTE(review): presumably when this subscriber is already closed -
      // confirm against the helper). Without the check, reading `.closed` on a
      // null/undefined result would throw. A missing or already-closed
      // subscription means there is nothing to wait for, so flush right away.
      if (!innerSubscription || innerSubscription.closed) {
        this.clearThrottle();
      } else {
        this.add(this.throttled = innerSubscription);
      }
    }

    /**
     * Ends the current silencing window: drops and unsubscribes the duration
     * subscription (if any) and emits the remembered value (if any).
     */
    clearThrottle() {
      // Snapshot the state first; the fields are reset before calling out.
      const { value, hasValue, throttled } = this;
      if (throttled) {
        this.remove(throttled);
        this.throttled = null;
        throttled.unsubscribe();
      }
      if (hasValue) {
        this.value = null;
        this.hasValue = false;
        this.destination.next(value);
      }
    }

    /**
     * The duration emitted a value: close the silencing window. The arguments
     * are not used.
     */
    notifyNext(outerValue, innerValue, outerIndex, innerIndex) {
      this.clearThrottle();
    }

    /**
     * The duration completed: close the silencing window.
     */
    notifyComplete() {
      this.clearThrottle();
    }
  }
  108. //# sourceMappingURL=audit.js.map