ConnectableObservable.js 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. import { SubjectSubscriber } from '../Subject';
  2. import { Observable } from '../Observable';
  3. import { Subscriber } from '../Subscriber';
  4. import { Subscription } from '../Subscription';
  5. import { refCount as higherOrderRefCount } from '../operators/refCount';
  6. /**
  7. * @class ConnectableObservable<T>
  8. */
  9. export class ConnectableObservable extends Observable {
  10. constructor(/** @deprecated internal use only */ source,
  11. /** @deprecated internal use only */ subjectFactory) {
  12. super();
  13. this.source = source;
  14. this.subjectFactory = subjectFactory;
  15. /** @deprecated internal use only */ this._refCount = 0;
  16. this._isComplete = false;
  17. }
  18. /** @deprecated internal use only */ _subscribe(subscriber) {
  19. return this.getSubject().subscribe(subscriber);
  20. }
  21. /** @deprecated internal use only */ getSubject() {
  22. const subject = this._subject;
  23. if (!subject || subject.isStopped) {
  24. this._subject = this.subjectFactory();
  25. }
  26. return this._subject;
  27. }
  28. connect() {
  29. let connection = this._connection;
  30. if (!connection) {
  31. this._isComplete = false;
  32. connection = this._connection = new Subscription();
  33. connection.add(this.source
  34. .subscribe(new ConnectableSubscriber(this.getSubject(), this)));
  35. if (connection.closed) {
  36. this._connection = null;
  37. connection = Subscription.EMPTY;
  38. }
  39. else {
  40. this._connection = connection;
  41. }
  42. }
  43. return connection;
  44. }
  45. refCount() {
  46. return higherOrderRefCount()(this);
  47. }
  48. }
  49. const connectableProto = ConnectableObservable.prototype;
  50. export const connectableObservableDescriptor = {
  51. operator: { value: null },
  52. _refCount: { value: 0, writable: true },
  53. _subject: { value: null, writable: true },
  54. _connection: { value: null, writable: true },
  55. _subscribe: { value: connectableProto._subscribe },
  56. _isComplete: { value: connectableProto._isComplete, writable: true },
  57. getSubject: { value: connectableProto.getSubject },
  58. connect: { value: connectableProto.connect },
  59. refCount: { value: connectableProto.refCount }
  60. };
  61. class ConnectableSubscriber extends SubjectSubscriber {
  62. constructor(destination, connectable) {
  63. super(destination);
  64. this.connectable = connectable;
  65. }
  66. _error(err) {
  67. this._unsubscribe();
  68. super._error(err);
  69. }
  70. _complete() {
  71. this.connectable._isComplete = true;
  72. this._unsubscribe();
  73. super._complete();
  74. }
  75. /** @deprecated internal use only */ _unsubscribe() {
  76. const connectable = this.connectable;
  77. if (connectable) {
  78. this.connectable = null;
  79. const connection = connectable._connection;
  80. connectable._refCount = 0;
  81. connectable._subject = null;
  82. connectable._connection = null;
  83. if (connection) {
  84. connection.unsubscribe();
  85. }
  86. }
  87. }
  88. }
  89. class RefCountOperator {
  90. constructor(connectable) {
  91. this.connectable = connectable;
  92. }
  93. call(subscriber, source) {
  94. const { connectable } = this;
  95. connectable._refCount++;
  96. const refCounter = new RefCountSubscriber(subscriber, connectable);
  97. const subscription = source.subscribe(refCounter);
  98. if (!refCounter.closed) {
  99. refCounter.connection = connectable.connect();
  100. }
  101. return subscription;
  102. }
  103. }
  104. class RefCountSubscriber extends Subscriber {
  105. constructor(destination, connectable) {
  106. super(destination);
  107. this.connectable = connectable;
  108. }
  109. /** @deprecated internal use only */ _unsubscribe() {
  110. const { connectable } = this;
  111. if (!connectable) {
  112. this.connection = null;
  113. return;
  114. }
  115. this.connectable = null;
  116. const refCount = connectable._refCount;
  117. if (refCount <= 0) {
  118. this.connection = null;
  119. return;
  120. }
  121. connectable._refCount = refCount - 1;
  122. if (refCount > 1) {
  123. this.connection = null;
  124. return;
  125. }
  126. ///
  127. // Compare the local RefCountSubscriber's connection Subscription to the
  128. // connection Subscription on the shared ConnectableObservable. In cases
  129. // where the ConnectableObservable source synchronously emits values, and
  130. // the RefCountSubscriber's downstream Observers synchronously unsubscribe,
  131. // execution continues to here before the RefCountOperator has a chance to
  132. // supply the RefCountSubscriber with the shared connection Subscription.
  133. // For example:
  134. // ```
  135. // Observable.range(0, 10)
  136. // .publish()
  137. // .refCount()
  138. // .take(5)
  139. // .subscribe();
  140. // ```
  141. // In order to account for this case, RefCountSubscriber should only dispose
  142. // the ConnectableObservable's shared connection Subscription if the
  143. // connection Subscription exists, *and* either:
  144. // a. RefCountSubscriber doesn't have a reference to the shared connection
  145. // Subscription yet, or,
  146. // b. RefCountSubscriber's connection Subscription reference is identical
  147. // to the shared connection Subscription
  148. ///
  149. const { connection } = this;
  150. const sharedConnection = connectable._connection;
  151. this.connection = null;
  152. if (sharedConnection && (!connection || sharedConnection === connection)) {
  153. sharedConnection.unsubscribe();
  154. }
  155. }
  156. }
  157. //# sourceMappingURL=ConnectableObservable.js.map