// bufferToggle.js (3.4 KB)
  1. import { Subscription } from '../Subscription';
  2. import { subscribeToResult } from '../util/subscribeToResult';
  3. import { OuterSubscriber } from '../OuterSubscriber';
  4. export function bufferToggle(openings, closingSelector) {
  5. return function bufferToggleOperatorFunction(source) {
  6. return source.lift(new BufferToggleOperator(openings, closingSelector));
  7. };
  8. }
  9. class BufferToggleOperator {
  10. constructor(openings, closingSelector) {
  11. this.openings = openings;
  12. this.closingSelector = closingSelector;
  13. }
  14. call(subscriber, source) {
  15. return source.subscribe(new BufferToggleSubscriber(subscriber, this.openings, this.closingSelector));
  16. }
  17. }
  18. class BufferToggleSubscriber extends OuterSubscriber {
  19. constructor(destination, openings, closingSelector) {
  20. super(destination);
  21. this.openings = openings;
  22. this.closingSelector = closingSelector;
  23. this.contexts = [];
  24. this.add(subscribeToResult(this, openings));
  25. }
  26. _next(value) {
  27. const contexts = this.contexts;
  28. const len = contexts.length;
  29. for (let i = 0; i < len; i++) {
  30. contexts[i].buffer.push(value);
  31. }
  32. }
  33. _error(err) {
  34. const contexts = this.contexts;
  35. while (contexts.length > 0) {
  36. const context = contexts.shift();
  37. context.subscription.unsubscribe();
  38. context.buffer = null;
  39. context.subscription = null;
  40. }
  41. this.contexts = null;
  42. super._error(err);
  43. }
  44. _complete() {
  45. const contexts = this.contexts;
  46. while (contexts.length > 0) {
  47. const context = contexts.shift();
  48. this.destination.next(context.buffer);
  49. context.subscription.unsubscribe();
  50. context.buffer = null;
  51. context.subscription = null;
  52. }
  53. this.contexts = null;
  54. super._complete();
  55. }
  56. notifyNext(outerValue, innerValue, outerIndex, innerIndex, innerSub) {
  57. outerValue ? this.closeBuffer(outerValue) : this.openBuffer(innerValue);
  58. }
  59. notifyComplete(innerSub) {
  60. this.closeBuffer(innerSub.context);
  61. }
  62. openBuffer(value) {
  63. try {
  64. const closingSelector = this.closingSelector;
  65. const closingNotifier = closingSelector.call(this, value);
  66. if (closingNotifier) {
  67. this.trySubscribe(closingNotifier);
  68. }
  69. }
  70. catch (err) {
  71. this._error(err);
  72. }
  73. }
  74. closeBuffer(context) {
  75. const contexts = this.contexts;
  76. if (contexts && context) {
  77. const { buffer, subscription } = context;
  78. this.destination.next(buffer);
  79. contexts.splice(contexts.indexOf(context), 1);
  80. this.remove(subscription);
  81. subscription.unsubscribe();
  82. }
  83. }
  84. trySubscribe(closingNotifier) {
  85. const contexts = this.contexts;
  86. const buffer = [];
  87. const subscription = new Subscription();
  88. const context = { buffer, subscription };
  89. contexts.push(context);
  90. const innerSubscription = subscribeToResult(this, closingNotifier, context);
  91. if (!innerSubscription || innerSubscription.closed) {
  92. this.closeBuffer(context);
  93. }
  94. else {
  95. innerSubscription.context = context;
  96. this.add(innerSubscription);
  97. subscription.add(innerSubscription);
  98. }
  99. }
  100. }
  101. //# sourceMappingURL=bufferToggle.js.map