abstract_cursor.js 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.assertUninitialized = exports.AbstractCursor = exports.CURSOR_FLAGS = void 0;
  4. const stream_1 = require("stream");
  5. const bson_1 = require("../bson");
  6. const error_1 = require("../error");
  7. const mongo_types_1 = require("../mongo_types");
  8. const execute_operation_1 = require("../operations/execute_operation");
  9. const get_more_1 = require("../operations/get_more");
  10. const read_concern_1 = require("../read_concern");
  11. const read_preference_1 = require("../read_preference");
  12. const sessions_1 = require("../sessions");
  13. const utils_1 = require("../utils");
  // Private symbol keys under which AbstractCursor keeps its per-instance state.
  /** @internal Cursor id; assigned from the server response during initialization. */
  const kId = Symbol('id');
  /** @internal Array of documents buffered from the most recent batch. */
  const kDocuments = Symbol('documents');
  /** @internal Server reported by the initialization state; used for getMore/killCursors. */
  const kServer = Symbol('server');
  /** @internal Namespace the cursor iterates; may be replaced by the server's `cursor.ns`. */
  const kNamespace = Symbol('namespace');
  /** @internal Client the cursor was constructed with. */
  const kClient = Symbol('client');
  /** @internal Session the cursor operates within, if any. */
  const kSession = Symbol('session');
  /** @internal Resolved cursor options (read preference, batch size, flags, ...). */
  const kOptions = Symbol('options');
  /** @internal Optional mapping function installed through `map()`. */
  const kTransform = Symbol('transform');
  /** @internal True once the initial command has been attempted. */
  const kInitialized = Symbol('initialized');
  /** @internal True once the cursor has been closed. */
  const kClosed = Symbol('closed');
  /** @internal True once a killCursors command has been issued for the cursor. */
  const kKilled = Symbol('killed');
  /** @internal Key of the method that runs the initial command. */
  const kInit = Symbol('kInit');
  /**
   * @public
   * Flag names accepted by `AbstractCursor#addCursorFlag`.
   */
  exports.CURSOR_FLAGS = ['tailable', 'oplogReplay', 'noCursorTimeout', 'awaitData', 'exhaust', 'partial'];
  /**
   * @public
   * Base class for cursors. It buffers documents returned by the server, fetches
   * further batches on demand and releases server-side and session resources once
   * iteration finishes or the cursor is closed. Subclasses supply `_initialize`,
   * which runs the initial command.
   */
  class AbstractCursor extends mongo_types_1.TypedEventEmitter {
    /**
     * @internal
     * @param client - The client the cursor belongs to; must satisfy `client.s.isMongoClient`.
     * @param namespace - The namespace the cursor iterates.
     * @param options - Optional read preference, read concern, batch size, comment,
     * maxTimeMS, session and BSON serialization settings.
     */
    constructor(client, namespace, options = {}) {
      super();
      if (!client.s.isMongoClient) {
        throw new error_1.MongoRuntimeError('Cursor must be constructed with MongoClient');
      }
      this[kClient] = client;
      this[kNamespace] = namespace;
      this[kDocuments] = []; // TODO: https://github.com/microsoft/TypeScript/issues/36230
      this[kInitialized] = false;
      this[kClosed] = false;
      this[kKilled] = false;
      this[kOptions] = {
        // anything that is not a ReadPreference instance falls back to primary
        readPreference: options.readPreference && options.readPreference instanceof read_preference_1.ReadPreference
          ? options.readPreference
          : read_preference_1.ReadPreference.primary,
        ...(0, bson_1.pluckBSONSerializeOptions)(options)
      };
      const readConcern = read_concern_1.ReadConcern.fromOptions(options);
      if (readConcern) {
        this[kOptions].readConcern = readConcern;
      }
      if (typeof options.batchSize === 'number') {
        this[kOptions].batchSize = options.batchSize;
      }
      // we check for undefined specifically here to allow falsy values
      // eslint-disable-next-line no-restricted-syntax
      if (options.comment !== undefined) {
        this[kOptions].comment = options.comment;
      }
      if (typeof options.maxTimeMS === 'number') {
        this[kOptions].maxTimeMS = options.maxTimeMS;
      }
      if (options.session instanceof sessions_1.ClientSession) {
        this[kSession] = options.session;
      }
    }
    /** The cursor id; undefined until the cursor has been initialized. */
    get id() {
      return this[kId];
    }
    /** @internal */
    get client() {
      return this[kClient];
    }
    /** @internal */
    get server() {
      return this[kServer];
    }
    /** The namespace this cursor iterates. */
    get namespace() {
      return this[kNamespace];
    }
    /** The read preference in effect for this cursor. */
    get readPreference() {
      return this[kOptions].readPreference;
    }
    /** The read concern in effect for this cursor, if one was set. */
    get readConcern() {
      return this[kOptions].readConcern;
    }
    /** @internal */
    get session() {
      return this[kSession];
    }
    set session(clientSession) {
      this[kSession] = clientSession;
    }
    /** @internal */
    get cursorOptions() {
      return this[kOptions];
    }
    /** Whether the cursor has been closed. */
    get closed() {
      return this[kClosed];
    }
    /** Whether a killCursors command has been issued for this cursor. */
    get killed() {
      return this[kKilled];
    }
    /** Whether the client's topology reports itself as load balanced. */
    get loadBalanced() {
      const topology = this[kClient].topology;
      return !!(topology && topology.loadBalanced);
    }
    /** Returns current buffered documents length */
    bufferedCount() {
      return this[kDocuments].length;
    }
    /**
     * Returns current buffered documents
     *
     * @param number - Maximum number of documents to remove from the buffer; the
     * whole buffer is drained when omitted. The documents are returned as stored,
     * without applying the cursor transform.
     */
    readBufferedDocuments(number) {
      return this[kDocuments].splice(0, number != null ? number : this[kDocuments].length);
    }
    /**
     * Async iteration support: yields documents from `next()` until it resolves
     * with a nullish value.
     */
    [Symbol.asyncIterator]() {
      return {
        next: () => this.next().then(value => value != null ? { value, done: false } : { value: undefined, done: true })
      };
    }
    /**
     * Expose the cursor as an object-mode Readable stream.
     *
     * @param options - When `options.transform` is provided, every document is passed
     * through it by a piped Transform stream and the transformed value is emitted.
     */
    stream(options) {
      if (options != null && options.transform) {
        const transform = options.transform;
        const readable = makeCursorStream(this);
        return readable.pipe(new stream_1.Transform({
          objectMode: true,
          highWaterMark: 1,
          transform(chunk, _, callback) {
            try {
              const transformed = transform(chunk);
              callback(undefined, transformed);
            }
            catch (err) {
              callback(err);
            }
          }
        }));
      }
      return makeCursorStream(this);
    }
    /**
     * Check whether another document is available, fetching from the server if the
     * buffer is empty.
     *
     * @param callback - Optional callback; handling of the callback/promise duality is
     * delegated to `maybePromise`.
     */
    hasNext(callback) {
      return (0, utils_1.maybePromise)(callback, done => {
        if (this[kId] === bson_1.Long.ZERO) {
          return done(undefined, false);
        }
        if (this[kDocuments].length) {
          return done(undefined, true);
        }
        // Fetch the document WITHOUT applying the cursor transform: it is put back
        // into the buffer below, and whoever reads it next applies the transform.
        // Transforming here as well would map the same document twice.
        next(this, true, (err, doc) => {
          if (err)
            return done(err);
          if (doc) {
            this[kDocuments].unshift(doc);
            done(undefined, true);
            return;
          }
          done(undefined, false);
        }, false);
      });
    }
    /**
     * Get the next available document, waiting for further batches when the buffer
     * is empty. Fails with MongoCursorExhaustedError once the cursor id has been
     * reset to `Long.ZERO`.
     *
     * @param callback - Optional callback receiving the error or the document.
     */
    next(callback) {
      return (0, utils_1.maybePromise)(callback, done => {
        if (this[kId] === bson_1.Long.ZERO) {
          return done(new error_1.MongoCursorExhaustedError());
        }
        next(this, true, done);
      });
    }
    /**
     * Like `next`, but non-blocking: when a getMore returns an empty batch the
     * result is `null` instead of another getMore being issued.
     *
     * @param callback - Optional callback receiving the error or the document.
     */
    tryNext(callback) {
      return (0, utils_1.maybePromise)(callback, done => {
        if (this[kId] === bson_1.Long.ZERO) {
          return done(new error_1.MongoCursorExhaustedError());
        }
        next(this, false, done);
      });
    }
    /**
     * Invoke `iterator` for every document of the cursor.
     *
     * @param iterator - Called with each document; returning `false` stops iteration.
     * @param callback - Optional callback invoked when iteration ends or fails.
     * @throws MongoInvalidArgumentError when `iterator` is not a function.
     */
    forEach(iterator, callback) {
      if (typeof iterator !== 'function') {
        throw new error_1.MongoInvalidArgumentError('Argument "iterator" must be a function');
      }
      return (0, utils_1.maybePromise)(callback, done => {
        const transform = this[kTransform];
        const fetchDocs = () => {
          next(this, true, (err, doc) => {
            if (err || doc == null)
              return done(err);
            let result;
            // NOTE: no need to transform because `next` will do this automatically
            try {
              result = iterator(doc); // TODO(NODE-3283): Improve transform typing
            }
            catch (error) {
              return done(error);
            }
            if (result === false)
              return done();
            // these do need to be transformed since they are copying the rest of the batch
            const internalDocs = this[kDocuments].splice(0, this[kDocuments].length);
            for (let i = 0; i < internalDocs.length; ++i) {
              try {
                result = iterator((transform ? transform(internalDocs[i]) : internalDocs[i]) // TODO(NODE-3283): Improve transform typing
                );
              }
              catch (error) {
                return done(error);
              }
              if (result === false)
                return done();
            }
            fetchDocs();
          });
        };
        fetchDocs();
      });
    }
    /**
     * Close the cursor and release its resources.
     *
     * @param options - Accepted for signature compatibility; may also be the callback.
     * @param callback - Optional callback invoked once cleanup has finished.
     */
    close(options, callback) {
      if (typeof options === 'function') {
        callback = options;
        options = {};
      }
      options = options != null ? options : {};
      // only the first close() of a not-yet-closed cursor should emit the close event
      const needsToEmitClosed = !this[kClosed];
      this[kClosed] = true;
      return (0, utils_1.maybePromise)(callback, done => cleanupCursor(this, { needsToEmitClosed }, done));
    }
    /**
     * Collect all remaining documents of the cursor into an array.
     *
     * @param callback - Optional callback receiving the error or the array of documents.
     */
    toArray(callback) {
      return (0, utils_1.maybePromise)(callback, done => {
        const docs = [];
        const transform = this[kTransform];
        const fetchDocs = () => {
          // NOTE: if we add a `nextBatch` then we should use it here
          next(this, true, (err, doc) => {
            if (err)
              return done(err);
            if (doc == null)
              return done(undefined, docs);
            // NOTE: no need to transform because `next` will do this automatically
            docs.push(doc);
            // these do need to be transformed since they are copying the rest of the batch
            const internalDocs = this[kDocuments].splice(0, this[kDocuments].length);
            // Appended one by one: spreading a very large batch into push() can exceed
            // the engine's argument limit, and the transform only ever receives the
            // document, exactly as it does on every other code path.
            for (let i = 0; i < internalDocs.length; ++i) {
              docs.push(transform ? transform(internalDocs[i]) : internalDocs[i]); // TODO(NODE-3283): Improve transform typing
            }
            fetchDocs();
          });
        };
        fetchDocs();
      });
    }
    /**
     * Add a cursor flag to the cursor
     *
     * @param flag - The flag to set, must be one of following ['tailable', 'oplogReplay', 'noCursorTimeout', 'awaitData', 'exhaust', 'partial'].
     * @param value - The flag boolean value.
     * @throws MongoCursorInUseError when the cursor has already been initialized.
     * @throws MongoInvalidArgumentError when the flag is unknown or the value is not a boolean.
     */
    addCursorFlag(flag, value) {
      assertUninitialized(this);
      if (!exports.CURSOR_FLAGS.includes(flag)) {
        throw new error_1.MongoInvalidArgumentError(`Flag ${flag} is not one of ${exports.CURSOR_FLAGS}`);
      }
      if (typeof value !== 'boolean') {
        throw new error_1.MongoInvalidArgumentError(`Flag ${flag} must be a boolean value`);
      }
      this[kOptions][flag] = value;
      return this;
    }
    /**
     * Map all documents using the provided function
     * If there is a transform set on the cursor, that will be called first and the result passed to
     * this function's transform.
     *
     * @remarks
     * **Note for Typescript Users:** adding a transform changes the return type of the iteration of this cursor,
     * it **does not** return a new instance of a cursor. This means when calling map,
     * you should always assign the result to a new variable in order to get a correctly typed cursor variable.
     * Take note of the following example:
     *
     * @example
     * ```typescript
     * const cursor: FindCursor<Document> = coll.find();
     * const mappedCursor: FindCursor<number> = cursor.map(doc => Object.keys(doc).length);
     * const keyCounts: number[] = await mappedCursor.toArray(); // cursor.toArray() still returns Document[]
     * ```
     * @param transform - The mapping transformation method.
     */
    map(transform) {
      assertUninitialized(this);
      const oldTransform = this[kTransform]; // TODO(NODE-3283): Improve transform typing
      if (oldTransform) {
        this[kTransform] = doc => {
          return transform(oldTransform(doc));
        };
      }
      else {
        this[kTransform] = transform;
      }
      return this;
    }
    /**
     * Set the ReadPreference for the cursor.
     *
     * @param readPreference - The new read preference for the cursor.
     */
    withReadPreference(readPreference) {
      assertUninitialized(this);
      if (readPreference instanceof read_preference_1.ReadPreference) {
        this[kOptions].readPreference = readPreference;
      }
      else if (typeof readPreference === 'string') {
        this[kOptions].readPreference = read_preference_1.ReadPreference.fromString(readPreference);
      }
      else {
        throw new error_1.MongoInvalidArgumentError(`Invalid read preference: ${readPreference}`);
      }
      return this;
    }
    /**
     * Set the ReadConcern for the cursor.
     *
     * @param readConcern - The new read concern for the cursor. The current read
     * concern is kept when `ReadConcern.fromOptions` yields nothing for it.
     */
    withReadConcern(readConcern) {
      assertUninitialized(this);
      const resolvedReadConcern = read_concern_1.ReadConcern.fromOptions({ readConcern });
      if (resolvedReadConcern) {
        this[kOptions].readConcern = resolvedReadConcern;
      }
      return this;
    }
    /**
     * Set a maxTimeMS on the cursor query, allowing for hard timeout limits on queries (Only supported on MongoDB 2.6 or higher)
     *
     * @param value - Number of milliseconds to wait before aborting the query.
     */
    maxTimeMS(value) {
      assertUninitialized(this);
      if (typeof value !== 'number') {
        throw new error_1.MongoInvalidArgumentError('Argument for maxTimeMS must be a number');
      }
      this[kOptions].maxTimeMS = value;
      return this;
    }
    /**
     * Set the batch size for the cursor.
     *
     * @param value - The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/find/|find command documentation}.
     */
    batchSize(value) {
      assertUninitialized(this);
      if (this[kOptions].tailable) {
        throw new error_1.MongoTailableCursorError('Tailable cursor does not support batchSize');
      }
      if (typeof value !== 'number') {
        throw new error_1.MongoInvalidArgumentError('Operation "batchSize" requires an integer');
      }
      this[kOptions].batchSize = value;
      return this;
    }
    /**
     * Rewind this cursor to its uninitialized state. Any options that are present on the cursor will
     * remain in effect. Iterating this cursor will cause new queries to be sent to the server, even
     * if the resultant data has already been retrieved by this cursor.
     */
    rewind() {
      if (!this[kInitialized]) {
        return;
      }
      this[kId] = undefined;
      this[kDocuments] = [];
      this[kClosed] = false;
      this[kKilled] = false;
      this[kInitialized] = false;
      const session = this[kSession];
      if (session) {
        // We only want to end this session if we created it, and it hasn't ended yet
        if (session.explicit === false && !session.hasEnded) {
          session.endSession();
        }
        this[kSession] = undefined;
      }
    }
    /**
     * @internal
     * Run a getMore for the next batch.
     *
     * @param batchSize - Number of documents to request.
     * @param callback - Receives the error or the getMore response.
     */
    _getMore(batchSize, callback) {
      const cursorId = this[kId];
      const cursorNs = this[kNamespace];
      const server = this[kServer];
      if (cursorId == null) {
        callback(new error_1.MongoRuntimeError('Unable to iterate cursor with no id'));
        return;
      }
      if (server == null) {
        callback(new error_1.MongoRuntimeError('Unable to iterate cursor without selected server'));
        return;
      }
      const getMoreOperation = new get_more_1.GetMoreOperation(cursorNs, cursorId, server, {
        ...this[kOptions],
        session: this[kSession],
        batchSize
      });
      (0, execute_operation_1.executeOperation)(this[kClient], getMoreOperation, callback);
    }
    /**
     * @internal
     *
     * This function is exposed for the unified test runner's createChangeStream
     * operation. We cannot refactor to use the abstract _initialize method without
     * a significant refactor.
     *
     * @param callback - Invoked once initialization has finished. When initialization
     * failed or the cursor is already dead it receives the error and the first
     * buffered document (or null).
     * @param transform - Whether the cursor transform is applied to that first
     * document; defaults to true.
     */
    [kInit](callback, transform = true) {
      if (this[kSession] == null) {
        const topology = this[kClient].topology;
        if (topology != null && topology.shouldCheckForSessionSupport()) {
          // select a server first, then start over
          return topology.selectServer(read_preference_1.ReadPreference.primaryPreferred, {}, err => {
            if (err)
              return callback(err);
            return this[kInit](callback, transform);
          });
        }
        else if (topology != null && topology.hasSessionSupport()) {
          this[kSession] = topology.startSession({ owner: this, explicit: false });
        }
      }
      this._initialize(this[kSession], (err, state) => {
        if (state) {
          const response = state.response;
          this[kServer] = state.server;
          this[kSession] = state.session;
          if (response.cursor) {
            this[kId] =
              typeof response.cursor.id === 'number'
                ? bson_1.Long.fromNumber(response.cursor.id)
                : response.cursor.id;
            if (response.cursor.ns) {
              this[kNamespace] = (0, utils_1.ns)(response.cursor.ns);
            }
            this[kDocuments] = response.cursor.firstBatch;
          }
          // When server responses return without a cursor document, we close this cursor
          // and return the raw server response. This is often the case for explain commands
          // for example
          if (this[kId] == null) {
            this[kId] = bson_1.Long.ZERO;
            // TODO(NODE-3286): ExecutionResult needs to accept a generic parameter
            this[kDocuments] = [state.response];
          }
        }
        // the cursor is now initialized, even if an error occurred or it is dead
        this[kInitialized] = true;
        if (err || cursorIsDead(this)) {
          return cleanupCursor(this, { error: err }, () => callback(err, nextDocument(this, transform)));
        }
        callback();
      });
    }
  }
  exports.AbstractCursor = AbstractCursor;
  /** @event */
  AbstractCursor.CLOSE = 'close';
  /**
   * Remove and return the first buffered document of the cursor.
   *
   * @param cursor - The cursor to read from.
   * @param transform - Whether the cursor transform (installed via `map`) is applied
   * to the document; defaults to true.
   * @returns The document, or null when the buffer is missing or empty or the
   * shifted entry is falsy.
   */
  function nextDocument(cursor, transform = true) {
    const buffered = cursor[kDocuments];
    if (buffered == null || !buffered.length) {
      return null;
    }
    const doc = buffered.shift();
    if (!doc) {
      return null;
    }
    const mapper = cursor[kTransform];
    return transform && mapper ? mapper(doc) : doc;
  }
  /**
   * Produce the next document of the cursor, initializing it or requesting another
   * batch as needed.
   *
   * @param cursor - The cursor to advance.
   * @param blocking - When false, an empty getMore batch yields null instead of
   * triggering another getMore.
   * @param callback - Receives the error, or the document (null when none is available).
   * @param transform - Whether the cursor transform is applied to the returned
   * document; defaults to true. `hasNext` passes false because it puts the document
   * back into the buffer.
   */
  function next(cursor, blocking, callback, transform = true) {
    const cursorId = cursor[kId];
    if (cursor.closed) {
      return callback(undefined, null);
    }
    if (cursor[kDocuments] && cursor[kDocuments].length) {
      callback(undefined, nextDocument(cursor, transform));
      return;
    }
    if (cursorId == null) {
      // All cursors must operate within a session, one must be made implicitly if not explicitly provided
      cursor[kInit]((err, value) => {
        if (err)
          return callback(err);
        if (value) {
          return callback(undefined, value);
        }
        return next(cursor, blocking, callback, transform);
      }, transform);
      return;
    }
    if (cursorIsDead(cursor)) {
      return cleanupCursor(cursor, undefined, () => callback(undefined, null));
    }
    // otherwise need to call getMore
    const batchSize = cursor[kOptions].batchSize || 1000;
    cursor._getMore(batchSize, (err, response) => {
      if (response) {
        const cursorId = typeof response.cursor.id === 'number'
          ? bson_1.Long.fromNumber(response.cursor.id)
          : response.cursor.id;
        cursor[kDocuments] = response.cursor.nextBatch;
        cursor[kId] = cursorId;
      }
      if (err || cursorIsDead(cursor)) {
        return cleanupCursor(cursor, { error: err }, () => callback(err, nextDocument(cursor, transform)));
      }
      if (cursor[kDocuments].length === 0 && blocking === false) {
        return callback(undefined, null);
      }
      next(cursor, blocking, callback, transform);
    });
  }
  /**
   * Tell whether the cursor holds an id whose `isZero()` reports true.
   *
   * @param cursor - The cursor to inspect.
   * @returns false while no id is set, otherwise the result of `id.isZero()`.
   */
  function cursorIsDead(cursor) {
    const id = cursor[kId];
    if (!id) {
      return false;
    }
    return id.isZero();
  }
  /**
   * Release the resources held by a cursor: issue killCursors when there is a
   * live server-side cursor, end or unpin the session, and emit the close event.
   *
   * @param cursor - The cursor being cleaned up.
   * @param options - Optional `error` that triggered the cleanup and
   * `needsToEmitClosed` override; when the override is absent the cursor is marked
   * closed only if its buffer is empty.
   * @param callback - Invoked once cleanup has finished.
   */
  function cleanupCursor(cursor, options, callback) {
    const id = cursor[kId];
    const namespace = cursor[kNamespace];
    const pinnedServer = cursor[kServer];
    const activeSession = cursor[kSession];
    const error = options == null ? undefined : options.error;
    const requestedEmit = options == null ? undefined : options.needsToEmitClosed;
    const needsToEmitClosed = requestedEmit != null ? requestedEmit : cursor[kDocuments].length === 0;
    // Final step shared by the killCursors path and the load-balanced network-error path.
    const completeCleanup = () => {
      if (activeSession) {
        if (activeSession.owner === cursor) {
          // the session belongs to this cursor: end it, then announce the close
          return activeSession.endSession({ error }, () => {
            cursor.emit(AbstractCursor.CLOSE);
            callback();
          });
        }
        if (!activeSession.inTransaction()) {
          (0, sessions_1.maybeClearPinnedConnection)(activeSession, { error });
        }
      }
      cursor.emit(AbstractCursor.CLOSE);
      return callback();
    };
    // a network error in load-balanced mode skips killCursors entirely
    if (error && cursor.loadBalanced && error instanceof error_1.MongoNetworkError) {
      return completeCleanup();
    }
    const nothingToKill = id == null || pinnedServer == null || id.isZero() || namespace == null;
    if (nothingToKill) {
      if (needsToEmitClosed) {
        cursor[kClosed] = true;
        cursor[kId] = bson_1.Long.ZERO;
        cursor.emit(AbstractCursor.CLOSE);
      }
      if (activeSession) {
        if (activeSession.owner === cursor) {
          return activeSession.endSession({ error }, callback);
        }
        if (!activeSession.inTransaction()) {
          (0, sessions_1.maybeClearPinnedConnection)(activeSession, { error });
        }
      }
      return callback();
    }
    cursor[kKilled] = true;
    const killOptions = { ...(0, bson_1.pluckBSONSerializeOptions)(cursor[kOptions]), session: activeSession };
    // the outcome of killCursors is not inspected; cleanup completes either way
    pinnedServer.killCursors(namespace, [id], killOptions, () => completeCleanup());
  }
  /**
   * @internal
   * Guard for builder methods that may only run before the cursor is initialized.
   *
   * @param cursor - The cursor to check.
   * @throws MongoCursorInUseError when the cursor's initialized flag is set.
   */
  function assertUninitialized(cursor) {
    const alreadyInitialized = cursor[kInitialized];
    if (!alreadyInitialized) {
      return;
    }
    throw new error_1.MongoCursorInUseError();
  }
  exports.assertUninitialized = assertUninitialized;
  /**
   * Wrap a cursor in an object-mode Readable that pulls documents through the
   * module-level `next` and ends when the cursor yields no more documents.
   *
   * @param cursor - The cursor to read from.
   * @returns The Readable stream (highWaterMark 1, autoDestroy disabled).
   */
  function makeCursorStream(cursor) {
    const readable = new stream_1.Readable({
      objectMode: true,
      autoDestroy: false,
      highWaterMark: 1
    });
    let initialized = false;
    let reading = false;
    let needToClose = true; // NOTE: we must close the cursor if we never read from it, use `_construct` in future node versions
    // Best-effort close used while the stream is already ending. A callback is
    // passed (as `_destroy` does) so that no promise is created whose rejection
    // would go unhandled; the outcome is intentionally ignored.
    const closeQuietly = () => {
      cursor.close(() => { });
    };
    readable._read = function () {
      if (initialized === false) {
        needToClose = false;
        initialized = true;
      }
      if (!reading) {
        reading = true;
        readNext();
      }
    };
    readable._destroy = function (error, cb) {
      if (needToClose) {
        cursor.close(err => process.nextTick(cb, err || error));
      }
      else {
        cb(error);
      }
    };
    function readNext() {
      needToClose = false;
      next(cursor, true, (err, result) => {
        needToClose = err ? !cursor.closed : result != null;
        if (err) {
          // NOTE: This is questionable, but we have a test backing the behavior. It seems the
          //       desired behavior is that a stream ends cleanly when a user explicitly closes
          //       a client during iteration. Alternatively, we could do the "right" thing and
          //       propagate the error message by removing this special case.
          if (/server is closed/.test(err.message)) {
            closeQuietly();
            return readable.push(null);
          }
          // NOTE: This is also perhaps questionable. The rationale here is that these errors tend
          //       to be "operation interrupted", where a cursor has been closed but there is an
          //       active getMore in-flight. This used to check if the cursor was killed but once
          //       that changed to happen in cleanup legitimate errors would not destroy the
          //       stream. There are change streams test specifically test these cases.
          if (/interrupted/.test(err.message)) {
            return readable.push(null);
          }
          return readable.destroy(err);
        }
        if (result == null) {
          readable.push(null);
        }
        else if (readable.destroyed) {
          // the stream was destroyed while this read was in flight
          closeQuietly();
        }
        else {
          // keep reading while the stream accepts more data
          if (readable.push(result)) {
            return readNext();
          }
          reading = false;
        }
      });
    }
    return readable;
  }
  659. //# sourceMappingURL=abstract_cursor.js.map