subscribeToResult.js 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. import { root } from './root';
  2. import { isArrayLike } from './isArrayLike';
  3. import { isPromise } from './isPromise';
  4. import { isObject } from './isObject';
  5. import { Observable } from '../Observable';
  6. import { iterator as Symbol_iterator } from '../symbol/iterator';
  7. import { InnerSubscriber } from '../InnerSubscriber';
  8. import { observable as Symbol_observable } from '../symbol/observable';
  9. export function subscribeToResult(outerSubscriber, result, outerValue, outerIndex) {
  10. let destination = new InnerSubscriber(outerSubscriber, outerValue, outerIndex);
  11. if (destination.closed) {
  12. return null;
  13. }
  14. if (result instanceof Observable) {
  15. if (result._isScalar) {
  16. destination.next(result.value);
  17. destination.complete();
  18. return null;
  19. }
  20. else {
  21. destination.syncErrorThrowable = true;
  22. return result.subscribe(destination);
  23. }
  24. }
  25. else if (isArrayLike(result)) {
  26. for (let i = 0, len = result.length; i < len && !destination.closed; i++) {
  27. destination.next(result[i]);
  28. }
  29. if (!destination.closed) {
  30. destination.complete();
  31. }
  32. }
  33. else if (isPromise(result)) {
  34. result.then((value) => {
  35. if (!destination.closed) {
  36. destination.next(value);
  37. destination.complete();
  38. }
  39. }, (err) => destination.error(err))
  40. .then(null, (err) => {
  41. // Escaping the Promise trap: globally throw unhandled errors
  42. root.setTimeout(() => { throw err; });
  43. });
  44. return destination;
  45. }
  46. else if (result && typeof result[Symbol_iterator] === 'function') {
  47. const iterator = result[Symbol_iterator]();
  48. do {
  49. let item = iterator.next();
  50. if (item.done) {
  51. destination.complete();
  52. break;
  53. }
  54. destination.next(item.value);
  55. if (destination.closed) {
  56. break;
  57. }
  58. } while (true);
  59. }
  60. else if (result && typeof result[Symbol_observable] === 'function') {
  61. const obs = result[Symbol_observable]();
  62. if (typeof obs.subscribe !== 'function') {
  63. destination.error(new TypeError('Provided object does not correctly implement Symbol.observable'));
  64. }
  65. else {
  66. return obs.subscribe(new InnerSubscriber(outerSubscriber, outerValue, outerIndex));
  67. }
  68. }
  69. else {
  70. const value = isObject(result) ? 'an invalid object' : `'${result}'`;
  71. const msg = `You provided ${value} where a stream was expected.`
  72. + ' You can provide an Observable, Promise, Array, or Iterable.';
  73. destination.error(new TypeError(msg));
  74. }
  75. return null;
  76. }
  77. //# sourceMappingURL=subscribeToResult.js.map