Subject.ts 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. import { Operator } from './Operator';
  2. import { Observer } from './Observer';
  3. import { Observable } from './Observable';
  4. import { Subscriber } from './Subscriber';
  5. import { ISubscription, Subscription, TeardownLogic } from './Subscription';
  6. import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError';
  7. import { SubjectSubscription } from './SubjectSubscription';
  8. import { rxSubscriber as rxSubscriberSymbol } from './symbol/rxSubscriber';
  9. /**
  10. * @class SubjectSubscriber<T>
  11. */
  12. export class SubjectSubscriber<T> extends Subscriber<T> {
  13. constructor(protected destination: Subject<T>) {
  14. super(destination);
  15. }
  16. }
  17. /**
  18. * @class Subject<T>
  19. */
  20. export class Subject<T> extends Observable<T> implements ISubscription {
  21. [rxSubscriberSymbol]() {
  22. return new SubjectSubscriber(this);
  23. }
  24. observers: Observer<T>[] = [];
  25. closed = false;
  26. isStopped = false;
  27. hasError = false;
  28. thrownError: any = null;
  29. constructor() {
  30. super();
  31. }
  32. static create: Function = <T>(destination: Observer<T>, source: Observable<T>): AnonymousSubject<T> => {
  33. return new AnonymousSubject<T>(destination, source);
  34. }
  35. lift<R>(operator: Operator<T, R>): Observable<R> {
  36. const subject = new AnonymousSubject(this, this);
  37. subject.operator = <any>operator;
  38. return <any>subject;
  39. }
  40. next(value?: T) {
  41. if (this.closed) {
  42. throw new ObjectUnsubscribedError();
  43. }
  44. if (!this.isStopped) {
  45. const { observers } = this;
  46. const len = observers.length;
  47. const copy = observers.slice();
  48. for (let i = 0; i < len; i++) {
  49. copy[i].next(value);
  50. }
  51. }
  52. }
  53. error(err: any) {
  54. if (this.closed) {
  55. throw new ObjectUnsubscribedError();
  56. }
  57. this.hasError = true;
  58. this.thrownError = err;
  59. this.isStopped = true;
  60. const { observers } = this;
  61. const len = observers.length;
  62. const copy = observers.slice();
  63. for (let i = 0; i < len; i++) {
  64. copy[i].error(err);
  65. }
  66. this.observers.length = 0;
  67. }
  68. complete() {
  69. if (this.closed) {
  70. throw new ObjectUnsubscribedError();
  71. }
  72. this.isStopped = true;
  73. const { observers } = this;
  74. const len = observers.length;
  75. const copy = observers.slice();
  76. for (let i = 0; i < len; i++) {
  77. copy[i].complete();
  78. }
  79. this.observers.length = 0;
  80. }
  81. unsubscribe() {
  82. this.isStopped = true;
  83. this.closed = true;
  84. this.observers = null;
  85. }
  86. protected _trySubscribe(subscriber: Subscriber<T>): TeardownLogic {
  87. if (this.closed) {
  88. throw new ObjectUnsubscribedError();
  89. } else {
  90. return super._trySubscribe(subscriber);
  91. }
  92. }
  93. /** @deprecated internal use only */ _subscribe(subscriber: Subscriber<T>): Subscription {
  94. if (this.closed) {
  95. throw new ObjectUnsubscribedError();
  96. } else if (this.hasError) {
  97. subscriber.error(this.thrownError);
  98. return Subscription.EMPTY;
  99. } else if (this.isStopped) {
  100. subscriber.complete();
  101. return Subscription.EMPTY;
  102. } else {
  103. this.observers.push(subscriber);
  104. return new SubjectSubscription(this, subscriber);
  105. }
  106. }
  107. asObservable(): Observable<T> {
  108. const observable = new Observable<T>();
  109. (<any>observable).source = this;
  110. return observable;
  111. }
  112. }
  113. /**
  114. * @class AnonymousSubject<T>
  115. */
  116. export class AnonymousSubject<T> extends Subject<T> {
  117. constructor(protected destination?: Observer<T>, source?: Observable<T>) {
  118. super();
  119. this.source = source;
  120. }
  121. next(value: T) {
  122. const { destination } = this;
  123. if (destination && destination.next) {
  124. destination.next(value);
  125. }
  126. }
  127. error(err: any) {
  128. const { destination } = this;
  129. if (destination && destination.error) {
  130. this.destination.error(err);
  131. }
  132. }
  133. complete() {
  134. const { destination } = this;
  135. if (destination && destination.complete) {
  136. this.destination.complete();
  137. }
  138. }
  139. /** @deprecated internal use only */ _subscribe(subscriber: Subscriber<T>): Subscription {
  140. const { source } = this;
  141. if (source) {
  142. return this.source.subscribe(subscriber);
  143. } else {
  144. return Subscription.EMPTY;
  145. }
  146. }
  147. }