async_iterator.js 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. 'use strict';
  2. var _Object$setPrototypeO;
  /**
   * Sets `obj[key]` to `value` and returns `obj`.
   *
   * If `key` is not yet reachable on `obj` (neither own nor inherited) a plain
   * assignment is performed. Otherwise the property is (re)defined directly on
   * `obj` as an enumerable, configurable, writable data property.
   *
   * @param {Object} obj - Object to modify.
   * @param {string|symbol} key - Property key.
   * @param {*} value - Value to store.
   * @returns {Object} The same `obj` instance.
   */
  function _defineProperty(obj, key, value) {
    if (!(key in obj)) {
      obj[key] = value;
      return obj;
    }
    var descriptor = {
      value: value,
      enumerable: true,
      configurable: true,
      writable: true
    };
    Object.defineProperty(obj, key, descriptor);
    return obj;
  }
  4. var finished = require('./end-of-stream');
  // Private symbol keys under which the iterator keeps its internal state.

  // The readable stream wrapped by the iterator.
  var kStream = Symbol('stream');

  // Terminal state: the stored stream error (or null) and the "ended" flag.
  var kError = Symbol('error');
  var kEnded = Symbol('ended');

  // Bookkeeping for the promise most recently handed out by next() while it is
  // still waiting for data: the promise itself and its settle functions.
  var kLastPromise = Symbol('lastPromise');
  var kLastResolve = Symbol('lastResolve');
  var kLastReject = Symbol('lastReject');

  // Promise executor that tries to read a chunk or parks the settle functions.
  var kHandlePromise = Symbol('handlePromise');
  /**
   * Builds an iterator-result record.
   *
   * @param {*} value - Value carried by the result.
   * @param {boolean} done - Whether iteration has completed.
   * @returns {{value: *, done: boolean}} A fresh result object.
   */
  function createIterResult(value, done) {
    var result = {};
    result.value = value;
    result.done = done;
    return result;
  }
  /**
   * Fulfils the pending next() promise of `iter` with a chunk, if one is
   * available.
   *
   * Does nothing when no resolver is parked on the iterator; in that case the
   * stream is not read at all. When read() returns null the resolver stays
   * parked, because an 'end' or 'error' may still be on its way.
   *
   * @param {Object} iter - Iterator whose symbol-keyed state is consulted.
   */
  function readAndResolve(iter) {
    var pendingResolve = iter[kLastResolve];
    if (pendingResolve === null) {
      return;
    }

    var chunk = iter[kStream].read();
    if (chunk === null) {
      // Nothing buffered yet: defer, we may be expecting 'end' or 'error'.
      return;
    }

    iter[kLastPromise] = null;
    iter[kLastResolve] = null;
    iter[kLastReject] = null;
    pendingResolve(createIterResult(chunk, false));
  }
  /**
   * 'readable' handler: schedules a read-and-resolve attempt for `iter`.
   *
   * The attempt is postponed to the next tick because the stream might emit an
   * error via process.nextTick in the meantime.
   *
   * @param {Object} iter - Iterator to service.
   */
  function onReadable(iter) {
    process.nextTick(function deferredRead() {
      readAndResolve(iter);
    });
  }
  /**
   * Creates a Promise executor that waits for `lastPromise` before serving the
   * next chunk, so overlapping next() calls are answered in order.
   *
   * Once `lastPromise` fulfils, the executor resolves with a "done" result if
   * the iterator has ended, and otherwise delegates to the iterator's
   * kHandlePromise function. A rejection of `lastPromise` is forwarded as is.
   *
   * @param {Promise} lastPromise - Promise returned by the previous next() call.
   * @param {Object} iter - Iterator the executor belongs to.
   * @returns {function(Function, Function): void} Executor for `new Promise`.
   */
  function wrapForNext(lastPromise, iter) {
    return function (resolve, reject) {
      function afterPrevious() {
        if (!iter[kEnded]) {
          iter[kHandlePromise](resolve, reject);
          return;
        }
        resolve(createIterResult(undefined, true));
      }

      lastPromise.then(afterPrevious, reject);
    };
  }
  // NOTE(review): `Object.getPrototypeOf(function () {})` evaluates to
  // `Function.prototype`, not the intrinsic %AsyncIteratorPrototype%.
  // Presumably a build-time stand-in for an async-generator expression;
  // confirm against the upstream source before relying on the prototype chain.
  var AsyncIteratorPrototype = Object.getPrototypeOf(function () {});

  /**
   * Shared prototype of the iterators produced for readable streams.
   *
   * Members:
   * - `stream` (getter): the wrapped stream.
   * - `next()`: promise for the next `{ value, done }` result.
   * - `[Symbol.asyncIterator]()`: returns the iterator itself.
   * - `return()`: destroys the stream and resolves with a "done" result.
   */
  var ReadableStreamAsyncIteratorPrototype = {
    get stream() {
      return this[kStream];
    },

    next: function next() {
      var self = this;

      // An error detected earlier rejects straight away.
      var storedError = self[kError];
      if (storedError !== null) {
        return Promise.reject(storedError);
      }

      if (self[kEnded]) {
        return Promise.resolve(createIterResult(undefined, true));
      }

      if (self[kStream].destroyed) {
        // Defer via nextTick: if .destroy(err) was called, the error is
        // emitted on a later tick, so one may still be lingering around
        // waiting to be emitted.
        return new Promise(function (resolve, reject) {
          process.nextTick(function () {
            var lateError = self[kError];
            if (lateError) {
              reject(lateError);
              return;
            }
            resolve(createIterResult(undefined, true));
          });
        });
      }

      // With several outstanding next() calls each one waits for the previous
      // promise. The logic is optimised for `for await` loops, where next()
      // is only called once at a time.
      var previous = self[kLastPromise];
      var pending;
      if (!previous) {
        // Fast path, needed to support multiple this.push() calls without
        // triggering the next() queue.
        var chunk = self[kStream].read();
        if (chunk !== null) {
          return Promise.resolve(createIterResult(chunk, false));
        }
        pending = new Promise(self[kHandlePromise]);
      } else {
        pending = new Promise(wrapForNext(previous, self));
      }

      self[kLastPromise] = pending;
      return pending;
    }
  };

  _defineProperty(ReadableStreamAsyncIteratorPrototype, Symbol.asyncIterator, function () {
    return this;
  });

  _defineProperty(ReadableStreamAsyncIteratorPrototype, 'return', function _return() {
    var self = this;
    // destroy(err, cb) is a private API; it is guaranteed to exist here
    // because the Readable class this is attached to is under our control.
    return new Promise(function (resolve, reject) {
      self[kStream].destroy(null, function (err) {
        if (!err) {
          resolve(createIterResult(undefined, true));
          return;
        }
        reject(err);
      });
    });
  });

  Object.setPrototypeOf(ReadableStreamAsyncIteratorPrototype, AsyncIteratorPrototype);
  /**
   * Creates an async iterator over `stream`.
   *
   * The iterator inherits from ReadableStreamAsyncIteratorPrototype and keeps
   * its state in symbol-keyed, writable, non-enumerable properties:
   * the stream, the parked resolve/reject functions of a waiting next() call,
   * the stored error, the "ended" flag (initialised from
   * `stream._readableState.endEmitted`) and the promise executor used by
   * next().
   *
   * It subscribes to the stream's completion through `finished` and to its
   * 'readable' event, so that a waiting next() promise is settled when data,
   * the end, or an error arrives.
   *
   * @param {Object} stream - Readable stream exposing `read()`, `on()` and
   *   `_readableState`.
   * @returns {Object} The async iterator.
   */
  var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterator(stream) {
    var iterator;
    var descriptors = {};

    // Forgets the promise a consumer is currently waiting on.
    function clearPending() {
      iterator[kLastPromise] = null;
      iterator[kLastResolve] = null;
      iterator[kLastReject] = null;
    }

    _defineProperty(descriptors, kStream, {
      value: stream,
      writable: true
    });
    _defineProperty(descriptors, kLastResolve, {
      value: null,
      writable: true
    });
    _defineProperty(descriptors, kLastReject, {
      value: null,
      writable: true
    });
    _defineProperty(descriptors, kError, {
      value: null,
      writable: true
    });
    _defineProperty(descriptors, kEnded, {
      value: stream._readableState.endEmitted,
      writable: true
    });
    _defineProperty(descriptors, kHandlePromise, {
      value: function value(resolve, reject) {
        var data = iterator[kStream].read();
        // read() signals "no data" with null. Compare against null explicitly
        // (as the other read sites in this file do) so that falsy object-mode
        // chunks such as 0, '' or false are delivered instead of being
        // consumed and silently dropped.
        if (data !== null) {
          clearPending();
          resolve(createIterResult(data, false));
        } else {
          // Park the settle functions until 'readable', end or error.
          iterator[kLastResolve] = resolve;
          iterator[kLastReject] = reject;
        }
      },
      writable: true
    });

    iterator = Object.create(ReadableStreamAsyncIteratorPrototype, descriptors);
    iterator[kLastPromise] = null;

    finished(stream, function (err) {
      if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
        // Reject a next() promise that is waiting for data, then store the
        // error so later next() calls reject as well.
        var reject = iterator[kLastReject];
        if (reject !== null) {
          clearPending();
          reject(err);
        }
        iterator[kError] = err;
        return;
      }

      // Normal end (or premature close): complete a waiting next() promise.
      var resolve = iterator[kLastResolve];
      if (resolve !== null) {
        clearPending();
        resolve(createIterResult(undefined, true));
      }
      iterator[kEnded] = true;
    });

    stream.on('readable', onReadable.bind(null, iterator));
    return iterator;
  };
  172. };
  173. module.exports = createReadableStreamAsyncIterator;