// refCount.js (2.8 KB)
  1. import { Subscriber } from '../Subscriber';
  /**
   * Builds the operator function for `refCount`.
   *
   * @returns {function(*): *} A function that takes `source`, wraps it in a
   *   `RefCountOperator`, and returns whatever `source.lift(...)` returns.
   */
  export function refCount() {
    return function refCountOperatorFunction(source) {
      // The source doubles as the connectable whose subscribers are counted.
      const operator = new RefCountOperator(source);
      return source.lift(operator);
    };
  }
  /**
   * Operator handed to `lift` by `refCount`. On each `call` it bumps the
   * connectable's `_refCount`, subscribes a `RefCountSubscriber`, and then asks
   * the connectable to connect.
   */
  class RefCountOperator {
    /**
     * @param {*} connectable Object exposing `_refCount` and `connect()`
     *   (per the comments in this file, a ConnectableObservable).
     */
    constructor(connectable) {
      this.connectable = connectable;
    }

    /**
     * @param {*} subscriber Downstream destination passed to `RefCountSubscriber`.
     * @param {*} source Object whose `subscribe` is invoked with the new subscriber.
     * @returns {*} The value returned by `source.subscribe(...)`.
     */
    call(subscriber, source) {
      const shared = this.connectable;
      // Count this subscriber before subscribing, so that an unsubscribe that
      // happens synchronously during `subscribe` sees the incremented count.
      shared._refCount++;
      const counter = new RefCountSubscriber(subscriber, shared);
      const subscription = source.subscribe(counter);
      if (counter.closed) {
        // Already closed during subscribe: do not connect.
        return subscription;
      }
      counter.connection = shared.connect();
      return subscription;
    }
  }
  /**
   * Subscriber that decrements the connectable's `_refCount` when it is
   * unsubscribed and, when it was the last counted subscriber, unsubscribes the
   * connectable's shared connection.
   */
  class RefCountSubscriber extends Subscriber {
    /**
     * @param {*} destination Forwarded to the `Subscriber` constructor.
     * @param {*} connectable Object exposing `_refCount` and `_connection`.
     */
    constructor(destination, connectable) {
      super(destination);
      this.connectable = connectable;
    }

    /** @deprecated internal use only */
    _unsubscribe() {
      const shared = this.connectable;
      if (!shared) {
        // Teardown already ran once (connectable was cleared below).
        this.connection = null;
        return;
      }
      this.connectable = null;

      const countBefore = shared._refCount;
      if (countBefore <= 0) {
        // Nothing is counted, so there is nothing to decrement or disconnect.
        this.connection = null;
        return;
      }
      shared._refCount = countBefore - 1;
      if (countBefore > 1) {
        // Other subscribers are still counted; leave the shared connection alone.
        this.connection = null;
        return;
      }

      // This was the last counted subscriber. When the connectable's source
      // emits synchronously and downstream unsubscribes synchronously, e.g.
      //
      //   Observable.range(0, 10)
      //     .publish()
      //     .refCount()
      //     .take(5)
      //     .subscribe();
      //
      // this method runs before RefCountOperator has assigned `connection` on
      // this subscriber. The shared connection is therefore disposed only if it
      // exists and either this subscriber has no connection reference yet, or
      // its reference is the very same shared connection.
      const ownConnection = this.connection;
      const sharedConnection = shared._connection;
      this.connection = null;
      if (!sharedConnection) {
        return;
      }
      if (ownConnection && sharedConnection !== ownConnection) {
        // Holding a different connection than the shared one: not ours to end.
        return;
      }
      sharedConnection.unsubscribe();
    }
  }
  75. //# sourceMappingURL=refCount.js.map