concatenate.ts 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. /*
  2. Copyright 2018 Google LLC
  3. Use of this source code is governed by an MIT-style
  4. license that can be found in the LICENSE file or at
  5. https://opensource.org/licenses/MIT.
  6. */
  7. import {logger} from 'workbox-core/_private/logger.js';
  8. import {assert} from 'workbox-core/_private/assert.js';
  9. import {Deferred} from 'workbox-core/_private/Deferred.js';
  10. import {StreamSource} from './_types.js';
  11. import './_version.js';
  12. /**
  13. * Takes either a Response, a ReadableStream, or a
  14. * [BodyInit](https://fetch.spec.whatwg.org/#bodyinit) and returns the
  15. * ReadableStreamReader object associated with it.
  16. *
  17. * @param {module:workbox-streams.StreamSource} source
  18. * @return {ReadableStreamReader}
  19. * @private
  20. */
  21. function _getReaderFromSource(source: StreamSource): ReadableStreamReader {
  22. if (source instanceof Response) {
  23. return source.body!.getReader();
  24. }
  25. if (source instanceof ReadableStream) {
  26. return source.getReader();
  27. }
  28. return new Response(source as BodyInit).body!.getReader();
  29. }
  30. /**
  31. * Takes multiple source Promises, each of which could resolve to a Response, a
  32. * ReadableStream, or a [BodyInit](https://fetch.spec.whatwg.org/#bodyinit).
  33. *
  34. * Returns an object exposing a ReadableStream with each individual stream's
  35. * data returned in sequence, along with a Promise which signals when the
  36. * stream is finished (useful for passing to a FetchEvent's waitUntil()).
  37. *
  38. * @param {Array<Promise<module:workbox-streams.StreamSource>>} sourcePromises
  39. * @return {Object<{done: Promise, stream: ReadableStream}>}
  40. *
  41. * @memberof module:workbox-streams
  42. */
  43. function concatenate(sourcePromises: Promise<StreamSource>[]): {
  44. done: Promise<void>;
  45. stream: ReadableStream;
  46. } {
  47. if (process.env.NODE_ENV !== 'production') {
  48. assert!.isArray(sourcePromises, {
  49. moduleName: 'workbox-streams',
  50. funcName: 'concatenate',
  51. paramName: 'sourcePromises',
  52. });
  53. }
  54. const readerPromises = sourcePromises.map((sourcePromise) => {
  55. return Promise.resolve(sourcePromise).then((source) => {
  56. return _getReaderFromSource(source);
  57. });
  58. });
  59. const streamDeferred: Deferred<void> = new Deferred();
  60. let i = 0;
  61. const logMessages: any[] = [];
  62. const stream = new ReadableStream({
  63. pull(controller: ReadableStreamDefaultController<any>) {
  64. return readerPromises[i]
  65. .then((reader) => reader.read())
  66. .then((result) => {
  67. if (result.done) {
  68. if (process.env.NODE_ENV !== 'production') {
  69. logMessages.push(['Reached the end of source:',
  70. sourcePromises[i]]);
  71. }
  72. i++;
  73. if (i >= readerPromises.length) {
  74. // Log all the messages in the group at once in a single group.
  75. if (process.env.NODE_ENV !== 'production') {
  76. logger.groupCollapsed(
  77. `Concatenating ${readerPromises.length} sources.`);
  78. for (const message of logMessages) {
  79. if (Array.isArray(message)) {
  80. logger.log(...message);
  81. } else {
  82. logger.log(message);
  83. }
  84. }
  85. logger.log('Finished reading all sources.');
  86. logger.groupEnd();
  87. }
  88. controller.close();
  89. streamDeferred.resolve();
  90. return;
  91. }
  92. // The `pull` method is defined because we're inside it.
  93. return this.pull!(controller);
  94. } else {
  95. controller.enqueue(result.value);
  96. }
  97. }).catch((error) => {
  98. if (process.env.NODE_ENV !== 'production') {
  99. logger.error('An error occurred:', error);
  100. }
  101. streamDeferred.reject(error);
  102. throw error;
  103. });
  104. },
  105. cancel() {
  106. if (process.env.NODE_ENV !== 'production') {
  107. logger.warn('The ReadableStream was cancelled.');
  108. }
  109. streamDeferred.resolve();
  110. },
  111. });
  112. return {done: streamDeferred.promise, stream};
  113. }
  114. export {concatenate};