Subject.js 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. import { Observable } from './Observable';
  2. import { Subscriber } from './Subscriber';
  3. import { Subscription } from './Subscription';
  4. import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError';
  5. import { SubjectSubscription } from './SubjectSubscription';
  6. import { rxSubscriber as rxSubscriberSymbol } from '../internal/symbol/rxSubscriber';
  7. export class SubjectSubscriber extends Subscriber {
  8. constructor(destination) {
  9. super(destination);
  10. this.destination = destination;
  11. }
  12. }
  13. export class Subject extends Observable {
  14. constructor() {
  15. super();
  16. this.observers = [];
  17. this.closed = false;
  18. this.isStopped = false;
  19. this.hasError = false;
  20. this.thrownError = null;
  21. }
  22. [rxSubscriberSymbol]() {
  23. return new SubjectSubscriber(this);
  24. }
  25. lift(operator) {
  26. const subject = new AnonymousSubject(this, this);
  27. subject.operator = operator;
  28. return subject;
  29. }
  30. next(value) {
  31. if (this.closed) {
  32. throw new ObjectUnsubscribedError();
  33. }
  34. if (!this.isStopped) {
  35. const { observers } = this;
  36. const len = observers.length;
  37. const copy = observers.slice();
  38. for (let i = 0; i < len; i++) {
  39. copy[i].next(value);
  40. }
  41. }
  42. }
  43. error(err) {
  44. if (this.closed) {
  45. throw new ObjectUnsubscribedError();
  46. }
  47. this.hasError = true;
  48. this.thrownError = err;
  49. this.isStopped = true;
  50. const { observers } = this;
  51. const len = observers.length;
  52. const copy = observers.slice();
  53. for (let i = 0; i < len; i++) {
  54. copy[i].error(err);
  55. }
  56. this.observers.length = 0;
  57. }
  58. complete() {
  59. if (this.closed) {
  60. throw new ObjectUnsubscribedError();
  61. }
  62. this.isStopped = true;
  63. const { observers } = this;
  64. const len = observers.length;
  65. const copy = observers.slice();
  66. for (let i = 0; i < len; i++) {
  67. copy[i].complete();
  68. }
  69. this.observers.length = 0;
  70. }
  71. unsubscribe() {
  72. this.isStopped = true;
  73. this.closed = true;
  74. this.observers = null;
  75. }
  76. _trySubscribe(subscriber) {
  77. if (this.closed) {
  78. throw new ObjectUnsubscribedError();
  79. }
  80. else {
  81. return super._trySubscribe(subscriber);
  82. }
  83. }
  84. _subscribe(subscriber) {
  85. if (this.closed) {
  86. throw new ObjectUnsubscribedError();
  87. }
  88. else if (this.hasError) {
  89. subscriber.error(this.thrownError);
  90. return Subscription.EMPTY;
  91. }
  92. else if (this.isStopped) {
  93. subscriber.complete();
  94. return Subscription.EMPTY;
  95. }
  96. else {
  97. this.observers.push(subscriber);
  98. return new SubjectSubscription(this, subscriber);
  99. }
  100. }
  101. asObservable() {
  102. const observable = new Observable();
  103. observable.source = this;
  104. return observable;
  105. }
  106. }
  107. Subject.create = (destination, source) => {
  108. return new AnonymousSubject(destination, source);
  109. };
  110. export class AnonymousSubject extends Subject {
  111. constructor(destination, source) {
  112. super();
  113. this.destination = destination;
  114. this.source = source;
  115. }
  116. next(value) {
  117. const { destination } = this;
  118. if (destination && destination.next) {
  119. destination.next(value);
  120. }
  121. }
  122. error(err) {
  123. const { destination } = this;
  124. if (destination && destination.error) {
  125. this.destination.error(err);
  126. }
  127. }
  128. complete() {
  129. const { destination } = this;
  130. if (destination && destination.complete) {
  131. this.destination.complete();
  132. }
  133. }
  134. _subscribe(subscriber) {
  135. const { source } = this;
  136. if (source) {
  137. return this.source.subscribe(subscriber);
  138. }
  139. else {
  140. return Subscription.EMPTY;
  141. }
  142. }
  143. }
  144. //# sourceMappingURL=Subject.js.map