// groupBy.js (5.0 KB)
  1. import { Subscriber } from '../Subscriber';
  2. import { Subscription } from '../Subscription';
  3. import { Observable } from '../Observable';
  4. import { Subject } from '../Subject';
  /**
   * Creates an operator function that lifts a `GroupByOperator` onto a source.
   *
   * The four callbacks are passed through unchanged to `GroupByOperator`.
   *
   * @param {Function} keySelector - Called with each source value to compute its group key.
   * @param {Function} [elementSelector] - When provided, called with each source value to
   *   compute the element pushed into the group; otherwise the value itself is used.
   * @param {Function} [durationSelector] - When provided, called once per new group with a
   *   grouped observable; its return value is subscribed to.
   * @param {Function} [subjectSelector] - When provided, called with no arguments to create
   *   the subject backing each new group.
   * @returns {Function} A function taking `source` and returning `source.lift(operator)`.
   */
  export function groupBy(keySelector, elementSelector, durationSelector, subjectSelector) {
    return (source) => {
      const operator = new GroupByOperator(
        keySelector,
        elementSelector,
        durationSelector,
        subjectSelector,
      );
      return source.lift(operator);
    };
  }
  /**
   * Operator object passed to `source.lift` by `groupBy`.
   *
   * It only stores the four selector callbacks; the grouping itself is done by the
   * `GroupBySubscriber` that `call` creates.
   */
  class GroupByOperator {
    /**
     * @param {Function} keySelector - Stored as `this.keySelector`.
     * @param {Function} [elementSelector] - Stored as `this.elementSelector`.
     * @param {Function} [durationSelector] - Stored as `this.durationSelector`.
     * @param {Function} [subjectSelector] - Stored as `this.subjectSelector`.
     */
    constructor(keySelector, elementSelector, durationSelector, subjectSelector) {
      // Assigned in the same order as the parameter list.
      Object.assign(this, {
        keySelector,
        elementSelector,
        durationSelector,
        subjectSelector,
      });
    }

    /**
     * Subscribes a new `GroupBySubscriber` (wrapping `subscriber`) to `source`.
     *
     * @param {object} subscriber - Downstream subscriber handed to `GroupBySubscriber`.
     * @param {object} source - Object whose `subscribe` method is invoked.
     * @returns {*} Whatever `source.subscribe` returns.
     */
    call(subscriber, source) {
      const { keySelector, elementSelector, durationSelector, subjectSelector } = this;
      const groupBySubscriber = new GroupBySubscriber(
        subscriber,
        keySelector,
        elementSelector,
        durationSelector,
        subjectSelector,
      );
      return source.subscribe(groupBySubscriber);
    }
  }
  /**
   * Subscriber that routes each incoming value to a per-key group subject.
   *
   * For every key not seen before, a new subject is created, wrapped in a
   * `GroupedObservable`, and emitted to `destination`. Later values with the same key
   * are pushed into that subject.
   *
   * `count` and `attemptedToUnsubscribe` are read and written by
   * `InnerRefCountSubscription`, so that `unsubscribe()` is deferred while `count` is
   * non-zero.
   */
  class GroupBySubscriber extends Subscriber {
    /**
     * @param {object} destination - Downstream subscriber that receives grouped observables.
     * @param {Function} keySelector - Maps a value to its group key.
     * @param {Function} [elementSelector] - Maps a value to the element pushed into its group.
     * @param {Function} [durationSelector] - Called once per new group; its result is subscribed to.
     * @param {Function} [subjectSelector] - Factory for the subject backing each new group.
     */
    constructor(destination, keySelector, elementSelector, durationSelector, subjectSelector) {
      super(destination);
      this.keySelector = keySelector;
      this.elementSelector = elementSelector;
      this.durationSelector = durationSelector;
      this.subjectSelector = subjectSelector;
      // Map of key -> group subject; created lazily by `_group` on the first value.
      this.groups = null;
      // Set when `unsubscribe()` was requested while `count` was still non-zero.
      this.attemptedToUnsubscribe = false;
      // Incremented and decremented by InnerRefCountSubscription.
      this.count = 0;
    }

    /**
     * Computes the key for `value` and hands both to `_group`.
     * An exception thrown by `keySelector` is forwarded to `this.error`.
     */
    _next(value) {
      let key;
      try {
        key = this.keySelector(value);
      }
      catch (err) {
        this.error(err);
        return;
      }
      this._group(value, key);
    }

    /**
     * Pushes `value` (or its selected element) into the group for `key`, creating and
     * emitting the group first if it does not exist yet.
     */
    _group(value, key) {
      let groups = this.groups;
      if (!groups) {
        groups = this.groups = new Map();
      }
      let group = groups.get(key);
      let element;
      if (this.elementSelector) {
        try {
          element = this.elementSelector(value);
        }
        catch (err) {
          this.error(err);
          // Stop here, like the keySelector and durationSelector error paths do.
          // Falling through would create a new group in the map that `_error` just
          // cleared, invoke `durationSelector`, and push `undefined` into a group.
          return;
        }
      }
      else {
        element = value;
      }
      if (!group) {
        group = (this.subjectSelector ? this.subjectSelector() : new Subject());
        groups.set(key, group);
        // The emitted observable carries `this` so its subscriptions are ref-counted.
        const groupedObservable = new GroupedObservable(key, group, this);
        this.destination.next(groupedObservable);
        if (this.durationSelector) {
          let duration;
          try {
            // The duration selector gets a grouped observable without a ref-count parent.
            duration = this.durationSelector(new GroupedObservable(key, group));
          }
          catch (err) {
            this.error(err);
            return;
          }
          this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this)));
        }
      }
      if (!group.closed) {
        group.next(element);
      }
    }

    /** Forwards `err` to every live group, forgets the groups, then errors `destination`. */
    _error(err) {
      const groups = this.groups;
      if (groups) {
        groups.forEach((group) => {
          group.error(err);
        });
        groups.clear();
      }
      this.destination.error(err);
    }

    /** Completes every live group, forgets the groups, then completes `destination`. */
    _complete() {
      const groups = this.groups;
      if (groups) {
        groups.forEach((group) => {
          group.complete();
        });
        groups.clear();
      }
      this.destination.complete();
    }

    /** Removes the group stored under `key`; called by `GroupDurationSubscriber`. */
    removeGroup(key) {
      this.groups.delete(key);
    }

    /**
     * Records the unsubscribe request, but only tears down once `count` is zero.
     * `InnerRefCountSubscription` calls this again when `count` drops to zero.
     */
    unsubscribe() {
      if (!this.closed) {
        this.attemptedToUnsubscribe = true;
        if (this.count === 0) {
          super.unsubscribe();
        }
      }
    }
  }
  /**
   * Subscriber attached to the observable returned by `durationSelector`.
   *
   * It is constructed with the group as its destination (`super(group)`). The first
   * value it receives triggers `this.complete()`, and on unsubscription it asks the
   * parent to drop the group stored under `key`.
   */
  class GroupDurationSubscriber extends Subscriber {
    /**
     * @param {*} key - Key of the group this duration belongs to.
     * @param {object} group - The group subject; passed to the base constructor.
     * @param {object} parent - Owner exposing `removeGroup(key)`.
     */
    constructor(key, group, parent) {
      super(group);
      Object.assign(this, { key, group, parent });
    }

    /** Any value from the duration observable ends the group; the value is ignored. */
    _next(_durationValue) {
      this.complete();
    }

    /** Clears the back-references, then removes the group from the parent (at most once). */
    _unsubscribe() {
      const owner = this.parent;
      const groupKey = this.key;
      this.parent = null;
      this.key = null;
      if (!owner) {
        return;
      }
      owner.removeGroup(groupKey);
    }
  }
  /**
   * Observable view of a single group.
   *
   * Subscribing forwards to `groupSubject`. When a live `refCountSubscription` is
   * present, each subscription also registers an `InnerRefCountSubscription` on it.
   */
  export class GroupedObservable extends Observable {
    /**
     * @param {*} key - The key shared by every value in this group.
     * @param {object} groupSubject - Subject that subscribers are attached to.
     * @param {object} [refCountSubscription] - Optional ref-counted owner; may be omitted.
     */
    constructor(key, groupSubject, refCountSubscription) {
      super();
      Object.assign(this, { key, groupSubject, refCountSubscription });
    }

    /**
     * @param {object} subscriber - Subscriber to attach to `groupSubject`.
     * @returns {Subscription} Composite holding the ref-count entry (if any) and the
     *   subscription to `groupSubject`.
     */
    _subscribe(subscriber) {
      const composite = new Subscription();
      const owner = this.refCountSubscription;
      const subject = this.groupSubject;
      const ownerIsLive = owner && !owner.closed;
      if (ownerIsLive) {
        // Register with the owner before subscribing to the subject.
        composite.add(new InnerRefCountSubscription(owner));
      }
      const inner = subject.subscribe(subscriber);
      composite.add(inner);
      return composite;
    }
  }
  /**
   * Subscription that counts itself against a parent's `count` field.
   *
   * Construction increments `parent.count`. Unsubscribing decrements it and, if the
   * count reaches zero after the parent had already been asked to unsubscribe
   * (`parent.attemptedToUnsubscribe`), unsubscribes the parent.
   */
  class InnerRefCountSubscription extends Subscription {
    /**
     * @param {object} parent - Owner exposing `count`, `closed`,
     *   `attemptedToUnsubscribe` and `unsubscribe()`.
     */
    constructor(parent) {
      super();
      this.parent = parent;
      parent.count++;
    }

    /** Releases this reference; does nothing if the parent or this subscription is closed. */
    unsubscribe() {
      const owner = this.parent;
      if (owner.closed || this.closed) {
        return;
      }
      super.unsubscribe();
      owner.count -= 1;
      const isLastReference = owner.count === 0;
      if (isLastReference && owner.attemptedToUnsubscribe) {
        owner.unsubscribe();
      }
    }
  }
  164. //# sourceMappingURL=groupBy.js.map