123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- 'use strict';
- var _Object$setPrototypeO;
- function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }
- var finished = require('./end-of-stream');
- var kLastResolve = Symbol('lastResolve');
- var kLastReject = Symbol('lastReject');
- var kError = Symbol('error');
- var kEnded = Symbol('ended');
- var kLastPromise = Symbol('lastPromise');
- var kHandlePromise = Symbol('handlePromise');
- var kStream = Symbol('stream');
- function createIterResult(value, done) {
- return {
- value: value,
- done: done
- };
- }
- function readAndResolve(iter) {
- var resolve = iter[kLastResolve];
- if (resolve !== null) {
- var data = iter[kStream].read(); // we defer if data is null
- // we can be expecting either 'end' or
- // 'error'
- if (data !== null) {
- iter[kLastPromise] = null;
- iter[kLastResolve] = null;
- iter[kLastReject] = null;
- resolve(createIterResult(data, false));
- }
- }
- }
- function onReadable(iter) {
- // we wait for the next tick, because it might
- // emit an error with process.nextTick
- process.nextTick(readAndResolve, iter);
- }
- function wrapForNext(lastPromise, iter) {
- return function (resolve, reject) {
- lastPromise.then(function () {
- if (iter[kEnded]) {
- resolve(createIterResult(undefined, true));
- return;
- }
- iter[kHandlePromise](resolve, reject);
- }, reject);
- };
- }
- var AsyncIteratorPrototype = Object.getPrototypeOf(function () {});
- var ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf((_Object$setPrototypeO = {
- get stream() {
- return this[kStream];
- },
- next: function next() {
- var _this = this;
- // if we have detected an error in the meanwhile
- // reject straight away
- var error = this[kError];
- if (error !== null) {
- return Promise.reject(error);
- }
- if (this[kEnded]) {
- return Promise.resolve(createIterResult(undefined, true));
- }
- if (this[kStream].destroyed) {
- // We need to defer via nextTick because if .destroy(err) is
- // called, the error will be emitted via nextTick, and
- // we cannot guarantee that there is no error lingering around
- // waiting to be emitted.
- return new Promise(function (resolve, reject) {
- process.nextTick(function () {
- if (_this[kError]) {
- reject(_this[kError]);
- } else {
- resolve(createIterResult(undefined, true));
- }
- });
- });
- } // if we have multiple next() calls
- // we will wait for the previous Promise to finish
- // this logic is optimized to support for await loops,
- // where next() is only called once at a time
- var lastPromise = this[kLastPromise];
- var promise;
- if (lastPromise) {
- promise = new Promise(wrapForNext(lastPromise, this));
- } else {
- // fast path needed to support multiple this.push()
- // without triggering the next() queue
- var data = this[kStream].read();
- if (data !== null) {
- return Promise.resolve(createIterResult(data, false));
- }
- promise = new Promise(this[kHandlePromise]);
- }
- this[kLastPromise] = promise;
- return promise;
- }
- }, _defineProperty(_Object$setPrototypeO, Symbol.asyncIterator, function () {
- return this;
- }), _defineProperty(_Object$setPrototypeO, "return", function _return() {
- var _this2 = this;
- // destroy(err, cb) is a private API
- // we can guarantee we have that here, because we control the
- // Readable class this is attached to
- return new Promise(function (resolve, reject) {
- _this2[kStream].destroy(null, function (err) {
- if (err) {
- reject(err);
- return;
- }
- resolve(createIterResult(undefined, true));
- });
- });
- }), _Object$setPrototypeO), AsyncIteratorPrototype);
- var createReadableStreamAsyncIterator = function createReadableStreamAsyncIterator(stream) {
- var _Object$create;
- var iterator = Object.create(ReadableStreamAsyncIteratorPrototype, (_Object$create = {}, _defineProperty(_Object$create, kStream, {
- value: stream,
- writable: true
- }), _defineProperty(_Object$create, kLastResolve, {
- value: null,
- writable: true
- }), _defineProperty(_Object$create, kLastReject, {
- value: null,
- writable: true
- }), _defineProperty(_Object$create, kError, {
- value: null,
- writable: true
- }), _defineProperty(_Object$create, kEnded, {
- value: stream._readableState.endEmitted,
- writable: true
- }), _defineProperty(_Object$create, kHandlePromise, {
- value: function value(resolve, reject) {
- var data = iterator[kStream].read();
- if (data) {
- iterator[kLastPromise] = null;
- iterator[kLastResolve] = null;
- iterator[kLastReject] = null;
- resolve(createIterResult(data, false));
- } else {
- iterator[kLastResolve] = resolve;
- iterator[kLastReject] = reject;
- }
- },
- writable: true
- }), _Object$create));
- iterator[kLastPromise] = null;
- finished(stream, function (err) {
- if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
- var reject = iterator[kLastReject]; // reject if we are waiting for data in the Promise
- // returned by next() and store the error
- if (reject !== null) {
- iterator[kLastPromise] = null;
- iterator[kLastResolve] = null;
- iterator[kLastReject] = null;
- reject(err);
- }
- iterator[kError] = err;
- return;
- }
- var resolve = iterator[kLastResolve];
- if (resolve !== null) {
- iterator[kLastPromise] = null;
- iterator[kLastResolve] = null;
- iterator[kLastReject] = null;
- resolve(createIterResult(undefined, true));
- }
- iterator[kEnded] = true;
- });
- stream.on('readable', onReadable.bind(null, iterator));
- return iterator;
- };
- module.exports = createReadableStreamAsyncIterator;
|