takeWhile.js 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. import { Subscriber } from '../Subscriber';
  /**
   * Emits values emitted by the source Observable so long as each value satisfies
   * the given `predicate`, and then completes as soon as this `predicate` is not
   * satisfied.
   *
   * <span class="informal">Takes values from the source only while they pass the
   * condition given. When the first value does not satisfy, it completes.</span>
   *
   * <img src="./img/takeWhile.png" width="100%">
   *
   * `takeWhile(predicate)` does not subscribe to anything by itself: it returns a
   * function that, when given a source Observable, calls `source.lift(...)` with
   * a `TakeWhileOperator` wrapping `predicate`. Each value emitted on the source
   * is then given to the `predicate` function, which returns a boolean
   * representing a condition to be satisfied by the source values. The output
   * Observable emits the source values until such time as the `predicate`
   * returns a falsy value, at which point the output Observable completes. The
   * value that failed the `predicate` is not emitted.
   *
   * @example <caption>Emit click events only while the clientX property is greater than 200</caption>
   * var clicks = Rx.Observable.fromEvent(document, 'click');
   * var result = takeWhile(ev => ev.clientX > 200)(clicks);
   * result.subscribe(x => console.log(x));
   *
   * @see {@link take}
   * @see {@link takeLast}
   * @see {@link takeUntil}
   * @see {@link skip}
   *
   * @param {function(value: T, index: number): boolean} predicate A function that
   * evaluates a value emitted by the source Observable and returns a boolean.
   * Also takes the (zero-based) index as the second argument.
   * @return {function(source: Observable<T>): Observable<T>} A function that takes
   * a source Observable and returns the result of lifting the take-while
   * operator onto it: an Observable that emits the values from the source so
   * long as each value satisfies the condition defined by the `predicate`, then
   * completes.
   * @method takeWhile
   * @owner Observable
   */
  export function takeWhile(predicate) {
    return (source) => source.lift(new TakeWhileOperator(predicate));
  }
  /**
   * Operator object that `takeWhile` passes to `source.lift(...)`. It only stores
   * the predicate; the actual filtering is done by the `TakeWhileSubscriber` it
   * creates in `call`.
   */
  class TakeWhileOperator {
    /**
     * @param {function(value: T, index: number): boolean} predicate Condition
     * handed on to every `TakeWhileSubscriber` created by this operator.
     */
    constructor(predicate) {
      this.predicate = predicate;
    }

    /**
     * Subscribes to `source` with a new `TakeWhileSubscriber` that wraps
     * `subscriber` and this operator's predicate.
     *
     * @param subscriber The downstream subscriber that receives the taken values.
     * @param source The object to subscribe to; only its `subscribe` method is used.
     * @return Whatever `source.subscribe(...)` returns.
     */
    call(subscriber, source) {
      return source.subscribe(new TakeWhileSubscriber(subscriber, this.predicate));
    }
  }
  /**
   * Subscriber that forwards each incoming value to `destination` while
   * `predicate` returns a truthy result, and completes `destination` the first
   * time it returns a falsy one.
   *
   * We need this JSDoc comment for affecting ESDoc.
   * @ignore
   * @extends {Ignored}
   */
  class TakeWhileSubscriber extends Subscriber {
    /**
     * @param destination The downstream subscriber, passed to the `Subscriber`
     * base constructor.
     * @param {function(value: T, index: number): boolean} predicate Condition
     * evaluated for every incoming value.
     */
    constructor(destination, predicate) {
      super(destination);
      this.predicate = predicate;
      // Zero-based count of values seen so far; passed to the predicate as its
      // second argument and incremented on every call to `_next`.
      this.index = 0;
    }

    /**
     * Evaluates the predicate for `value`. If the predicate throws, the error is
     * sent to `destination.error` and nothing else happens for this value;
     * otherwise the outcome is delegated to `nextOrComplete`.
     *
     * @param value The value received from the source.
     */
    _next(value) {
      const destination = this.destination;
      let result;
      try {
        result = this.predicate(value, this.index++);
      } catch (err) {
        destination.error(err);
        return;
      }
      this.nextOrComplete(value, result);
    }

    /**
     * Emits `value` downstream when `predicateResult` is truthy; otherwise
     * completes the destination without emitting `value`.
     *
     * @param value The value that was tested.
     * @param predicateResult The predicate's return value for `value`.
     */
    nextOrComplete(value, predicateResult) {
      const destination = this.destination;
      // The `if` already coerces to boolean, so no explicit Boolean() is needed.
      if (predicateResult) {
        destination.next(value);
      } else {
        destination.complete();
      }
    }
  }
  82. //# sourceMappingURL=takeWhile.js.map