abstract_cursor.js 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.assertUninitialized = exports.AbstractCursor = exports.CURSOR_FLAGS = void 0;
  4. const stream_1 = require("stream");
  5. const bson_1 = require("../bson");
  6. const error_1 = require("../error");
  7. const mongo_types_1 = require("../mongo_types");
  8. const execute_operation_1 = require("../operations/execute_operation");
  9. const get_more_1 = require("../operations/get_more");
  10. const read_concern_1 = require("../read_concern");
  11. const read_preference_1 = require("../read_preference");
  12. const sessions_1 = require("../sessions");
  13. const utils_1 = require("../utils");
  14. /** @internal */
  15. const kId = Symbol('id');
  16. /** @internal */
  17. const kDocuments = Symbol('documents');
  18. /** @internal */
  19. const kServer = Symbol('server');
  20. /** @internal */
  21. const kNamespace = Symbol('namespace');
  22. /** @internal */
  23. const kTopology = Symbol('topology');
  24. /** @internal */
  25. const kSession = Symbol('session');
  26. /** @internal */
  27. const kOptions = Symbol('options');
  28. /** @internal */
  29. const kTransform = Symbol('transform');
  30. /** @internal */
  31. const kInitialized = Symbol('initialized');
  32. /** @internal */
  33. const kClosed = Symbol('closed');
  34. /** @internal */
  35. const kKilled = Symbol('killed');
  36. /** @public */
  37. exports.CURSOR_FLAGS = [
  38. 'tailable',
  39. 'oplogReplay',
  40. 'noCursorTimeout',
  41. 'awaitData',
  42. 'exhaust',
  43. 'partial'
  44. ];
  45. /** @public */
  46. class AbstractCursor extends mongo_types_1.TypedEventEmitter {
  47. /** @internal */
  48. constructor(topology, namespace, options = {}) {
  49. super();
  50. this[kTopology] = topology;
  51. this[kNamespace] = namespace;
  52. this[kDocuments] = []; // TODO: https://github.com/microsoft/TypeScript/issues/36230
  53. this[kInitialized] = false;
  54. this[kClosed] = false;
  55. this[kKilled] = false;
  56. this[kOptions] = {
  57. readPreference: options.readPreference && options.readPreference instanceof read_preference_1.ReadPreference
  58. ? options.readPreference
  59. : read_preference_1.ReadPreference.primary,
  60. ...(0, bson_1.pluckBSONSerializeOptions)(options)
  61. };
  62. const readConcern = read_concern_1.ReadConcern.fromOptions(options);
  63. if (readConcern) {
  64. this[kOptions].readConcern = readConcern;
  65. }
  66. if (typeof options.batchSize === 'number') {
  67. this[kOptions].batchSize = options.batchSize;
  68. }
  69. if (options.comment != null) {
  70. this[kOptions].comment = options.comment;
  71. }
  72. if (typeof options.maxTimeMS === 'number') {
  73. this[kOptions].maxTimeMS = options.maxTimeMS;
  74. }
  75. if (options.session instanceof sessions_1.ClientSession) {
  76. this[kSession] = options.session;
  77. }
  78. }
  79. get id() {
  80. return this[kId];
  81. }
  82. /** @internal */
  83. get topology() {
  84. return this[kTopology];
  85. }
  86. /** @internal */
  87. get server() {
  88. return this[kServer];
  89. }
  90. get namespace() {
  91. return this[kNamespace];
  92. }
  93. get readPreference() {
  94. return this[kOptions].readPreference;
  95. }
  96. get readConcern() {
  97. return this[kOptions].readConcern;
  98. }
  99. /** @internal */
  100. get session() {
  101. return this[kSession];
  102. }
  103. set session(clientSession) {
  104. this[kSession] = clientSession;
  105. }
  106. /** @internal */
  107. get cursorOptions() {
  108. return this[kOptions];
  109. }
  110. get closed() {
  111. return this[kClosed];
  112. }
  113. get killed() {
  114. return this[kKilled];
  115. }
  116. get loadBalanced() {
  117. return this[kTopology].loadBalanced;
  118. }
  119. /** Returns current buffered documents length */
  120. bufferedCount() {
  121. return this[kDocuments].length;
  122. }
  123. /** Returns current buffered documents */
  124. readBufferedDocuments(number) {
  125. return this[kDocuments].splice(0, number !== null && number !== void 0 ? number : this[kDocuments].length);
  126. }
  127. [Symbol.asyncIterator]() {
  128. return {
  129. next: () => this.next().then(value => value != null ? { value, done: false } : { value: undefined, done: true })
  130. };
  131. }
  132. stream(options) {
  133. if (options === null || options === void 0 ? void 0 : options.transform) {
  134. const transform = options.transform;
  135. const readable = makeCursorStream(this);
  136. return readable.pipe(new stream_1.Transform({
  137. objectMode: true,
  138. highWaterMark: 1,
  139. transform(chunk, _, callback) {
  140. try {
  141. const transformed = transform(chunk);
  142. callback(undefined, transformed);
  143. }
  144. catch (err) {
  145. callback(err);
  146. }
  147. }
  148. }));
  149. }
  150. return makeCursorStream(this);
  151. }
  152. hasNext(callback) {
  153. return (0, utils_1.maybePromise)(callback, done => {
  154. if (this[kId] === bson_1.Long.ZERO) {
  155. return done(undefined, false);
  156. }
  157. if (this[kDocuments].length) {
  158. return done(undefined, true);
  159. }
  160. next(this, true, (err, doc) => {
  161. if (err)
  162. return done(err);
  163. if (doc) {
  164. this[kDocuments].unshift(doc);
  165. done(undefined, true);
  166. return;
  167. }
  168. done(undefined, false);
  169. });
  170. });
  171. }
  172. next(callback) {
  173. return (0, utils_1.maybePromise)(callback, done => {
  174. if (this[kId] === bson_1.Long.ZERO) {
  175. return done(new error_1.MongoCursorExhaustedError());
  176. }
  177. next(this, true, done);
  178. });
  179. }
  180. tryNext(callback) {
  181. return (0, utils_1.maybePromise)(callback, done => {
  182. if (this[kId] === bson_1.Long.ZERO) {
  183. return done(new error_1.MongoCursorExhaustedError());
  184. }
  185. next(this, false, done);
  186. });
  187. }
  188. forEach(iterator, callback) {
  189. if (typeof iterator !== 'function') {
  190. throw new error_1.MongoInvalidArgumentError('Argument "iterator" must be a function');
  191. }
  192. return (0, utils_1.maybePromise)(callback, done => {
  193. const transform = this[kTransform];
  194. const fetchDocs = () => {
  195. next(this, true, (err, doc) => {
  196. if (err || doc == null)
  197. return done(err);
  198. let result;
  199. // NOTE: no need to transform because `next` will do this automatically
  200. try {
  201. result = iterator(doc); // TODO(NODE-3283): Improve transform typing
  202. }
  203. catch (error) {
  204. return done(error);
  205. }
  206. if (result === false)
  207. return done();
  208. // these do need to be transformed since they are copying the rest of the batch
  209. const internalDocs = this[kDocuments].splice(0, this[kDocuments].length);
  210. for (let i = 0; i < internalDocs.length; ++i) {
  211. try {
  212. result = iterator((transform ? transform(internalDocs[i]) : internalDocs[i]) // TODO(NODE-3283): Improve transform typing
  213. );
  214. }
  215. catch (error) {
  216. return done(error);
  217. }
  218. if (result === false)
  219. return done();
  220. }
  221. fetchDocs();
  222. });
  223. };
  224. fetchDocs();
  225. });
  226. }
  227. close(options, callback) {
  228. if (typeof options === 'function')
  229. (callback = options), (options = {});
  230. options = options !== null && options !== void 0 ? options : {};
  231. const needsToEmitClosed = !this[kClosed];
  232. this[kClosed] = true;
  233. return (0, utils_1.maybePromise)(callback, done => cleanupCursor(this, { needsToEmitClosed }, done));
  234. }
  235. toArray(callback) {
  236. return (0, utils_1.maybePromise)(callback, done => {
  237. const docs = [];
  238. const transform = this[kTransform];
  239. const fetchDocs = () => {
  240. // NOTE: if we add a `nextBatch` then we should use it here
  241. next(this, true, (err, doc) => {
  242. if (err)
  243. return done(err);
  244. if (doc == null)
  245. return done(undefined, docs);
  246. // NOTE: no need to transform because `next` will do this automatically
  247. docs.push(doc);
  248. // these do need to be transformed since they are copying the rest of the batch
  249. const internalDocs = (transform
  250. ? this[kDocuments].splice(0, this[kDocuments].length).map(transform)
  251. : this[kDocuments].splice(0, this[kDocuments].length)); // TODO(NODE-3283): Improve transform typing
  252. if (internalDocs) {
  253. docs.push(...internalDocs);
  254. }
  255. fetchDocs();
  256. });
  257. };
  258. fetchDocs();
  259. });
  260. }
  261. /**
  262. * Add a cursor flag to the cursor
  263. *
  264. * @param flag - The flag to set, must be one of following ['tailable', 'oplogReplay', 'noCursorTimeout', 'awaitData', 'partial' -.
  265. * @param value - The flag boolean value.
  266. */
  267. addCursorFlag(flag, value) {
  268. assertUninitialized(this);
  269. if (!exports.CURSOR_FLAGS.includes(flag)) {
  270. throw new error_1.MongoInvalidArgumentError(`Flag ${flag} is not one of ${exports.CURSOR_FLAGS}`);
  271. }
  272. if (typeof value !== 'boolean') {
  273. throw new error_1.MongoInvalidArgumentError(`Flag ${flag} must be a boolean value`);
  274. }
  275. this[kOptions][flag] = value;
  276. return this;
  277. }
  278. /**
  279. * Map all documents using the provided function
  280. * If there is a transform set on the cursor, that will be called first and the result passed to
  281. * this function's transform.
  282. *
  283. * @remarks
  284. * **Note for Typescript Users:** adding a transform changes the return type of the iteration of this cursor,
  285. * it **does not** return a new instance of a cursor. This means when calling map,
  286. * you should always assign the result to a new variable in order to get a correctly typed cursor variable.
  287. * Take note of the following example:
  288. *
  289. * @example
  290. * ```typescript
  291. * const cursor: FindCursor<Document> = coll.find();
  292. * const mappedCursor: FindCursor<number> = cursor.map(doc => Object.keys(doc).length);
  293. * const keyCounts: number[] = await mappedCursor.toArray(); // cursor.toArray() still returns Document[]
  294. * ```
  295. * @param transform - The mapping transformation method.
  296. */
  297. map(transform) {
  298. assertUninitialized(this);
  299. const oldTransform = this[kTransform]; // TODO(NODE-3283): Improve transform typing
  300. if (oldTransform) {
  301. this[kTransform] = doc => {
  302. return transform(oldTransform(doc));
  303. };
  304. }
  305. else {
  306. this[kTransform] = transform;
  307. }
  308. return this;
  309. }
  310. /**
  311. * Set the ReadPreference for the cursor.
  312. *
  313. * @param readPreference - The new read preference for the cursor.
  314. */
  315. withReadPreference(readPreference) {
  316. assertUninitialized(this);
  317. if (readPreference instanceof read_preference_1.ReadPreference) {
  318. this[kOptions].readPreference = readPreference;
  319. }
  320. else if (typeof readPreference === 'string') {
  321. this[kOptions].readPreference = read_preference_1.ReadPreference.fromString(readPreference);
  322. }
  323. else {
  324. throw new error_1.MongoInvalidArgumentError(`Invalid read preference: ${readPreference}`);
  325. }
  326. return this;
  327. }
  328. /**
  329. * Set the ReadPreference for the cursor.
  330. *
  331. * @param readPreference - The new read preference for the cursor.
  332. */
  333. withReadConcern(readConcern) {
  334. assertUninitialized(this);
  335. const resolvedReadConcern = read_concern_1.ReadConcern.fromOptions({ readConcern });
  336. if (resolvedReadConcern) {
  337. this[kOptions].readConcern = resolvedReadConcern;
  338. }
  339. return this;
  340. }
  341. /**
  342. * Set a maxTimeMS on the cursor query, allowing for hard timeout limits on queries (Only supported on MongoDB 2.6 or higher)
  343. *
  344. * @param value - Number of milliseconds to wait before aborting the query.
  345. */
  346. maxTimeMS(value) {
  347. assertUninitialized(this);
  348. if (typeof value !== 'number') {
  349. throw new error_1.MongoInvalidArgumentError('Argument for maxTimeMS must be a number');
  350. }
  351. this[kOptions].maxTimeMS = value;
  352. return this;
  353. }
  354. /**
  355. * Set the batch size for the cursor.
  356. *
  357. * @param value - The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/find/|find command documentation}.
  358. */
  359. batchSize(value) {
  360. assertUninitialized(this);
  361. if (this[kOptions].tailable) {
  362. throw new error_1.MongoTailableCursorError('Tailable cursor does not support batchSize');
  363. }
  364. if (typeof value !== 'number') {
  365. throw new error_1.MongoInvalidArgumentError('Operation "batchSize" requires an integer');
  366. }
  367. this[kOptions].batchSize = value;
  368. return this;
  369. }
  370. /**
  371. * Rewind this cursor to its uninitialized state. Any options that are present on the cursor will
  372. * remain in effect. Iterating this cursor will cause new queries to be sent to the server, even
  373. * if the resultant data has already been retrieved by this cursor.
  374. */
  375. rewind() {
  376. if (!this[kInitialized]) {
  377. return;
  378. }
  379. this[kId] = undefined;
  380. this[kDocuments] = [];
  381. this[kClosed] = false;
  382. this[kKilled] = false;
  383. this[kInitialized] = false;
  384. const session = this[kSession];
  385. if (session) {
  386. // We only want to end this session if we created it, and it hasn't ended yet
  387. if (session.explicit === false && !session.hasEnded) {
  388. session.endSession();
  389. }
  390. this[kSession] = undefined;
  391. }
  392. }
  393. /** @internal */
  394. _getMore(batchSize, callback) {
  395. const cursorId = this[kId];
  396. const cursorNs = this[kNamespace];
  397. const server = this[kServer];
  398. if (cursorId == null) {
  399. callback(new error_1.MongoRuntimeError('Unable to iterate cursor with no id'));
  400. return;
  401. }
  402. if (server == null) {
  403. callback(new error_1.MongoRuntimeError('Unable to iterate cursor without selected server'));
  404. return;
  405. }
  406. const getMoreOperation = new get_more_1.GetMoreOperation(cursorNs, cursorId, server, {
  407. ...this[kOptions],
  408. session: this[kSession],
  409. batchSize
  410. });
  411. (0, execute_operation_1.executeOperation)(this.topology, getMoreOperation, callback);
  412. }
  413. }
  414. exports.AbstractCursor = AbstractCursor;
  415. /** @event */
  416. AbstractCursor.CLOSE = 'close';
  417. function nextDocument(cursor) {
  418. if (cursor[kDocuments] == null || !cursor[kDocuments].length) {
  419. return null;
  420. }
  421. const doc = cursor[kDocuments].shift();
  422. if (doc) {
  423. const transform = cursor[kTransform];
  424. if (transform) {
  425. return transform(doc);
  426. }
  427. return doc;
  428. }
  429. return null;
  430. }
  431. function next(cursor, blocking, callback) {
  432. const cursorId = cursor[kId];
  433. if (cursor.closed) {
  434. return callback(undefined, null);
  435. }
  436. if (cursor[kDocuments] && cursor[kDocuments].length) {
  437. callback(undefined, nextDocument(cursor));
  438. return;
  439. }
  440. if (cursorId == null) {
  441. // All cursors must operate within a session, one must be made implicitly if not explicitly provided
  442. if (cursor[kSession] == null && cursor[kTopology].hasSessionSupport()) {
  443. cursor[kSession] = cursor[kTopology].startSession({ owner: cursor, explicit: false });
  444. }
  445. cursor._initialize(cursor[kSession], (err, state) => {
  446. if (state) {
  447. const response = state.response;
  448. cursor[kServer] = state.server;
  449. cursor[kSession] = state.session;
  450. if (response.cursor) {
  451. cursor[kId] =
  452. typeof response.cursor.id === 'number'
  453. ? bson_1.Long.fromNumber(response.cursor.id)
  454. : response.cursor.id;
  455. if (response.cursor.ns) {
  456. cursor[kNamespace] = (0, utils_1.ns)(response.cursor.ns);
  457. }
  458. cursor[kDocuments] = response.cursor.firstBatch;
  459. }
  460. else {
  461. // NOTE: This is for support of older servers (<3.2) which do not use commands
  462. cursor[kId] =
  463. typeof response.cursorId === 'number'
  464. ? bson_1.Long.fromNumber(response.cursorId)
  465. : response.cursorId;
  466. cursor[kDocuments] = response.documents;
  467. }
  468. // When server responses return without a cursor document, we close this cursor
  469. // and return the raw server response. This is often the case for explain commands
  470. // for example
  471. if (cursor[kId] == null) {
  472. cursor[kId] = bson_1.Long.ZERO;
  473. // TODO(NODE-3286): ExecutionResult needs to accept a generic parameter
  474. cursor[kDocuments] = [state.response];
  475. }
  476. }
  477. // the cursor is now initialized, even if an error occurred or it is dead
  478. cursor[kInitialized] = true;
  479. if (err || cursorIsDead(cursor)) {
  480. return cleanupCursor(cursor, { error: err }, () => callback(err, nextDocument(cursor)));
  481. }
  482. next(cursor, blocking, callback);
  483. });
  484. return;
  485. }
  486. if (cursorIsDead(cursor)) {
  487. return cleanupCursor(cursor, undefined, () => callback(undefined, null));
  488. }
  489. // otherwise need to call getMore
  490. const batchSize = cursor[kOptions].batchSize || 1000;
  491. cursor._getMore(batchSize, (err, response) => {
  492. if (response) {
  493. const cursorId = typeof response.cursor.id === 'number'
  494. ? bson_1.Long.fromNumber(response.cursor.id)
  495. : response.cursor.id;
  496. cursor[kDocuments] = response.cursor.nextBatch;
  497. cursor[kId] = cursorId;
  498. }
  499. if (err || cursorIsDead(cursor)) {
  500. return cleanupCursor(cursor, { error: err }, () => callback(err, nextDocument(cursor)));
  501. }
  502. if (cursor[kDocuments].length === 0 && blocking === false) {
  503. return callback(undefined, null);
  504. }
  505. next(cursor, blocking, callback);
  506. });
  507. }
  508. function cursorIsDead(cursor) {
  509. const cursorId = cursor[kId];
  510. return !!cursorId && cursorId.isZero();
  511. }
  512. function cleanupCursor(cursor, options, callback) {
  513. var _a;
  514. const cursorId = cursor[kId];
  515. const cursorNs = cursor[kNamespace];
  516. const server = cursor[kServer];
  517. const session = cursor[kSession];
  518. const error = options === null || options === void 0 ? void 0 : options.error;
  519. const needsToEmitClosed = (_a = options === null || options === void 0 ? void 0 : options.needsToEmitClosed) !== null && _a !== void 0 ? _a : cursor[kDocuments].length === 0;
  520. if (error) {
  521. if (cursor.loadBalanced && error instanceof error_1.MongoNetworkError) {
  522. return completeCleanup();
  523. }
  524. }
  525. if (cursorId == null || server == null || cursorId.isZero() || cursorNs == null) {
  526. if (needsToEmitClosed) {
  527. cursor[kClosed] = true;
  528. cursor[kId] = bson_1.Long.ZERO;
  529. cursor.emit(AbstractCursor.CLOSE);
  530. }
  531. if (session) {
  532. if (session.owner === cursor) {
  533. return session.endSession({ error }, callback);
  534. }
  535. if (!session.inTransaction()) {
  536. (0, sessions_1.maybeClearPinnedConnection)(session, { error });
  537. }
  538. }
  539. return callback();
  540. }
  541. function completeCleanup() {
  542. if (session) {
  543. if (session.owner === cursor) {
  544. return session.endSession({ error }, () => {
  545. cursor.emit(AbstractCursor.CLOSE);
  546. callback();
  547. });
  548. }
  549. if (!session.inTransaction()) {
  550. (0, sessions_1.maybeClearPinnedConnection)(session, { error });
  551. }
  552. }
  553. cursor.emit(AbstractCursor.CLOSE);
  554. return callback();
  555. }
  556. cursor[kKilled] = true;
  557. server.killCursors(cursorNs, [cursorId], { ...(0, bson_1.pluckBSONSerializeOptions)(cursor[kOptions]), session }, () => completeCleanup());
  558. }
  559. /** @internal */
  560. function assertUninitialized(cursor) {
  561. if (cursor[kInitialized]) {
  562. throw new error_1.MongoCursorInUseError();
  563. }
  564. }
  565. exports.assertUninitialized = assertUninitialized;
  566. function makeCursorStream(cursor) {
  567. const readable = new stream_1.Readable({
  568. objectMode: true,
  569. autoDestroy: false,
  570. highWaterMark: 1
  571. });
  572. let initialized = false;
  573. let reading = false;
  574. let needToClose = true; // NOTE: we must close the cursor if we never read from it, use `_construct` in future node versions
  575. readable._read = function () {
  576. if (initialized === false) {
  577. needToClose = false;
  578. initialized = true;
  579. }
  580. if (!reading) {
  581. reading = true;
  582. readNext();
  583. }
  584. };
  585. readable._destroy = function (error, cb) {
  586. if (needToClose) {
  587. cursor.close(err => process.nextTick(cb, err || error));
  588. }
  589. else {
  590. cb(error);
  591. }
  592. };
  593. function readNext() {
  594. needToClose = false;
  595. next(cursor, true, (err, result) => {
  596. needToClose = err ? !cursor.closed : result != null;
  597. if (err) {
  598. // NOTE: This is questionable, but we have a test backing the behavior. It seems the
  599. // desired behavior is that a stream ends cleanly when a user explicitly closes
  600. // a client during iteration. Alternatively, we could do the "right" thing and
  601. // propagate the error message by removing this special case.
  602. if (err.message.match(/server is closed/)) {
  603. cursor.close();
  604. return readable.push(null);
  605. }
  606. // NOTE: This is also perhaps questionable. The rationale here is that these errors tend
  607. // to be "operation interrupted", where a cursor has been closed but there is an
  608. // active getMore in-flight. This used to check if the cursor was killed but once
  609. // that changed to happen in cleanup legitimate errors would not destroy the
  610. // stream. There are change streams test specifically test these cases.
  611. if (err.message.match(/interrupted/)) {
  612. return readable.push(null);
  613. }
  614. return readable.destroy(err);
  615. }
  616. if (result == null) {
  617. readable.push(null);
  618. }
  619. else if (readable.destroyed) {
  620. cursor.close();
  621. }
  622. else {
  623. if (readable.push(result)) {
  624. return readNext();
  625. }
  626. reading = false;
  627. }
  628. });
  629. }
  630. return readable;
  631. }
  632. //# sourceMappingURL=abstract_cursor.js.map