concatenate.js 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. /*
  2. Copyright 2018 Google LLC
  3. Use of this source code is governed by an MIT-style
  4. license that can be found in the LICENSE file or at
  5. https://opensource.org/licenses/MIT.
  6. */
  7. import { logger } from 'workbox-core/_private/logger.js';
  8. import { assert } from 'workbox-core/_private/assert.js';
  9. import { Deferred } from 'workbox-core/_private/Deferred.js';
  10. import './_version.js';
  /**
   * Takes either a Response, a ReadableStream, or a
   * [BodyInit](https://fetch.spec.whatwg.org/#bodyinit) and returns the
   * ReadableStreamReader object associated with it.
   *
   * A Response whose `body` is `null` (for example `new Response(null)`) is
   * treated as an empty source: the returned reader reports `done` on its first
   * read. Opaque responses also have a `null` body, but their content cannot be
   * read at all, so they are rejected with a descriptive error instead of being
   * silently replaced by nothing.
   *
   * @param {module:workbox-streams.StreamSource} source
   * @return {ReadableStreamReader}
   * @throws {TypeError} If `source` is an opaque Response.
   * @private
   */
  function _getReaderFromSource(source) {
    if (source instanceof Response) {
      if (source.body) {
        return source.body.getReader();
      }
      if (source.type === 'opaque' || source.type === 'opaqueredirect') {
        throw new TypeError(
          `Unable to read the body of an ${source.type} Response; ` +
            'opaque responses cannot be used as a stream source.'
        );
      }
      // A bodyless Response contributes no bytes: hand back a reader for a
      // stream that is already closed.
      return new ReadableStream({
        start(controller) {
          controller.close();
        },
      }).getReader();
    }
    if (source instanceof ReadableStream) {
      return source.getReader();
    }
    // Anything else is assumed to be a BodyInit; let Response do the conversion.
    return new Response(source).body.getReader();
  }
  /**
   * Takes multiple source Promises, each of which could resolve to a Response, a
   * ReadableStream, or a [BodyInit](https://fetch.spec.whatwg.org/#bodyinit).
   *
   * Returns an object exposing a ReadableStream with each individual stream's
   * data returned in sequence, along with a Promise which signals when the
   * stream is finished (useful for passing to a FetchEvent's waitUntil()).
   *
   * The `done` Promise resolves once every source has been fully read or the
   * stream is cancelled, and rejects with the underlying error if obtaining a
   * reader or reading from it fails. An empty `sourcePromises` array produces a
   * stream that closes immediately.
   *
   * @param {Array<Promise<module:workbox-streams.StreamSource>>} sourcePromises
   * @return {Object<{done: Promise, stream: ReadableStream}>}
   *
   * @memberof module:workbox-streams
   */
  function concatenate(sourcePromises) {
    if (process.env.NODE_ENV !== 'production') {
      assert.isArray(sourcePromises, {
        moduleName: 'workbox-streams',
        funcName: 'concatenate',
        paramName: 'sourcePromises',
      });
    }
    // Every source gets its reader up front; the readers are then drained one
    // at a time, in order.
    const readerPromises = sourcePromises.map((sourcePromise) => {
      return Promise.resolve(sourcePromise).then((source) => {
        return _getReaderFromSource(source);
      });
    });
    const streamDeferred = new Deferred();
    // Index of the source currently being read.
    let i = 0;
    const logMessages = [];

    // Closes the output stream and settles `done` once all sources are drained.
    const finish = (controller) => {
      // Log all the messages in the group at once in a single group.
      if (process.env.NODE_ENV !== 'production') {
        logger.groupCollapsed(`Concatenating ${readerPromises.length} sources.`);
        for (const message of logMessages) {
          if (Array.isArray(message)) {
            logger.log(...message);
          } else {
            logger.log(message);
          }
        }
        logger.log('Finished reading all sources.');
        logger.groupEnd();
      }
      controller.close();
      streamDeferred.resolve();
    };

    // Enqueues the next available chunk, moving on to the following source
    // whenever the current one is exhausted. Deliberately has no error handling
    // of its own so that a failure is reported exactly once, by `pull()`.
    const readNextChunk = (controller) => {
      // Also covers an empty `sourcePromises` array, where there is no reader
      // to wait for at all.
      if (i >= readerPromises.length) {
        finish(controller);
        return undefined;
      }
      return readerPromises[i]
        .then((reader) => reader.read())
        .then((result) => {
          if (!result.done) {
            controller.enqueue(result.value);
            return undefined;
          }
          if (process.env.NODE_ENV !== 'production') {
            logMessages.push(['Reached the end of source:', sourcePromises[i]]);
          }
          i++;
          return readNextChunk(controller);
        });
    };

    const stream = new ReadableStream({
      pull(controller) {
        // Starting from a resolved Promise routes synchronous throws through
        // the same rejection path as asynchronous failures.
        return Promise.resolve()
          .then(() => readNextChunk(controller))
          .catch((error) => {
            if (process.env.NODE_ENV !== 'production') {
              logger.error('An error occurred:', error);
            }
            streamDeferred.reject(error);
            // Re-throw so the ReadableStream itself is put into an errored state.
            throw error;
          });
      },
      cancel() {
        if (process.env.NODE_ENV !== 'production') {
          logger.warn('The ReadableStream was cancelled.');
        }
        streamDeferred.resolve();
      },
    });
    return { done: streamDeferred.promise, stream };
  }
  114. export { concatenate };