123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- import { Subscriber } from '../Subscriber';
- import { Subscription } from '../Subscription';
- import { Observable } from '../Observable';
- import { Subject } from '../Subject';
/**
 * Builds an operator function that, when applied to a source, lifts it with
 * a GroupByOperator configured from the given selectors.
 *
 * @param {Function} keySelector - called with each source value to obtain its group key.
 * @param {Function} [elementSelector] - optional mapping applied to a value before it is pushed into its group.
 * @param {Function} [durationSelector] - optional; called with a new group's GroupedObservable and must return
 *   something with `subscribe`, whose emission closes that group.
 * @param {Function} [subjectSelector] - optional factory for the object backing each group.
 * @returns {Function} a function taking the source and returning `source.lift(...)`.
 */
export function groupBy(keySelector, elementSelector, durationSelector, subjectSelector) {
    return (source) => {
        // A fresh operator is created per application, as in a plain `lift` call.
        const operator = new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector);
        return source.lift(operator);
    };
}
/**
 * Operator object handed to `lift`: stores the four groupBy selectors and,
 * when called, subscribes a GroupBySubscriber to the source.
 */
class GroupByOperator {
    /**
     * @param {Function} keySelector
     * @param {Function} [elementSelector]
     * @param {Function} [durationSelector]
     * @param {Function} [subjectSelector]
     */
    constructor(keySelector, elementSelector, durationSelector, subjectSelector) {
        Object.assign(this, {
            keySelector,
            elementSelector,
            durationSelector,
            subjectSelector,
        });
    }

    /**
     * Subscribes a new GroupBySubscriber (wrapping `subscriber`) to `source`.
     *
     * @param subscriber - downstream subscriber that receives the grouped observables.
     * @param source - object exposing `subscribe`.
     * @returns whatever `source.subscribe` returns.
     */
    call(subscriber, source) {
        const { keySelector, elementSelector, durationSelector, subjectSelector } = this;
        const groupingSubscriber = new GroupBySubscriber(subscriber, keySelector, elementSelector, durationSelector, subjectSelector);
        return source.subscribe(groupingSubscriber);
    }
}
/**
 * Subscriber that routes every source value into a per-key group and emits a
 * GroupedObservable to the destination the first time a key is seen.
 *
 * It also acts as the ref-counted parent of the emitted groups: `count` is
 * adjusted by InnerRefCountSubscription, and `unsubscribe()` is deferred
 * while `count` is non-zero.
 */
class GroupBySubscriber extends Subscriber {
    /**
     * @param destination - downstream subscriber receiving GroupedObservables.
     * @param {Function} keySelector - maps a value to its group key.
     * @param {Function} [elementSelector] - maps a value to the element pushed into the group.
     * @param {Function} [durationSelector] - given a GroupedObservable, returns a notifier
     *   whose subscription is driven by a GroupDurationSubscriber.
     * @param {Function} [subjectSelector] - factory for the object backing each group
     *   (defaults to `new Subject()`).
     */
    constructor(destination, keySelector, elementSelector, durationSelector, subjectSelector) {
        super(destination);
        this.keySelector = keySelector;
        this.elementSelector = elementSelector;
        this.durationSelector = durationSelector;
        this.subjectSelector = subjectSelector;
        // Map of key -> group; created lazily by the first call to _group().
        this.groups = null;
        // Records that unsubscribe() was requested while count was still non-zero.
        this.attemptedToUnsubscribe = false;
        // Reference count maintained externally by InnerRefCountSubscription.
        this.count = 0;
    }

    /**
     * Computes the key for `value` and dispatches it to its group.
     * A throwing keySelector is reported through `this.error`.
     */
    _next(value) {
        let key;
        try {
            key = this.keySelector(value);
        }
        catch (err) {
            this.error(err);
            return;
        }
        this._group(value, key);
    }

