// windowCount.js (2.3 KB)
  1. import { Subscriber } from '../Subscriber';
  2. import { Subject } from '../Subject';
  /**
   * Builds an operator function that lifts a `WindowCountOperator` onto a source.
   *
   * @param {number} windowSize - Forwarded unchanged to `WindowCountOperator`.
   * @param {number} [startWindowEvery=0] - Forwarded unchanged to
   *   `WindowCountOperator`; defaults to 0 when omitted.
   * @returns {Function} A function that takes a source and returns whatever
   *   `source.lift(...)` returns for a freshly created operator.
   */
  export function windowCount(windowSize, startWindowEvery = 0) {
    return function windowCountOperatorFunction(source) {
      // A new operator instance is created on every application to a source.
      const operator = new WindowCountOperator(windowSize, startWindowEvery);
      return source.lift(operator);
    };
  }
  /**
   * Operator object that holds the window settings and, when called,
   * subscribes a `WindowCountSubscriber` to the source.
   */
  class WindowCountOperator {
    /**
     * @param {number} windowSize - Stored as `this.windowSize`.
     * @param {number} startWindowEvery - Stored as `this.startWindowEvery`.
     */
    constructor(windowSize, startWindowEvery) {
      Object.assign(this, { windowSize, startWindowEvery });
    }

    /**
     * Wraps `subscriber` in a `WindowCountSubscriber` configured with the
     * stored settings and subscribes it to `source`.
     *
     * @param {object} subscriber - Downstream subscriber passed to the wrapper.
     * @param {object} source - Object exposing `subscribe(...)`.
     * @returns {*} Whatever `source.subscribe(...)` returns.
     */
    call(subscriber, source) {
      const { windowSize, startWindowEvery } = this;
      const windowSubscriber = new WindowCountSubscriber(subscriber, windowSize, startWindowEvery);
      return source.subscribe(windowSubscriber);
    }
  }
  /**
   * Subscriber that forwards each incoming value to every currently open
   * window (a `Subject`) and opens/closes windows based on a running count.
   *
   * The first window is created and emitted to `destination` in the
   * constructor, before any value arrives.
   */
  class WindowCountSubscriber extends Subscriber {
    /**
     * @param {object} destination - Receives each new window via `next`, and
     *   the terminal `error` / `complete` notifications.
     * @param {number} windowSize - Number of values after which the oldest
     *   open window is completed.
     * @param {number} startWindowEvery - Interval (in values) at which a new
     *   window is opened; values that are not > 0 fall back to `windowSize`.
     */
    constructor(destination, windowSize, startWindowEvery) {
      super(destination);
      this.destination = destination;
      this.windowSize = windowSize;
      this.startWindowEvery = startWindowEvery;
      const firstWindow = new Subject();
      this.windows = [firstWindow];
      this.count = 0;
      destination.next(firstWindow);
    }

    /**
     * Pushes `value` into all open windows, then completes the oldest window
     * and/or opens a new one depending on the running count.
     *
     * `this.closed` is re-checked before every step because any downstream
     * call may unsubscribe this subscriber synchronously.
     */
    _next(value) {
      // Snapshot state up front: `_unsubscribe` nulls `this.windows`.
      const downstream = this.destination;
      const size = this.windowSize;
      const openWindows = this.windows;

      let stride = size;
      if (this.startWindowEvery > 0) {
        stride = this.startWindowEvery;
      }

      // Only windows that were open when the value arrived receive it.
      const openCount = openWindows.length;
      let index = 0;
      while (index < openCount && !this.closed) {
        openWindows[index].next(value);
        index += 1;
      }

      // Non-negative multiples of `stride` mark the point where the oldest
      // window has received `size` values.
      const offset = this.count - size + 1;
      if (offset >= 0 && offset % stride === 0 && !this.closed) {
        const oldest = openWindows.shift();
        oldest.complete();
      }

      this.count += 1;
      if (this.count % stride === 0 && !this.closed) {
        const freshWindow = new Subject();
        openWindows.push(freshWindow);
        downstream.next(freshWindow);
      }
    }

    /**
     * Errors every remaining open window (oldest first), then forwards the
     * error to the destination.
     */
    _error(err) {
      const pending = this.windows;
      if (pending) {
        while (!(pending.length <= 0 || this.closed)) {
          const current = pending.shift();
          current.error(err);
        }
      }
      this.destination.error(err);
    }

    /**
     * Completes every remaining open window (oldest first), then completes
     * the destination.
     */
    _complete() {
      const pending = this.windows;
      if (pending) {
        while (!(pending.length <= 0 || this.closed)) {
          const current = pending.shift();
          current.complete();
        }
      }
      this.destination.complete();
    }

    /** Resets the counter and drops the reference to the window list. */
    _unsubscribe() {
      this.count = 0;
      this.windows = null;
    }
  }
//# sourceMappingURL=windowCount.js.map