timeoutWith.js 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. import { async } from '../scheduler/async';
  2. import { isDate } from '../util/isDate';
  3. import { OuterSubscriber } from '../OuterSubscriber';
  4. import { subscribeToResult } from '../util/subscribeToResult';
  5. export function timeoutWith(due, withObservable, scheduler = async) {
  6. return (source) => {
  7. let absoluteTimeout = isDate(due);
  8. let waitFor = absoluteTimeout ? (+due - scheduler.now()) : Math.abs(due);
  9. return source.lift(new TimeoutWithOperator(waitFor, absoluteTimeout, withObservable, scheduler));
  10. };
  11. }
  12. class TimeoutWithOperator {
  13. constructor(waitFor, absoluteTimeout, withObservable, scheduler) {
  14. this.waitFor = waitFor;
  15. this.absoluteTimeout = absoluteTimeout;
  16. this.withObservable = withObservable;
  17. this.scheduler = scheduler;
  18. }
  19. call(subscriber, source) {
  20. return source.subscribe(new TimeoutWithSubscriber(subscriber, this.absoluteTimeout, this.waitFor, this.withObservable, this.scheduler));
  21. }
  22. }
  23. class TimeoutWithSubscriber extends OuterSubscriber {
  24. constructor(destination, absoluteTimeout, waitFor, withObservable, scheduler) {
  25. super(destination);
  26. this.absoluteTimeout = absoluteTimeout;
  27. this.waitFor = waitFor;
  28. this.withObservable = withObservable;
  29. this.scheduler = scheduler;
  30. this.action = null;
  31. this.scheduleTimeout();
  32. }
  33. static dispatchTimeout(subscriber) {
  34. const { withObservable } = subscriber;
  35. subscriber._unsubscribeAndRecycle();
  36. subscriber.add(subscribeToResult(subscriber, withObservable));
  37. }
  38. scheduleTimeout() {
  39. const { action } = this;
  40. if (action) {
  41. this.action = action.schedule(this, this.waitFor);
  42. }
  43. else {
  44. this.add(this.action = this.scheduler.schedule(TimeoutWithSubscriber.dispatchTimeout, this.waitFor, this));
  45. }
  46. }
  47. _next(value) {
  48. if (!this.absoluteTimeout) {
  49. this.scheduleTimeout();
  50. }
  51. super._next(value);
  52. }
  53. _unsubscribe() {
  54. this.action = null;
  55. this.scheduler = null;
  56. this.withObservable = null;
  57. }
  58. }
  59. //# sourceMappingURL=timeoutWith.js.map