observeOn.js 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. import { Subscriber } from '../Subscriber';
  2. import { Notification } from '../Notification';
  3. export function observeOn(scheduler, delay = 0) {
  4. return function observeOnOperatorFunction(source) {
  5. return source.lift(new ObserveOnOperator(scheduler, delay));
  6. };
  7. }
  8. export class ObserveOnOperator {
  9. constructor(scheduler, delay = 0) {
  10. this.scheduler = scheduler;
  11. this.delay = delay;
  12. }
  13. call(subscriber, source) {
  14. return source.subscribe(new ObserveOnSubscriber(subscriber, this.scheduler, this.delay));
  15. }
  16. }
  17. export class ObserveOnSubscriber extends Subscriber {
  18. constructor(destination, scheduler, delay = 0) {
  19. super(destination);
  20. this.scheduler = scheduler;
  21. this.delay = delay;
  22. }
  23. static dispatch(arg) {
  24. const { notification, destination } = arg;
  25. notification.observe(destination);
  26. this.unsubscribe();
  27. }
  28. scheduleMessage(notification) {
  29. const destination = this.destination;
  30. destination.add(this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay, new ObserveOnMessage(notification, this.destination)));
  31. }
  32. _next(value) {
  33. this.scheduleMessage(Notification.createNext(value));
  34. }
  35. _error(err) {
  36. this.scheduleMessage(Notification.createError(err));
  37. this.unsubscribe();
  38. }
  39. _complete() {
  40. this.scheduleMessage(Notification.createComplete());
  41. this.unsubscribe();
  42. }
  43. }
  44. export class ObserveOnMessage {
  45. constructor(notification, destination) {
  46. this.notification = notification;
  47. this.destination = destination;
  48. }
  49. }
  50. //# sourceMappingURL=observeOn.js.map