bufferCount.js 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. import { Subscriber } from '../Subscriber';
  2. export function bufferCount(bufferSize, startBufferEvery = null) {
  3. return function bufferCountOperatorFunction(source) {
  4. return source.lift(new BufferCountOperator(bufferSize, startBufferEvery));
  5. };
  6. }
  7. class BufferCountOperator {
  8. constructor(bufferSize, startBufferEvery) {
  9. this.bufferSize = bufferSize;
  10. this.startBufferEvery = startBufferEvery;
  11. if (!startBufferEvery || bufferSize === startBufferEvery) {
  12. this.subscriberClass = BufferCountSubscriber;
  13. }
  14. else {
  15. this.subscriberClass = BufferSkipCountSubscriber;
  16. }
  17. }
  18. call(subscriber, source) {
  19. return source.subscribe(new this.subscriberClass(subscriber, this.bufferSize, this.startBufferEvery));
  20. }
  21. }
  22. class BufferCountSubscriber extends Subscriber {
  23. constructor(destination, bufferSize) {
  24. super(destination);
  25. this.bufferSize = bufferSize;
  26. this.buffer = [];
  27. }
  28. _next(value) {
  29. const buffer = this.buffer;
  30. buffer.push(value);
  31. if (buffer.length == this.bufferSize) {
  32. this.destination.next(buffer);
  33. this.buffer = [];
  34. }
  35. }
  36. _complete() {
  37. const buffer = this.buffer;
  38. if (buffer.length > 0) {
  39. this.destination.next(buffer);
  40. }
  41. super._complete();
  42. }
  43. }
  44. class BufferSkipCountSubscriber extends Subscriber {
  45. constructor(destination, bufferSize, startBufferEvery) {
  46. super(destination);
  47. this.bufferSize = bufferSize;
  48. this.startBufferEvery = startBufferEvery;
  49. this.buffers = [];
  50. this.count = 0;
  51. }
  52. _next(value) {
  53. const { bufferSize, startBufferEvery, buffers, count } = this;
  54. this.count++;
  55. if (count % startBufferEvery === 0) {
  56. buffers.push([]);
  57. }
  58. for (let i = buffers.length; i--;) {
  59. const buffer = buffers[i];
  60. buffer.push(value);
  61. if (buffer.length === bufferSize) {
  62. buffers.splice(i, 1);
  63. this.destination.next(buffer);
  64. }
  65. }
  66. }
  67. _complete() {
  68. const { buffers, destination } = this;
  69. while (buffers.length > 0) {
  70. let buffer = buffers.shift();
  71. if (buffer.length > 0) {
  72. destination.next(buffer);
  73. }
  74. }
  75. super._complete();
  76. }
  77. }
  78. //# sourceMappingURL=bufferCount.js.map