// forkJoin.js (2.2 KB)
  1. import { Observable } from '../Observable';
  2. import { isArray } from '../util/isArray';
  3. import { map } from '../operators/map';
  4. import { isObject } from '../util/isObject';
  5. import { from } from './from';
  /**
   * Public entry point: normalises the accepted call shapes and hands the
   * resulting list of inputs to `forkJoinInternal`.
   *
   * Call shapes handled here:
   *  - a single array argument: its elements are the inputs;
   *  - a single object whose prototype is exactly `Object.prototype`: its own
   *    enumerable keys name the inputs and are passed along as `keys`;
   *  - a trailing function argument: it is removed from the inputs and applied
   *    (spread) to the emitted values through `map`; if what remains is one
   *    array, that array's elements are the inputs;
   *  - anything else: the arguments themselves are the inputs.
   *
   * @param {...*} sources Inputs in one of the shapes described above.
   * @returns {Observable} The observable produced by `forkJoinInternal`,
   *   optionally piped through `map` when a trailing function was given.
   */
  export function forkJoin(...sources) {
    if (sources.length === 1) {
      const only = sources[0];
      if (isArray(only)) {
        return forkJoinInternal(only, null);
      }
      const isPlainDictionary = isObject(only) && Object.getPrototypeOf(only) === Object.prototype;
      if (isPlainDictionary) {
        const names = Object.keys(only);
        const inputs = names.map(name => only[name]);
        return forkJoinInternal(inputs, names);
      }
    }

    // No trailing function: the arguments are used as they are.
    const lastArgument = sources[sources.length - 1];
    if (typeof lastArgument !== 'function') {
      return forkJoinInternal(sources, null);
    }

    // Trailing function: detach it, then unwrap a lone array argument if present.
    const project = sources.pop();
    const wrapsSingleArray = sources.length === 1 && isArray(sources[0]);
    const inputs = wrapsSingleArray ? sources[0] : sources;
    const joined = forkJoinInternal(inputs, null);
    return joined.pipe(map((args) => project(...args)));
  }
  /**
   * Builds an Observable that subscribes to every entry of `sources` (each one
   * converted with `from`), remembers the most recent value seen at each
   * index, and emits those values once every source has completed.
   *
   * Behaviour on subscription:
   *  - no sources: completes immediately without emitting;
   *  - any source errors: the error is forwarded to the subscriber;
   *  - any source completes without having emitted: the result completes
   *    without emitting;
   *  - all sources complete and each emitted at least once: emits one value
   *    (the array of last values, or an object mapping `keys[i]` to the last
   *    value of `sources[i]` when `keys` is given) and then completes.
   *
   * @param {Array} sources Inputs to subscribe to; each is passed to `from`.
   * @param {Array<string>|null} keys Property names for the result object,
   *   parallel to `sources`, or `null` to emit the values as an array.
   * @returns {Observable} Observable of the joined last values.
   */
  function forkJoinInternal(sources, keys) {
    return new Observable(subscriber => {
      const len = sources.length;
      if (len === 0) {
        subscriber.complete();
        return;
      }
      const values = new Array(len);
      let completed = 0;
      let emitted = 0;

      // Assembles the value handed to the subscriber: the values array itself,
      // or a fresh object keyed by `keys` when keys were supplied.
      const buildResult = () => {
        if (!keys) {
          return values;
        }
        const result = {};
        for (let k = 0; k < keys.length; k++) {
          result[keys[k]] = values[k];
        }
        return result;
      };

      // Stop subscribing as soon as the output is closed: a source that errors
      // or completes empty synchronously already settles the result, so
      // subscribing to the remaining sources would only trigger their side
      // effects for nothing.
      for (let i = 0; i < len && !subscriber.closed; i++) {
        const source = from(sources[i]);
        let hasValue = false;
        subscriber.add(source.subscribe({
          next: value => {
            if (!hasValue) {
              // Count each source once, on its first emission.
              hasValue = true;
              emitted++;
            }
            values[i] = value;
          },
          error: err => subscriber.error(err),
          complete: () => {
            completed++;
            // Finish when every source is done, or right away when this
            // source completed without ever emitting.
            if (completed === len || !hasValue) {
              if (emitted === len) {
                subscriber.next(buildResult());
              }
              subscriber.complete();
            }
          }
        }));
      }
    });
  }
  61. //# sourceMappingURL=forkJoin.js.map