takeLast.js 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. import { Subscriber } from '../Subscriber';
  2. import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError';
  3. import { EmptyObservable } from '../observable/EmptyObservable';
  4. /**
  5. * Emits only the last `count` values emitted by the source Observable.
  6. *
  7. * <span class="informal">Remembers the latest `count` values, then emits those
  8. * only when the source completes.</span>
  9. *
  10. * <img src="./img/takeLast.png" width="100%">
  11. *
  12. * `takeLast` returns an Observable that emits at most the last `count` values
  13. * emitted by the source Observable. If the source emits fewer than `count`
  14. * values then all of its values are emitted. This operator must wait until the
  15. * `complete` notification emission from the source in order to emit the `next`
  16. * values on the output Observable, because otherwise it is impossible to know
  17. * whether or not more values will be emitted on the source. For this reason,
  18. * all values are emitted synchronously, followed by the complete notification.
  19. *
  20. * @example <caption>Take the last 3 values of an Observable with many values</caption>
  21. * var many = Rx.Observable.range(1, 100);
  22. * var lastThree = many.takeLast(3);
  23. * lastThree.subscribe(x => console.log(x));
  24. *
  25. * @see {@link take}
  26. * @see {@link takeUntil}
  27. * @see {@link takeWhile}
  28. * @see {@link skip}
  29. *
  30. * @throws {ArgumentOutOfRangeError} When using `takeLast(i)`, it delivers an
  31. * ArgumentOutOrRangeError to the Observer's `error` callback if `i < 0`.
  32. *
  33. * @param {number} count The maximum number of values to emit from the end of
  34. * the sequence of values emitted by the source Observable.
  35. * @return {Observable<T>} An Observable that emits at most the last count
  36. * values emitted by the source Observable.
  37. * @method takeLast
  38. * @owner Observable
  39. */
  40. export function takeLast(count) {
  41. return function takeLastOperatorFunction(source) {
  42. if (count === 0) {
  43. return new EmptyObservable();
  44. }
  45. else {
  46. return source.lift(new TakeLastOperator(count));
  47. }
  48. };
  49. }
  50. class TakeLastOperator {
  51. constructor(total) {
  52. this.total = total;
  53. if (this.total < 0) {
  54. throw new ArgumentOutOfRangeError;
  55. }
  56. }
  57. call(subscriber, source) {
  58. return source.subscribe(new TakeLastSubscriber(subscriber, this.total));
  59. }
  60. }
  61. /**
  62. * We need this JSDoc comment for affecting ESDoc.
  63. * @ignore
  64. * @extends {Ignored}
  65. */
  66. class TakeLastSubscriber extends Subscriber {
  67. constructor(destination, total) {
  68. super(destination);
  69. this.total = total;
  70. this.ring = new Array();
  71. this.count = 0;
  72. }
  73. _next(value) {
  74. const ring = this.ring;
  75. const total = this.total;
  76. const count = this.count++;
  77. if (ring.length < total) {
  78. ring.push(value);
  79. }
  80. else {
  81. const index = count % total;
  82. ring[index] = value;
  83. }
  84. }
  85. _complete() {
  86. const destination = this.destination;
  87. let count = this.count;
  88. if (count > 0) {
  89. const total = this.count >= this.total ? this.total : this.count;
  90. const ring = this.ring;
  91. for (let i = 0; i < total; i++) {
  92. const idx = (count++) % total;
  93. destination.next(ring[idx]);
  94. }
  95. }
  96. destination.complete();
  97. }
  98. }
  99. //# sourceMappingURL=takeLast.js.map