delay.js 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. import { async } from '../scheduler/async';
  2. import { isDate } from '../util/isDate';
  3. import { Subscriber } from '../Subscriber';
  4. import { Notification } from '../Notification';
  5. export function delay(delay, scheduler = async) {
  6. const absoluteDelay = isDate(delay);
  7. const delayFor = absoluteDelay ? (+delay - scheduler.now()) : Math.abs(delay);
  8. return (source) => source.lift(new DelayOperator(delayFor, scheduler));
  9. }
  10. class DelayOperator {
  11. constructor(delay, scheduler) {
  12. this.delay = delay;
  13. this.scheduler = scheduler;
  14. }
  15. call(subscriber, source) {
  16. return source.subscribe(new DelaySubscriber(subscriber, this.delay, this.scheduler));
  17. }
  18. }
  19. class DelaySubscriber extends Subscriber {
  20. constructor(destination, delay, scheduler) {
  21. super(destination);
  22. this.delay = delay;
  23. this.scheduler = scheduler;
  24. this.queue = [];
  25. this.active = false;
  26. this.errored = false;
  27. }
  28. static dispatch(state) {
  29. const source = state.source;
  30. const queue = source.queue;
  31. const scheduler = state.scheduler;
  32. const destination = state.destination;
  33. while (queue.length > 0 && (queue[0].time - scheduler.now()) <= 0) {
  34. queue.shift().notification.observe(destination);
  35. }
  36. if (queue.length > 0) {
  37. const delay = Math.max(0, queue[0].time - scheduler.now());
  38. this.schedule(state, delay);
  39. }
  40. else {
  41. this.unsubscribe();
  42. source.active = false;
  43. }
  44. }
  45. _schedule(scheduler) {
  46. this.active = true;
  47. const destination = this.destination;
  48. destination.add(scheduler.schedule(DelaySubscriber.dispatch, this.delay, {
  49. source: this, destination: this.destination, scheduler: scheduler
  50. }));
  51. }
  52. scheduleNotification(notification) {
  53. if (this.errored === true) {
  54. return;
  55. }
  56. const scheduler = this.scheduler;
  57. const message = new DelayMessage(scheduler.now() + this.delay, notification);
  58. this.queue.push(message);
  59. if (this.active === false) {
  60. this._schedule(scheduler);
  61. }
  62. }
  63. _next(value) {
  64. this.scheduleNotification(Notification.createNext(value));
  65. }
  66. _error(err) {
  67. this.errored = true;
  68. this.queue = [];
  69. this.destination.error(err);
  70. this.unsubscribe();
  71. }
  72. _complete() {
  73. this.scheduleNotification(Notification.createComplete());
  74. this.unsubscribe();
  75. }
  76. }
  77. class DelayMessage {
  78. constructor(time, notification) {
  79. this.time = time;
  80. this.notification = notification;
  81. }
  82. }
  83. //# sourceMappingURL=delay.js.map