bufferToggle.js 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. import { Subscription } from '../Subscription';
  2. import { subscribeToResult } from '../util/subscribeToResult';
  3. import { OuterSubscriber } from '../OuterSubscriber';
  /**
   * Creates an operator function that lifts a source through a
   * `BufferToggleOperator` configured with the given arguments.
   *
   * @param {*} openings - Passed unchanged to `BufferToggleOperator`; the
   *   subscriber subscribes to it via `subscribeToResult`.
   * @param {Function} closingSelector - Passed unchanged to
   *   `BufferToggleOperator`; invoked with each value emitted by `openings`.
   * @returns {Function} A function taking `source` and returning
   *   `source.lift(...)` with a freshly constructed operator.
   */
  export function bufferToggle(openings, closingSelector) {
    // Kept as a named function so the returned function's `name` is unchanged.
    function bufferToggleOperatorFunction(source) {
      // A new operator instance is built every time the function is applied.
      const operator = new BufferToggleOperator(openings, closingSelector);
      return source.lift(operator);
    }
    return bufferToggleOperatorFunction;
  }
  /**
   * Operator object handed to `source.lift`. It stores the `openings` and
   * `closingSelector` arguments and, when called, subscribes a new
   * `BufferToggleSubscriber` to the source.
   */
  class BufferToggleOperator {
    /**
     * @param {*} openings - Stored and forwarded to `BufferToggleSubscriber`.
     * @param {Function} closingSelector - Stored and forwarded to
     *   `BufferToggleSubscriber`.
     */
    constructor(openings, closingSelector) {
      this.openings = openings;
      this.closingSelector = closingSelector;
    }

    /**
     * Subscribes a new `BufferToggleSubscriber` wrapping `subscriber` to `source`.
     *
     * @param {*} subscriber - Downstream subscriber passed as the destination.
     * @param {*} source - Object whose `subscribe` method is invoked.
     * @returns {*} Whatever `source.subscribe` returns.
     */
    call(subscriber, source) {
      const { openings, closingSelector } = this;
      const toggleSubscriber = new BufferToggleSubscriber(
        subscriber,
        openings,
        closingSelector,
      );
      return source.subscribe(toggleSubscriber);
    }
  }
  /**
   * Subscriber that collects source values into zero or more concurrently
   * open buffers. A buffer is opened for each value emitted by `openings`
   * and closed (emitted downstream) when the notifier returned by
   * `closingSelector` emits or completes.
   *
   * Each open buffer is tracked by a "context" object of the shape
   * `{ buffer, subscription }` stored in `this.contexts`.
   */
  class BufferToggleSubscriber extends OuterSubscriber {
    /**
     * @param {*} destination - Downstream subscriber receiving the buffers.
     * @param {*} openings - Subscribed to through `subscribeToResult`; each
     *   value it emits opens a new buffer.
     * @param {Function} closingSelector - Called with each opening value; its
     *   return value (if truthy) is subscribed to as the closing notifier.
     */
    constructor(destination, openings, closingSelector) {
      super(destination);
      this.closingSelector = closingSelector;
      this.contexts = [];
      // Subscribed without an outer value, which is how `notifyNext`
      // distinguishes opening notifications from closing ones.
      this.add(subscribeToResult(this, openings));
    }

    /**
     * Appends a source value to every buffer that is currently open.
     *
     * @param {*} value - Value emitted by the source.
     */
    _next(value) {
      const contexts = this.contexts;
      const len = contexts.length;
      for (let i = 0; i < len; i++) {
        contexts[i].buffer.push(value);
      }
    }

    /**
     * Discards all open buffers without emitting them, releases their
     * subscriptions, then forwards the error via the parent class.
     *
     * @param {*} err - Error to forward.
     */
    _error(err) {
      const contexts = this.contexts;
      while (contexts.length > 0) {
        const context = contexts.shift();
        context.subscription.unsubscribe();
        context.buffer = null;
        context.subscription = null;
      }
      this.contexts = null;
      super._error(err);
    }

    /**
     * Emits every still-open buffer downstream (in opening order), releases
     * their subscriptions, then forwards completion via the parent class.
     */
    _complete() {
      const contexts = this.contexts;
      while (contexts.length > 0) {
        const context = contexts.shift();
        this.destination.next(context.buffer);
        context.subscription.unsubscribe();
        context.buffer = null;
        context.subscription = null;
      }
      this.contexts = null;
      super._complete();
    }

    /**
     * Handles a value from an inner subscription.
     *
     * @param {*} outerValue - The buffer context for closing notifiers; falsy
     *   for the `openings` subscription (which is created without one).
     * @param {*} innerValue - Value emitted by the inner source; used as the
     *   opening value when `outerValue` is falsy.
     */
    notifyNext(outerValue, innerValue) {
      if (outerValue) {
        this.closeBuffer(outerValue);
      } else {
        this.openBuffer(innerValue);
      }
    }

    /**
     * Closes the buffer associated with an inner subscription that completed.
     *
     * @param {*} innerSub - Inner subscription; its `context` property is set
     *   by `trySubscribe` and may be undefined (in which case nothing happens).
     */
    notifyComplete(innerSub) {
      this.closeBuffer(innerSub.context);
    }

    /**
     * Opens a new buffer for an opening value. Any exception thrown while
     * obtaining or subscribing to the closing notifier is routed to `_error`.
     *
     * @param {*} value - Value emitted by `openings`.
     */
    openBuffer(value) {
      try {
        const closingSelector = this.closingSelector;
        const closingNotifier = closingSelector.call(this, value);
        if (closingNotifier) {
          this.trySubscribe(closingNotifier);
        }
      }
      catch (err) {
        this._error(err);
      }
    }

    /**
     * Emits the buffer of `context` downstream and stops tracking it.
     *
     * Does nothing when the subscriber has already terminated, when no
     * context is given, or when the context is no longer tracked. The last
     * case matters when a closing notifier emits synchronously while it is
     * being subscribed: `notifyNext` closes the context and `trySubscribe`
     * then asks to close it again. Without the guard the buffer would be
     * emitted twice and `splice(-1, 1)` would drop an unrelated open buffer.
     *
     * @param {*} context - Buffer context `{ buffer, subscription }`.
     */
    closeBuffer(context) {
      const contexts = this.contexts;
      if (!contexts || !context || contexts.indexOf(context) === -1) {
        return;
      }
      const { buffer, subscription } = context;
      this.destination.next(buffer);
      // Look the index up after emitting, as in the original ordering, so a
      // re-entrant change to `contexts` during `next` cannot leave it stale.
      const index = contexts.indexOf(context);
      if (index !== -1) {
        contexts.splice(index, 1);
      }
      this.remove(subscription);
      subscription.unsubscribe();
    }

    /**
     * Registers a new buffer context and subscribes to its closing notifier.
     * If the notifier subscription is missing or already closed, the buffer
     * is closed immediately.
     *
     * @param {*} closingNotifier - Value returned by `closingSelector`.
     */
    trySubscribe(closingNotifier) {
      const contexts = this.contexts;
      const buffer = [];
      const subscription = new Subscription();
      const context = { buffer, subscription };
      contexts.push(context);
      const innerSubscription = subscribeToResult(this, closingNotifier, context);
      if (!innerSubscription || innerSubscription.closed) {
        this.closeBuffer(context);
      }
      else {
        // Lets `notifyComplete` find the buffer that belongs to this notifier.
        innerSubscription.context = context;
        this.add(innerSubscription);
        subscription.add(innerSubscription);
      }
    }
  }
  100. //# sourceMappingURL=bufferToggle.js.map