observeOn.js 4.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. import { Subscriber } from '../Subscriber';
  2. import { Notification } from '../Notification';
  /**
   * Re-emits every notification of the source Observable using the given scheduler.
   *
   * <span class="informal">Ensure a specific scheduler is used, from outside of an Observable.</span>
   *
   * `observeOn` takes a scheduler as its first parameter and uses it to reschedule the
   * notifications emitted by the source Observable. This helps when you have no control over
   * the internal scheduler of an Observable but still want to control when its values are
   * delivered.
   *
   * The returned Observable emits the same notifications (nexted values, complete and error
   * events) as the source Observable, only rescheduled with the provided scheduler. This does
   * not replace the source Observable's internal scheduler in any way: the original scheduler
   * is still used, and each notification the source emits is then scheduled a second time,
   * this time with the scheduler passed to `observeOn`.
   *
   * An anti-pattern is calling `observeOn` on an Observable that emits many values
   * synchronously in the hope of splitting those emissions into asynchronous chunks. For that
   * to happen, the scheduler has to be passed to the source Observable directly (usually to
   * the operator that creates it). `observeOn` merely delays notifications a little further so
   * that they are emitted at the expected moments.
   *
   * `observeOn` also accepts a second parameter: the delay, in milliseconds, with which
   * notifications are emitted. The main difference from the {@link delay} operator is that
   * `observeOn` delays all notifications - error notifications included - whereas `delay`
   * passes an error from the source Observable through as soon as it is emitted. In general,
   * prefer `delay` for any kind of delaying of values in a stream, and use `observeOn` to
   * choose which scheduler notifications are emitted on.
   *
   * @example <caption>Ensure values in subscribe are called just before browser repaint.</caption>
   * const intervals = Rx.Observable.interval(10); // Intervals are scheduled
   *                                               // with async scheduler by default...
   *
   * intervals
   * .observeOn(Rx.Scheduler.animationFrame)       // ...but we will observe on animationFrame
   * .subscribe(val => {                           // scheduler to ensure smooth animation.
   *   someDiv.style.height = val + 'px';
   * });
   *
   * @see {@link delay}
   *
   * @param {IScheduler} scheduler Scheduler that will be used to reschedule notifications from source Observable.
   * @param {number} [delay] Number of milliseconds by which every notification should be rescheduled.
   * @return {Observable<T>} Observable that emits the same notifications as the source Observable,
   * but with provided scheduler.
   *
   * @method observeOn
   * @owner Observable
   */
  export function observeOn(scheduler, delay = 0) {
    return function observeOnOperatorFunction(source) {
      // A new operator instance is built for every source this function is applied to.
      const operator = new ObserveOnOperator(scheduler, delay);
      return source.lift(operator);
    };
  }
  /**
   * Operator object passed to `source.lift` by `observeOn`. It stores the scheduler and
   * delay so they can be handed to each `ObserveOnSubscriber` it creates.
   */
  export class ObserveOnOperator {
    /**
     * @param {IScheduler} scheduler Scheduler forwarded to every created subscriber.
     * @param {number} [delay=0] Delay forwarded to every created subscriber.
     */
    constructor(scheduler, delay = 0) {
      Object.assign(this, { scheduler, delay });
    }
    /**
     * Subscribes to `source` through a new `ObserveOnSubscriber` wrapping `subscriber`.
     *
     * @param {Subscriber} subscriber Downstream subscriber to wrap.
     * @param {Observable} source Observable to subscribe to.
     * @return {*} Whatever `source.subscribe` returns.
     */
    call(subscriber, source) {
      const { scheduler, delay } = this;
      const rescheduling = new ObserveOnSubscriber(subscriber, scheduler, delay);
      return source.subscribe(rescheduling);
    }
  }
  /**
   * Subscriber that wraps every incoming next / error / complete signal in a
   * `Notification` and schedules its delivery to the destination on the configured
   * scheduler, using the configured delay.
   *
   * We need this JSDoc comment for affecting ESDoc.
   * @ignore
   * @extends {Ignored}
   */
  export class ObserveOnSubscriber extends Subscriber {
    /**
     * @param {Subscriber} destination Downstream subscriber, passed to the `Subscriber` base class.
     * @param {IScheduler} scheduler Scheduler used to schedule each notification.
     * @param {number} [delay=0] Delay handed to `scheduler.schedule` for each notification.
     */
    constructor(destination, scheduler, delay = 0) {
      super(destination);
      Object.assign(this, { scheduler, delay });
    }
    /**
     * Work function given to `scheduler.schedule`: delivers the message's notification to
     * the message's destination, then calls `unsubscribe()` on `this`.
     *
     * NOTE(review): `this` here is whatever the scheduler binds when it runs the work
     * function - presumably the scheduled action, not the subscriber; confirm against the
     * scheduler implementation.
     *
     * @param {ObserveOnMessage} arg Message carrying `notification` and `destination`.
     */
    static dispatch(arg) {
      const notification = arg.notification;
      const destination = arg.destination;
      notification.observe(destination);
      this.unsubscribe();
    }
    /**
     * Schedules delivery of `notification` and registers the value returned by
     * `scheduler.schedule` with this subscriber through `add`.
     *
     * @param {Notification} notification Notification to deliver to the destination.
     */
    scheduleMessage(notification) {
      const message = new ObserveOnMessage(notification, this.destination);
      const scheduled = this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay, message);
      this.add(scheduled);
    }
    /** Reschedules a nexted value. */
    _next(value) {
      const notification = Notification.createNext(value);
      this.scheduleMessage(notification);
    }
    /** Reschedules an error, so it is delivered through the scheduler like any other notification. */
    _error(err) {
      const notification = Notification.createError(err);
      this.scheduleMessage(notification);
    }
    /** Reschedules the completion signal. */
    _complete() {
      const notification = Notification.createComplete();
      this.scheduleMessage(notification);
    }
  }
  /**
   * State object passed to the scheduled work function: pairs a notification with the
   * destination it should be delivered to.
   */
  export class ObserveOnMessage {
    /**
     * @param {Notification} notification Notification to deliver.
     * @param {Subscriber} destination Receiver of the notification.
     */
    constructor(notification, destination) {
      Object.assign(this, { notification, destination });
    }
  }
  98. //# sourceMappingURL=observeOn.js.map