    /**
     * Pushes `value` (after the optional elementSelector) into the group for
     * `key`, creating and announcing the group first when the key is new.
     */
    _group(value, key) {
        let groups = this.groups;
        if (!groups) {
            groups = this.groups = new Map();
        }
        let group = groups.get(key);
        let element;
        if (this.elementSelector) {
            try {
                element = this.elementSelector(value);
            }
            catch (err) {
                this.error(err);
                // The failure has been reported; stop here instead of going on to
                // call subjectSelector/durationSelector and register or feed a
                // group with an undefined element.
                return;
            }
        }
        else {
            element = value;
        }
        if (!group) {
            group = (this.subjectSelector ? this.subjectSelector() : new Subject());
            groups.set(key, group);
            // The emitted observable carries `this` so its subscriptions are ref-counted.
            const groupedObservable = new GroupedObservable(key, group, this);
            this.destination.next(groupedObservable);
            if (this.durationSelector) {
                let duration;
                try {
                    // The selector receives a GroupedObservable without a ref-count parent.
                    duration = this.durationSelector(new GroupedObservable(key, group));
                }
                catch (err) {
                    this.error(err);
                    return;
                }
                this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this)));
            }
        }
        if (!group.closed) {
            group.next(element);
        }
    }

    /** Forwards the error to every open group, forgets them, then errors the destination. */
    _error(err) {
        const groups = this.groups;
        if (groups) {
            groups.forEach((group) => {
                group.error(err);
            });
            groups.clear();
        }
        this.destination.error(err);
    }

    /** Completes every open group, forgets them, then completes the destination. */
    _complete() {
        const groups = this.groups;
        if (groups) {
            groups.forEach((group) => {
                group.complete();
            });
            groups.clear();
        }
        this.destination.complete();
    }

    /** Drops the group registered under `key`; called by GroupDurationSubscriber on teardown. */
    removeGroup(key) {
        this.groups.delete(key);
    }

    /**
     * Records the unsubscribe request and only performs the real unsubscribe
     * when `count` is zero; otherwise InnerRefCountSubscription triggers it
     * again once the count drops back to zero.
     */
    unsubscribe() {
        if (!this.closed) {
            this.attemptedToUnsubscribe = true;
            if (this.count === 0) {
                super.unsubscribe();
            }
        }
    }
}
/**
 * Subscriber attached to a group's duration notifier. Its destination is the
 * group itself, and on teardown it asks the parent to forget the group's key.
 */
class GroupDurationSubscriber extends Subscriber {
    /**
     * @param key - key under which the group is registered in the parent.
     * @param group - the group object; also passed to the base class as destination.
     * @param parent - object exposing `removeGroup(key)`.
     */
    constructor(key, group, parent) {
        super(group);
        this.key = key;
        this.group = group;
        this.parent = parent;
    }

    /** Any notification from the duration source completes this subscriber; the value is ignored. */
    _next(value) {
        this.complete();
    }

    /** Detaches from the parent exactly once and removes the group's key from it. */
    _unsubscribe() {
        const owner = this.parent;
        const groupKey = this.key;
        // Clear the references first so a repeated teardown finds no parent.
        this.parent = null;
        this.key = null;
        if (!owner) {
            return;
        }
        owner.removeGroup(groupKey);
    }
}
/**
 * Observable representing one group: exposes the group's `key` and, on
 * subscription, forwards the subscriber to the backing `groupSubject`.
 */
export class GroupedObservable extends Observable {
    /**
     * @param key - key shared by the values of this group.
     * @param groupSubject - object exposing `subscribe` that carries the group's values.
     * @param [refCountSubscription] - optional parent whose `count` tracks active subscriptions.
     */
    constructor(key, groupSubject, refCountSubscription) {
        super();
        this.key = key;
        this.groupSubject = groupSubject;
        this.refCountSubscription = refCountSubscription;
    }

    /**
     * Subscribes `subscriber` to the group subject. When a parent that is not
     * yet closed is present, an InnerRefCountSubscription is added first.
     *
     * @returns {Subscription} a subscription aggregating both teardowns.
     */
    _subscribe(subscriber) {
        const teardown = new Subscription();
        const parentRef = this.refCountSubscription;
        const subject = this.groupSubject;
        const isParentOpen = Boolean(parentRef) && !parentRef.closed;
        if (isParentOpen) {
            teardown.add(new InnerRefCountSubscription(parentRef));
        }
        const inner = subject.subscribe(subscriber);
        teardown.add(inner);
        return teardown;
    }
}
/**
 * Subscription that increments the parent's `count` on creation and
 * decrements it again when unsubscribed.
 */
class InnerRefCountSubscription extends Subscription {
    /**
     * @param parent - object with a numeric `count`, `closed`, `attemptedToUnsubscribe`
     *   and an `unsubscribe()` method.
     */
    constructor(parent) {
        super();
        this.parent = parent;
        parent.count++;
    }

    /**
     * Releases this reference. Does nothing when either side is already
     * closed. If the count reaches zero and the parent had a pending
     * unsubscribe request, the parent is unsubscribed.
     */
    unsubscribe() {
        const owner = this.parent;
        if (owner.closed || this.closed) {
            return;
        }
        super.unsubscribe();
        owner.count -= 1;
        const isLastReference = owner.count === 0;
        if (isLastReference && owner.attemptedToUnsubscribe) {
            owner.unsubscribe();
        }
    }
}
- //# sourceMappingURL=groupBy.js.map
|