groupBy.js 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. import { Subscriber } from '../Subscriber';
  2. import { Subscription } from '../Subscription';
  3. import { Observable } from '../Observable';
  4. import { Subject } from '../Subject';
  5. import { Map } from '../util/Map';
  6. import { FastMap } from '../util/FastMap';
  7. /* tslint:enable:max-line-length */
  8. /**
  9. * Groups the items emitted by an Observable according to a specified criterion,
  10. * and emits these grouped items as `GroupedObservables`, one
  11. * {@link GroupedObservable} per group.
  12. *
  13. * <img src="./img/groupBy.png" width="100%">
  14. *
  15. * @example <caption>Group objects by id and return as array</caption>
  16. * Observable.of<Obj>({id: 1, name: 'aze1'},
  17. * {id: 2, name: 'sf2'},
  18. * {id: 2, name: 'dg2'},
  19. * {id: 1, name: 'erg1'},
  20. * {id: 1, name: 'df1'},
  21. * {id: 2, name: 'sfqfb2'},
  22. * {id: 3, name: 'qfs3'},
  23. * {id: 2, name: 'qsgqsfg2'}
  24. * )
  25. * .groupBy(p => p.id)
  26. * .flatMap( (group$) => group$.reduce((acc, cur) => [...acc, cur], []))
  27. * .subscribe(p => console.log(p));
  28. *
  29. * // displays:
  30. * // [ { id: 1, name: 'aze1' },
  31. * // { id: 1, name: 'erg1' },
  32. * // { id: 1, name: 'df1' } ]
  33. * //
  34. * // [ { id: 2, name: 'sf2' },
  35. * // { id: 2, name: 'dg2' },
  36. * // { id: 2, name: 'sfqfb2' },
  37. * // { id: 2, name: 'qsgqsfg2' } ]
  38. * //
  39. * // [ { id: 3, name: 'qfs3' } ]
  40. *
  41. * @example <caption>Pivot data on the id field</caption>
  42. * Observable.of<Obj>({id: 1, name: 'aze1'},
  43. * {id: 2, name: 'sf2'},
  44. * {id: 2, name: 'dg2'},
  45. * {id: 1, name: 'erg1'},
  46. * {id: 1, name: 'df1'},
  47. * {id: 2, name: 'sfqfb2'},
  48. * {id: 3, name: 'qfs1'},
  49. * {id: 2, name: 'qsgqsfg2'}
  50. * )
  51. * .groupBy(p => p.id, p => p.name)
  52. * .flatMap( (group$) => group$.reduce((acc, cur) => [...acc, cur], ["" + group$.key]))
  53. * .map(arr => ({'id': parseInt(arr[0]), 'values': arr.slice(1)}))
  54. * .subscribe(p => console.log(p));
  55. *
  56. * // displays:
  57. * // { id: 1, values: [ 'aze1', 'erg1', 'df1' ] }
  58. * // { id: 2, values: [ 'sf2', 'dg2', 'sfqfb2', 'qsgqsfg2' ] }
  59. * // { id: 3, values: [ 'qfs1' ] }
  60. *
  61. * @param {function(value: T): K} keySelector A function that extracts the key
  62. * for each item.
  63. * @param {function(value: T): R} [elementSelector] A function that extracts the
  64. * return element for each item.
  65. * @param {function(grouped: GroupedObservable<K,R>): Observable<any>} [durationSelector]
  66. * A function that returns an Observable to determine how long each group should
  67. * exist.
  68. * @return {Observable<GroupedObservable<K,R>>} An Observable that emits
  69. * GroupedObservables, each of which corresponds to a unique key value and each
  70. * of which emits those items from the source Observable that share that key
  71. * value.
  72. * @method groupBy
  73. * @owner Observable
  74. */
  75. export function groupBy(keySelector, elementSelector, durationSelector, subjectSelector) {
  76. return (source) => source.lift(new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector));
  77. }
  78. class GroupByOperator {
  79. constructor(keySelector, elementSelector, durationSelector, subjectSelector) {
  80. this.keySelector = keySelector;
  81. this.elementSelector = elementSelector;
  82. this.durationSelector = durationSelector;
  83. this.subjectSelector = subjectSelector;
  84. }
  85. call(subscriber, source) {
  86. return source.subscribe(new GroupBySubscriber(subscriber, this.keySelector, this.elementSelector, this.durationSelector, this.subjectSelector));
  87. }
  88. }
  89. /**
  90. * We need this JSDoc comment for affecting ESDoc.
  91. * @ignore
  92. * @extends {Ignored}
  93. */
  94. class GroupBySubscriber extends Subscriber {
  95. constructor(destination, keySelector, elementSelector, durationSelector, subjectSelector) {
  96. super(destination);
  97. this.keySelector = keySelector;
  98. this.elementSelector = elementSelector;
  99. this.durationSelector = durationSelector;
  100. this.subjectSelector = subjectSelector;
  101. this.groups = null;
  102. this.attemptedToUnsubscribe = false;
  103. this.count = 0;
  104. }
  105. _next(value) {
  106. let key;
  107. try {
  108. key = this.keySelector(value);
  109. }
  110. catch (err) {
  111. this.error(err);
  112. return;
  113. }
  114. this._group(value, key);
  115. }
  116. _group(value, key) {
  117. let groups = this.groups;
  118. if (!groups) {
  119. groups = this.groups = typeof key === 'string' ? new FastMap() : new Map();
  120. }
  121. let group = groups.get(key);
  122. let element;
  123. if (this.elementSelector) {
  124. try {
  125. element = this.elementSelector(value);
  126. }
  127. catch (err) {
  128. this.error(err);
  129. }
  130. }
  131. else {
  132. element = value;
  133. }
  134. if (!group) {
  135. group = this.subjectSelector ? this.subjectSelector() : new Subject();
  136. groups.set(key, group);
  137. const groupedObservable = new GroupedObservable(key, group, this);
  138. this.destination.next(groupedObservable);
  139. if (this.durationSelector) {
  140. let duration;
  141. try {
  142. duration = this.durationSelector(new GroupedObservable(key, group));
  143. }
  144. catch (err) {
  145. this.error(err);
  146. return;
  147. }
  148. this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this)));
  149. }
  150. }
  151. if (!group.closed) {
  152. group.next(element);
  153. }
  154. }
  155. _error(err) {
  156. const groups = this.groups;
  157. if (groups) {
  158. groups.forEach((group, key) => {
  159. group.error(err);
  160. });
  161. groups.clear();
  162. }
  163. this.destination.error(err);
  164. }
  165. _complete() {
  166. const groups = this.groups;
  167. if (groups) {
  168. groups.forEach((group, key) => {
  169. group.complete();
  170. });
  171. groups.clear();
  172. }
  173. this.destination.complete();
  174. }
  175. removeGroup(key) {
  176. this.groups.delete(key);
  177. }
  178. unsubscribe() {
  179. if (!this.closed) {
  180. this.attemptedToUnsubscribe = true;
  181. if (this.count === 0) {
  182. super.unsubscribe();
  183. }
  184. }
  185. }
  186. }
  187. /**
  188. * We need this JSDoc comment for affecting ESDoc.
  189. * @ignore
  190. * @extends {Ignored}
  191. */
  192. class GroupDurationSubscriber extends Subscriber {
  193. constructor(key, group, parent) {
  194. super(group);
  195. this.key = key;
  196. this.group = group;
  197. this.parent = parent;
  198. }
  199. _next(value) {
  200. this.complete();
  201. }
  202. /** @deprecated internal use only */ _unsubscribe() {
  203. const { parent, key } = this;
  204. this.key = this.parent = null;
  205. if (parent) {
  206. parent.removeGroup(key);
  207. }
  208. }
  209. }
  210. /**
  211. * An Observable representing values belonging to the same group represented by
  212. * a common key. The values emitted by a GroupedObservable come from the source
  213. * Observable. The common key is available as the field `key` on a
  214. * GroupedObservable instance.
  215. *
  216. * @class GroupedObservable<K, T>
  217. */
  218. export class GroupedObservable extends Observable {
  219. constructor(key, groupSubject, refCountSubscription) {
  220. super();
  221. this.key = key;
  222. this.groupSubject = groupSubject;
  223. this.refCountSubscription = refCountSubscription;
  224. }
  225. /** @deprecated internal use only */ _subscribe(subscriber) {
  226. const subscription = new Subscription();
  227. const { refCountSubscription, groupSubject } = this;
  228. if (refCountSubscription && !refCountSubscription.closed) {
  229. subscription.add(new InnerRefCountSubscription(refCountSubscription));
  230. }
  231. subscription.add(groupSubject.subscribe(subscriber));
  232. return subscription;
  233. }
  234. }
  235. /**
  236. * We need this JSDoc comment for affecting ESDoc.
  237. * @ignore
  238. * @extends {Ignored}
  239. */
  240. class InnerRefCountSubscription extends Subscription {
  241. constructor(parent) {
  242. super();
  243. this.parent = parent;
  244. parent.count++;
  245. }
  246. unsubscribe() {
  247. const parent = this.parent;
  248. if (!parent.closed && !this.closed) {
  249. super.unsubscribe();
  250. parent.count -= 1;
  251. if (parent.count === 0 && parent.attemptedToUnsubscribe) {
  252. parent.unsubscribe();
  253. }
  254. }
  255. }
  256. }
  257. //# sourceMappingURL=groupBy.js.map