take.js 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. import { Subscriber } from '../Subscriber';
  2. import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError';
  3. import { EmptyObservable } from '../observable/EmptyObservable';
  4. /**
  5. * Emits only the first `count` values emitted by the source Observable.
  6. *
  7. * <span class="informal">Takes the first `count` values from the source, then
  8. * completes.</span>
  9. *
  10. * <img src="./img/take.png" width="100%">
  11. *
  12. * `take` returns an Observable that emits only the first `count` values emitted
  13. * by the source Observable. If the source emits fewer than `count` values then
  14. * all of its values are emitted. After that, it completes, regardless if the
  15. * source completes.
  16. *
  17. * @example <caption>Take the first 5 seconds of an infinite 1-second interval Observable</caption>
  18. * var interval = Rx.Observable.interval(1000);
  19. * var five = interval.take(5);
  20. * five.subscribe(x => console.log(x));
  21. *
  22. * @see {@link takeLast}
  23. * @see {@link takeUntil}
  24. * @see {@link takeWhile}
  25. * @see {@link skip}
  26. *
  27. * @throws {ArgumentOutOfRangeError} When using `take(i)`, it delivers an
  28. * ArgumentOutOrRangeError to the Observer's `error` callback if `i < 0`.
  29. *
  30. * @param {number} count The maximum number of `next` values to emit.
  31. * @return {Observable<T>} An Observable that emits only the first `count`
  32. * values emitted by the source Observable, or all of the values from the source
  33. * if the source emits fewer than `count` values.
  34. * @method take
  35. * @owner Observable
  36. */
  37. export function take(count) {
  38. return (source) => {
  39. if (count === 0) {
  40. return new EmptyObservable();
  41. }
  42. else {
  43. return source.lift(new TakeOperator(count));
  44. }
  45. };
  46. }
  47. class TakeOperator {
  48. constructor(total) {
  49. this.total = total;
  50. if (this.total < 0) {
  51. throw new ArgumentOutOfRangeError;
  52. }
  53. }
  54. call(subscriber, source) {
  55. return source.subscribe(new TakeSubscriber(subscriber, this.total));
  56. }
  57. }
  58. /**
  59. * We need this JSDoc comment for affecting ESDoc.
  60. * @ignore
  61. * @extends {Ignored}
  62. */
  63. class TakeSubscriber extends Subscriber {
  64. constructor(destination, total) {
  65. super(destination);
  66. this.total = total;
  67. this.count = 0;
  68. }
  69. _next(value) {
  70. const total = this.total;
  71. const count = ++this.count;
  72. if (count <= total) {
  73. this.destination.next(value);
  74. if (count === total) {
  75. this.destination.complete();
  76. this.unsubscribe();
  77. }
  78. }
  79. }
  80. }
  81. //# sourceMappingURL=take.js.map