exhaustMap.js 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. import { map } from './map';
  2. import { from } from '../observable/from';
  3. import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe';
  /**
   * Builds an operator function that maps each source value through `project`
   * and routes the result through an `ExhaustMapOperator`.
   *
   * @param {Function} project Called as `project(value, index)`; its result is
   *   subscribed to as the inner source.
   * @param {Function} [resultSelector] Optional legacy selector. When truthy,
   *   each inner value is replaced by
   *   `resultSelector(outerValue, innerValue, outerIndex, innerIndex)`.
   * @returns {Function} A function `(source) => result` suitable for
   *   `source.pipe(...)`.
   */
  export function exhaustMap(project, resultSelector) {
    if (resultSelector) {
      // Legacy form: re-enter exhaustMap without a selector and apply the
      // selector to every inner value with `map`.
      return (source) =>
        source.pipe(
          exhaustMap((outerValue, outerIndex) =>
            from(project(outerValue, outerIndex)).pipe(
              map((innerValue, innerIndex) =>
                resultSelector(outerValue, innerValue, outerIndex, innerIndex)
              )
            )
          )
        );
    }
    return (source) => source.lift(new ExhaustMapOperator(project));
  }
  /**
   * Operator object passed to `source.lift()` by `exhaustMap`. It remembers the
   * projection function and, when called, subscribes the source with a new
   * `ExhaustMapSubscriber` wrapping the downstream subscriber.
   */
  class ExhaustMapOperator {
    /**
     * @param {Function} project Projection function forwarded to every
     *   `ExhaustMapSubscriber` this operator creates.
     */
    constructor(project) {
      this.project = project;
    }

    /**
     * @param {*} subscriber Downstream subscriber that receives the results.
     * @param {*} source Upstream source; must expose `subscribe`.
     * @returns {*} Whatever `source.subscribe(...)` returns.
     */
    call(subscriber, source) {
      return source.subscribe(new ExhaustMapSubscriber(subscriber, this.project));
    }
  }
  /**
   * Subscriber that implements the exhaust behaviour: a source value is
   * projected and subscribed to only when no inner subscription is currently
   * flagged as active; source values arriving in the meantime are dropped.
   */
  class ExhaustMapSubscriber extends SimpleOuterSubscriber {
    /**
     * @param {*} destination Downstream subscriber, passed to the base class.
     * @param {Function} project Called as `project(value, index)` to obtain the
     *   inner source for a value.
     */
    constructor(destination, project) {
      super(destination);
      this.project = project;
      // True from the moment a value has been projected until the inner
      // source reports completion via notifyComplete().
      this.hasSubscription = false;
      // True once the outer source has completed.
      this.hasCompleted = false;
      // Counts values handed to `project`; dropped values do not consume an index.
      this.index = 0;
    }

    /**
     * Handles an outer value: it is processed only when no inner subscription
     * is active, otherwise it is silently ignored.
     */
    _next(value) {
      if (!this.hasSubscription) {
        this.tryNext(value);
      }
    }

    /**
     * Runs `project` for the value. A thrown error is forwarded to the
     * destination and no inner subscription is started.
     */
    tryNext(value) {
      let result;
      const index = this.index++;
      try {
        result = this.project(value, index);
      }
      catch (err) {
        this.destination.error(err);
        return;
      }
      // Flag is raised before subscribing so values emitted by the outer
      // source during the inner subscribe call are dropped too.
      this.hasSubscription = true;
      this._innerSub(result);
    }

    /**
     * Subscribes to the projected result and registers the inner subscriber
     * (and the returned subscription, when it is a different object) on the
     * destination.
     */
    _innerSub(result) {
      const innerSubscriber = new SimpleInnerSubscriber(this);
      const destination = this.destination;
      destination.add(innerSubscriber);
      const innerSubscription = innerSubscribe(result, innerSubscriber);
      // Avoid registering the same object twice when innerSubscribe hands
      // back the subscriber it was given.
      if (innerSubscription !== innerSubscriber) {
        destination.add(innerSubscription);
      }
    }

    /**
     * Outer source completed: complete downstream right away unless an inner
     * subscription is still active, in which case notifyComplete() does it.
     */
    _complete() {
      this.hasCompleted = true;
      if (!this.hasSubscription) {
        this.destination.complete();
      }
      // NOTE(review): inner subscriptions were added to the destination in
      // _innerSub(), so presumably they outlive this unsubscribe — confirm
      // against SimpleOuterSubscriber.
      this.unsubscribe();
    }

    /** Forwards a value emitted by the inner source to the destination. */
    notifyNext(innerValue) {
      this.destination.next(innerValue);
    }

    /** Forwards an error raised by the inner source to the destination. */
    notifyError(err) {
      this.destination.error(err);
    }

    /**
     * Inner source completed: accept outer values again and, if the outer
     * source has already completed, complete the destination.
     */
    notifyComplete() {
      this.hasSubscription = false;
      if (this.hasCompleted) {
        this.destination.complete();
      }
    }
  }
  73. //# sourceMappingURL=exhaustMap.js.map