ReplaySubject.js 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. import { Subject } from './Subject';
  2. import { queue } from './scheduler/queue';
  3. import { Subscription } from './Subscription';
  4. import { ObserveOnSubscriber } from './operators/observeOn';
  5. import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError';
  6. import { SubjectSubscription } from './SubjectSubscription';
  7. /**
  8. * @class ReplaySubject<T>
  9. */
  10. export class ReplaySubject extends Subject {
  11. constructor(bufferSize = Number.POSITIVE_INFINITY, windowTime = Number.POSITIVE_INFINITY, scheduler) {
  12. super();
  13. this.scheduler = scheduler;
  14. this._events = [];
  15. this._bufferSize = bufferSize < 1 ? 1 : bufferSize;
  16. this._windowTime = windowTime < 1 ? 1 : windowTime;
  17. }
  18. next(value) {
  19. const now = this._getNow();
  20. this._events.push(new ReplayEvent(now, value));
  21. this._trimBufferThenGetEvents();
  22. super.next(value);
  23. }
  24. /** @deprecated internal use only */ _subscribe(subscriber) {
  25. const _events = this._trimBufferThenGetEvents();
  26. const scheduler = this.scheduler;
  27. let subscription;
  28. if (this.closed) {
  29. throw new ObjectUnsubscribedError();
  30. }
  31. else if (this.hasError) {
  32. subscription = Subscription.EMPTY;
  33. }
  34. else if (this.isStopped) {
  35. subscription = Subscription.EMPTY;
  36. }
  37. else {
  38. this.observers.push(subscriber);
  39. subscription = new SubjectSubscription(this, subscriber);
  40. }
  41. if (scheduler) {
  42. subscriber.add(subscriber = new ObserveOnSubscriber(subscriber, scheduler));
  43. }
  44. const len = _events.length;
  45. for (let i = 0; i < len && !subscriber.closed; i++) {
  46. subscriber.next(_events[i].value);
  47. }
  48. if (this.hasError) {
  49. subscriber.error(this.thrownError);
  50. }
  51. else if (this.isStopped) {
  52. subscriber.complete();
  53. }
  54. return subscription;
  55. }
  56. _getNow() {
  57. return (this.scheduler || queue).now();
  58. }
  59. _trimBufferThenGetEvents() {
  60. const now = this._getNow();
  61. const _bufferSize = this._bufferSize;
  62. const _windowTime = this._windowTime;
  63. const _events = this._events;
  64. let eventsCount = _events.length;
  65. let spliceCount = 0;
  66. // Trim events that fall out of the time window.
  67. // Start at the front of the list. Break early once
  68. // we encounter an event that falls within the window.
  69. while (spliceCount < eventsCount) {
  70. if ((now - _events[spliceCount].time) < _windowTime) {
  71. break;
  72. }
  73. spliceCount++;
  74. }
  75. if (eventsCount > _bufferSize) {
  76. spliceCount = Math.max(spliceCount, eventsCount - _bufferSize);
  77. }
  78. if (spliceCount > 0) {
  79. _events.splice(0, spliceCount);
  80. }
  81. return _events;
  82. }
  83. }
  84. class ReplayEvent {
  85. constructor(time, value) {
  86. this.time = time;
  87. this.value = value;
  88. }
  89. }
  90. //# sourceMappingURL=ReplaySubject.js.map