Subject.js 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. import { Observable } from './Observable';
  2. import { Subscriber } from './Subscriber';
  3. import { Subscription } from './Subscription';
  4. import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError';
  5. import { SubjectSubscription } from './SubjectSubscription';
  6. import { rxSubscriber as rxSubscriberSymbol } from './symbol/rxSubscriber';
  /**
   * Subscriber subclass that hands `destination` to the base constructor and
   * then stores that same value on the instance as `destination`.
   *
   * @class SubjectSubscriber<T>
   */
  export class SubjectSubscriber extends Subscriber {
    /**
     * @param destination Value forwarded to `Subscriber` and kept on `this.destination`.
     */
    constructor(destination) {
      super(destination);
      // Assigned after super() so the instance ends up holding exactly the
      // value that was passed in, whatever the base constructor stored.
      this.destination = destination;
    }
  }
  /**
   * An Observable that is also an observer: every `next`, `error` and
   * `complete` call it receives is forwarded to each subscriber registered at
   * that moment.
   *
   * Once the subject has stopped (through `error()` or `complete()`), further
   * `next`, `error` and `complete` calls are ignored, so the first terminal
   * notification is the one later subscribers observe.
   *
   * @class Subject<T>
   */
  export class Subject extends Observable {
    constructor() {
      super();
      // Subscribers currently registered; replaced by `null` in unsubscribe().
      this.observers = [];
      // Set by unsubscribe(); next/error/complete/subscribe throw afterwards.
      this.closed = false;
      // Set by error(), complete() and unsubscribe().
      this.isStopped = false;
      // Set when error() terminated this subject.
      this.hasError = false;
      // The value given to the terminating error() call, otherwise null.
      this.thrownError = null;
    }

    /**
     * @returns A new SubjectSubscriber whose destination is this subject.
     */
    [rxSubscriberSymbol]() {
      return new SubjectSubscriber(this);
    }

    /**
     * Creates an AnonymousSubject that uses this subject as both destination
     * and source, and attaches `operator` to it.
     *
     * @param operator Value stored on the new subject's `operator` property.
     * @returns The new AnonymousSubject.
     */
    lift(operator) {
      const subject = new AnonymousSubject(this, this);
      subject.operator = operator;
      return subject;
    }

    /**
     * Forwards `value` to every registered observer. Does nothing once the
     * subject has stopped.
     *
     * @param value Value passed to each observer's `next`.
     * @throws {ObjectUnsubscribedError} If the subject has been unsubscribed.
     */
    next(value) {
      if (this.closed) {
        throw new ObjectUnsubscribedError();
      }
      if (this.isStopped) {
        return;
      }
      // Iterate over a snapshot so observers added or removed by a handler do
      // not affect the current round of notifications.
      for (const observer of this.observers.slice()) {
        observer.next(value);
      }
    }

    /**
     * Terminates the subject with `err`, notifies every registered observer
     * and then empties the observer list. Ignored if the subject has already
     * stopped, so an earlier completion or error is never overwritten.
     *
     * @param err Value passed to each observer's `error` and remembered in
     *   `thrownError` for later subscribers.
     * @throws {ObjectUnsubscribedError} If the subject has been unsubscribed.
     */
    error(err) {
      if (this.closed) {
        throw new ObjectUnsubscribedError();
      }
      if (this.isStopped) {
        return;
      }
      this.hasError = true;
      this.thrownError = err;
      this.isStopped = true;
      // Keep a local reference: a handler may call unsubscribe(), which sets
      // `this.observers` to null while we are still notifying.
      const { observers } = this;
      for (const observer of observers.slice()) {
        observer.error(err);
      }
      observers.length = 0;
    }

    /**
     * Terminates the subject normally, notifies every registered observer and
     * then empties the observer list. Ignored if the subject has already
     * stopped.
     *
     * @throws {ObjectUnsubscribedError} If the subject has been unsubscribed.
     */
    complete() {
      if (this.closed) {
        throw new ObjectUnsubscribedError();
      }
      if (this.isStopped) {
        return;
      }
      this.isStopped = true;
      // Local reference for the same reason as in error().
      const { observers } = this;
      for (const observer of observers.slice()) {
        observer.complete();
      }
      observers.length = 0;
    }

    /**
     * Marks the subject as stopped and closed and drops the observer list.
     * Observers are not notified.
     */
    unsubscribe() {
      this.isStopped = true;
      this.closed = true;
      this.observers = null;
    }

    /**
     * Rejects subscription attempts on a closed subject before delegating to
     * the base class.
     *
     * @throws {ObjectUnsubscribedError} If the subject has been unsubscribed.
     */
    _trySubscribe(subscriber) {
      if (this.closed) {
        throw new ObjectUnsubscribedError();
      }
      return super._trySubscribe(subscriber);
    }

    /**
     * Registers `subscriber`, or replays the terminal notification if the
     * subject has already errored or completed.
     *
     * @deprecated internal use only
     * @returns `Subscription.EMPTY` when the subject has already terminated,
     *   otherwise a new SubjectSubscription for this subject and subscriber.
     * @throws {ObjectUnsubscribedError} If the subject has been unsubscribed.
     */
    _subscribe(subscriber) {
      if (this.closed) {
        throw new ObjectUnsubscribedError();
      }
      if (this.hasError) {
        subscriber.error(this.thrownError);
        return Subscription.EMPTY;
      }
      if (this.isStopped) {
        subscriber.complete();
        return Subscription.EMPTY;
      }
      this.observers.push(subscriber);
      return new SubjectSubscription(this, subscriber);
    }

    /**
     * @returns A plain Observable whose `source` is this subject, so the
     *   returned object does not carry the subject's own methods.
     */
    asObservable() {
      const observable = new Observable();
      observable.source = this;
      return observable;
    }
  }
  /**
   * Static factory: builds an AnonymousSubject from the given destination and
   * source, in that order.
   */
  Subject.create = (destination, source) => new AnonymousSubject(destination, source);
  /**
   * A Subject that forwards the notifications it receives to `destination`
   * and delegates subscriptions to `source`.
   *
   * The overrides below neither call the base implementations nor consult the
   * `closed` / `isStopped` flags.
   *
   * @class AnonymousSubject<T>
   */
  export class AnonymousSubject extends Subject {
    /**
     * @param destination Object whose `next`/`error`/`complete` receive notifications.
     * @param source Object whose `subscribe` handles incoming subscribers.
     */
    constructor(destination, source) {
      super();
      this.destination = destination;
      this.source = source;
    }

    /** Passes `value` to `destination.next` when that member is present. */
    next(value) {
      const sink = this.destination;
      if (!sink || !sink.next) {
        return;
      }
      sink.next(value);
    }

    /** Passes `err` to `destination.error` when that member is present. */
    error(err) {
      const sink = this.destination;
      if (!sink || !sink.error) {
        return;
      }
      this.destination.error(err);
    }

    /** Calls `destination.complete` when that member is present. */
    complete() {
      const sink = this.destination;
      if (!sink || !sink.complete) {
        return;
      }
      this.destination.complete();
    }

    /**
     * Subscribes `subscriber` to `source`, or returns `Subscription.EMPTY`
     * when no source is set.
     *
     * @deprecated internal use only
     */
    _subscribe(subscriber) {
      return this.source ? this.source.subscribe(subscriber) : Subscription.EMPTY;
    }
  }
  153. //# sourceMappingURL=Subject.js.map