delay.js 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import { async } from '../scheduler/async';
  2. import { isDate } from '../util/isDate';
  3. import { Subscriber } from '../Subscriber';
  4. import { Notification } from '../Notification';
  5. /**
  6. * Delays the emission of items from the source Observable by a given timeout or
  7. * until a given Date.
  8. *
  9. * <span class="informal">Time shifts each item by some specified amount of
  10. * milliseconds.</span>
  11. *
  12. * <img src="./img/delay.png" width="100%">
  13. *
  14. * If the delay argument is a Number, this operator time shifts the source
  15. * Observable by that amount of time expressed in milliseconds. The relative
  16. * time intervals between the values are preserved.
  17. *
  18. * If the delay argument is a Date, this operator time shifts the start of the
  19. * Observable execution until the given date occurs.
  20. *
  21. * @example <caption>Delay each click by one second</caption>
  22. * var clicks = Rx.Observable.fromEvent(document, 'click');
  23. * var delayedClicks = clicks.delay(1000); // each click emitted after 1 second
  24. * delayedClicks.subscribe(x => console.log(x));
  25. *
  26. * @example <caption>Delay all clicks until a future date happens</caption>
  27. * var clicks = Rx.Observable.fromEvent(document, 'click');
  28. * var date = new Date('March 15, 2050 12:00:00'); // in the future
  29. * var delayedClicks = clicks.delay(date); // click emitted only after that date
  30. * delayedClicks.subscribe(x => console.log(x));
  31. *
  32. * @see {@link debounceTime}
  33. * @see {@link delayWhen}
  34. *
  35. * @param {number|Date} delay The delay duration in milliseconds (a `number`) or
  36. * a `Date` until which the emission of the source items is delayed.
  37. * @param {Scheduler} [scheduler=async] The IScheduler to use for
  38. * managing the timers that handle the time-shift for each item.
  39. * @return {Observable} An Observable that delays the emissions of the source
  40. * Observable by the specified timeout or Date.
  41. * @method delay
  42. * @owner Observable
  43. */
  export function delay(delay, scheduler = async) {
    // A Date argument is an absolute deadline: convert it to an offset from
    // the scheduler's current time. Anything else is treated as a relative
    // duration, with its sign discarded via Math.abs.
    let delayFor;
    if (isDate(delay)) {
      delayFor = +delay - scheduler.now();
    } else {
      delayFor = Math.abs(delay);
    }
    // The returned operator function lifts the source with a DelayOperator
    // carrying the computed offset and the chosen scheduler.
    return (source) => {
      const operator = new DelayOperator(delayFor, scheduler);
      return source.lift(operator);
    };
  }
  /**
   * Operator object handed to `lift`: remembers the delay offset and the
   * scheduler, and wires a DelaySubscriber between the downstream subscriber
   * and the source when `call` is invoked.
   */
  class DelayOperator {
    /**
     * @param {number} delay Offset applied to each notification.
     * @param {Object} scheduler Scheduler passed through to DelaySubscriber.
     */
    constructor(delay, scheduler) {
      Object.assign(this, { delay, scheduler });
    }

    /**
     * Subscribes to `source` through a new DelaySubscriber wrapping `subscriber`.
     *
     * @param {Object} subscriber Downstream subscriber to receive delayed items.
     * @param {Object} source Object whose `subscribe` method is invoked.
     * @return {*} Whatever `source.subscribe` returns.
     */
    call(subscriber, source) {
      const { delay, scheduler } = this;
      const delaySubscriber = new DelaySubscriber(subscriber, delay, scheduler);
      return source.subscribe(delaySubscriber);
    }
  }
  /**
   * Subscriber that queues incoming next/complete notifications together with
   * the time at which each becomes due, and replays them to the destination
   * from a scheduled callback. Errors bypass the queue and are forwarded
   * immediately.
   *
   * We need this JSDoc comment for affecting ESDoc.
   * @ignore
   * @extends {Ignored}
   */
  class DelaySubscriber extends Subscriber {
    /**
     * @param {Object} destination Downstream subscriber.
     * @param {number} delay Offset added to the scheduler's current time for
     * each queued notification.
     * @param {Object} scheduler Scheduler providing `now()` and `schedule()`.
     */
    constructor(destination, delay, scheduler) {
      super(destination);
      Object.assign(this, {
        delay,
        scheduler,
        queue: [],
        active: false,
        errored: false,
      });
    }

    /**
     * Scheduled work function. `this` is the object the scheduler invokes the
     * work on (it must provide `schedule` and `unsubscribe`); `state` is the
     * record built in `_schedule`.
     *
     * @param {{source: DelaySubscriber, destination: Object, scheduler: Object}} state
     */
    static dispatch(state) {
      const { source, scheduler, destination } = state;
      const pending = source.queue;
      // A message is due once its target time minus "now" is zero or negative.
      const isDue = (message) => (message.time - scheduler.now()) <= 0;

      // Deliver every message at the head of the queue that is already due.
      while (pending.length > 0 && isDue(pending[0])) {
        const { notification } = pending.shift();
        notification.observe(destination);
      }

      if (pending.length === 0) {
        // Nothing left to deliver: stop this scheduled work and mark the
        // subscriber idle so the next notification schedules again.
        this.unsubscribe();
        source.active = false;
        return;
      }

      // Re-arm for the next pending message, never with a negative wait.
      const wait = Math.max(0, pending[0].time - scheduler.now());
      this.schedule(state, wait);
    }

    /**
     * Marks the subscriber active and schedules `dispatch` after `this.delay`,
     * registering the result of `scheduler.schedule` via `this.add`.
     *
     * @param {Object} scheduler Scheduler used to run `dispatch`.
     */
    _schedule(scheduler) {
      this.active = true;
      const state = { source: this, destination: this.destination, scheduler };
      const scheduled = scheduler.schedule(DelaySubscriber.dispatch, this.delay, state);
      this.add(scheduled);
    }

    /**
     * Queues a notification stamped with its due time and, when no dispatch
     * is currently active, schedules one. Ignored once an error was seen.
     *
     * @param {Object} notification Notification to deliver later.
     */
    scheduleNotification(notification) {
      if (this.errored === true) {
        return;
      }
      const { scheduler } = this;
      const dueAt = scheduler.now() + this.delay;
      this.queue.push(new DelayMessage(dueAt, notification));
      if (this.active === false) {
        this._schedule(scheduler);
      }
    }

    /** Queues a "next" notification carrying `value`. */
    _next(value) {
      const notification = Notification.createNext(value);
      this.scheduleNotification(notification);
    }

    /**
     * Flags the subscriber as errored, discards everything still queued and
     * forwards the error to the destination right away.
     */
    _error(err) {
      this.errored = true;
      this.queue = [];
      this.destination.error(err);
    }

    /** Queues a "complete" notification so it is delayed like values are. */
    _complete() {
      const notification = Notification.createComplete();
      this.scheduleNotification(notification);
    }
  }
  /**
   * Queue entry pairing a notification with the time it is stored under.
   */
  class DelayMessage {
    /**
     * @param {number} time Value stored as `time`.
     * @param {Object} notification Value stored as `notification`.
     */
    constructor(time, notification) {
      Object.assign(this, { time, notification });
    }
  }
  124. //# sourceMappingURL=delay.js.map