// sessions.js (33 KB)
  1. "use strict";
  2. var _a;
  3. Object.defineProperty(exports, "__esModule", { value: true });
  4. exports.updateSessionFromResponse = exports.applySession = exports.ServerSessionPool = exports.ServerSession = exports.maybeClearPinnedConnection = exports.ClientSession = void 0;
  5. const bson_1 = require("./bson");
  6. const metrics_1 = require("./cmap/metrics");
  7. const shared_1 = require("./cmap/wire_protocol/shared");
  8. const constants_1 = require("./constants");
  9. const error_1 = require("./error");
  10. const mongo_types_1 = require("./mongo_types");
  11. const execute_operation_1 = require("./operations/execute_operation");
  12. const run_command_1 = require("./operations/run_command");
  13. const promise_provider_1 = require("./promise_provider");
  14. const read_concern_1 = require("./read_concern");
  15. const read_preference_1 = require("./read_preference");
  16. const common_1 = require("./sdam/common");
  17. const transactions_1 = require("./transactions");
  18. const utils_1 = require("./utils");
// Lowest topology wire version at which startTransaction accepts a sharded topology; below it
// startTransaction throws "Transactions are not supported on sharded clusters in MongoDB < 4.2."
const minWireVersionForShardedTransactions = 8;
/** @internal Key under which a ClientSession keeps its ServerSession (acquired lazily for implicit sessions) */
const kServerSession = Symbol('serverSession');
/** @internal Key for the atClusterTime captured from a server response on a snapshot session */
const kSnapshotTime = Symbol('snapshotTime');
/** @internal Key for the flag recording that a session was created with `snapshot: true` */
const kSnapshotEnabled = Symbol('snapshotEnabled');
/** @internal Key for the connection currently pinned to a session */
const kPinnedConnection = Symbol('pinnedConnection');
/** @internal Accumulates total number of increments to add to txnNumber when applying session to command */
const kTxnNumberIncrement = Symbol('txnNumberIncrement');
  30. /**
  31. * A class representing a client session on the server
  32. *
  33. * NOTE: not meant to be instantiated directly.
  34. * @public
  35. */
  36. class ClientSession extends mongo_types_1.TypedEventEmitter {
  37. /**
  38. * Create a client session.
  39. * @internal
  40. * @param client - The current client
  41. * @param sessionPool - The server session pool (Internal Class)
  42. * @param options - Optional settings
  43. * @param clientOptions - Optional settings provided when creating a MongoClient
  44. */
  45. constructor(client, sessionPool, options, clientOptions) {
  46. super();
  47. /** @internal */
  48. this[_a] = false;
  49. if (client == null) {
  50. // TODO(NODE-3483)
  51. throw new error_1.MongoRuntimeError('ClientSession requires a MongoClient');
  52. }
  53. if (sessionPool == null || !(sessionPool instanceof ServerSessionPool)) {
  54. // TODO(NODE-3483)
  55. throw new error_1.MongoRuntimeError('ClientSession requires a ServerSessionPool');
  56. }
  57. options = options !== null && options !== void 0 ? options : {};
  58. if (options.snapshot === true) {
  59. this[kSnapshotEnabled] = true;
  60. if (options.causalConsistency === true) {
  61. throw new error_1.MongoInvalidArgumentError('Properties "causalConsistency" and "snapshot" are mutually exclusive');
  62. }
  63. }
  64. this.client = client;
  65. this.sessionPool = sessionPool;
  66. this.hasEnded = false;
  67. this.clientOptions = clientOptions;
  68. this.explicit = !!options.explicit;
  69. this[kServerSession] = this.explicit ? this.sessionPool.acquire() : null;
  70. this[kTxnNumberIncrement] = 0;
  71. this.supports = {
  72. causalConsistency: options.snapshot !== true && options.causalConsistency !== false
  73. };
  74. this.clusterTime = options.initialClusterTime;
  75. this.operationTime = undefined;
  76. this.owner = options.owner;
  77. this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions);
  78. this.transaction = new transactions_1.Transaction();
  79. }
  80. /** The server id associated with this session */
  81. get id() {
  82. var _b;
  83. return (_b = this[kServerSession]) === null || _b === void 0 ? void 0 : _b.id;
  84. }
  85. get serverSession() {
  86. let serverSession = this[kServerSession];
  87. if (serverSession == null) {
  88. if (this.explicit) {
  89. throw new error_1.MongoRuntimeError('Unexpected null serverSession for an explicit session');
  90. }
  91. if (this.hasEnded) {
  92. throw new error_1.MongoRuntimeError('Unexpected null serverSession for an ended implicit session');
  93. }
  94. serverSession = this.sessionPool.acquire();
  95. this[kServerSession] = serverSession;
  96. }
  97. return serverSession;
  98. }
  99. /** Whether or not this session is configured for snapshot reads */
  100. get snapshotEnabled() {
  101. return this[kSnapshotEnabled];
  102. }
  103. get loadBalanced() {
  104. var _b;
  105. return ((_b = this.client.topology) === null || _b === void 0 ? void 0 : _b.description.type) === common_1.TopologyType.LoadBalanced;
  106. }
  107. /** @internal */
  108. get pinnedConnection() {
  109. return this[kPinnedConnection];
  110. }
  111. /** @internal */
  112. pin(conn) {
  113. if (this[kPinnedConnection]) {
  114. throw TypeError('Cannot pin multiple connections to the same session');
  115. }
  116. this[kPinnedConnection] = conn;
  117. conn.emit(constants_1.PINNED, this.inTransaction() ? metrics_1.ConnectionPoolMetrics.TXN : metrics_1.ConnectionPoolMetrics.CURSOR);
  118. }
  119. /** @internal */
  120. unpin(options) {
  121. if (this.loadBalanced) {
  122. return maybeClearPinnedConnection(this, options);
  123. }
  124. this.transaction.unpinServer();
  125. }
  126. get isPinned() {
  127. return this.loadBalanced ? !!this[kPinnedConnection] : this.transaction.isPinned;
  128. }
  129. endSession(options, callback) {
  130. if (typeof options === 'function')
  131. (callback = options), (options = {});
  132. const finalOptions = { force: true, ...options };
  133. return (0, utils_1.maybePromise)(callback, done => {
  134. if (this.hasEnded) {
  135. maybeClearPinnedConnection(this, finalOptions);
  136. return done();
  137. }
  138. const completeEndSession = () => {
  139. maybeClearPinnedConnection(this, finalOptions);
  140. const serverSession = this[kServerSession];
  141. if (serverSession != null) {
  142. // release the server session back to the pool
  143. this.sessionPool.release(serverSession);
  144. // Make sure a new serverSession never makes it on to the ClientSession
  145. Object.defineProperty(this, kServerSession, {
  146. value: ServerSession.clone(serverSession)
  147. });
  148. }
  149. // mark the session as ended, and emit a signal
  150. this.hasEnded = true;
  151. this.emit('ended', this);
  152. // spec indicates that we should ignore all errors for `endSessions`
  153. done();
  154. };
  155. if (this.inTransaction()) {
  156. // If we've reached endSession and the transaction is still active
  157. // by default we abort it
  158. this.abortTransaction(err => {
  159. if (err)
  160. return done(err);
  161. completeEndSession();
  162. });
  163. return;
  164. }
  165. completeEndSession();
  166. });
  167. }
  168. /**
  169. * Advances the operationTime for a ClientSession.
  170. *
  171. * @param operationTime - the `BSON.Timestamp` of the operation type it is desired to advance to
  172. */
  173. advanceOperationTime(operationTime) {
  174. if (this.operationTime == null) {
  175. this.operationTime = operationTime;
  176. return;
  177. }
  178. if (operationTime.greaterThan(this.operationTime)) {
  179. this.operationTime = operationTime;
  180. }
  181. }
  182. /**
  183. * Advances the clusterTime for a ClientSession to the provided clusterTime of another ClientSession
  184. *
  185. * @param clusterTime - the $clusterTime returned by the server from another session in the form of a document containing the `BSON.Timestamp` clusterTime and signature
  186. */
  187. advanceClusterTime(clusterTime) {
  188. var _b, _c;
  189. if (!clusterTime || typeof clusterTime !== 'object') {
  190. throw new error_1.MongoInvalidArgumentError('input cluster time must be an object');
  191. }
  192. if (!clusterTime.clusterTime || clusterTime.clusterTime._bsontype !== 'Timestamp') {
  193. throw new error_1.MongoInvalidArgumentError('input cluster time "clusterTime" property must be a valid BSON Timestamp');
  194. }
  195. if (!clusterTime.signature ||
  196. ((_b = clusterTime.signature.hash) === null || _b === void 0 ? void 0 : _b._bsontype) !== 'Binary' ||
  197. (typeof clusterTime.signature.keyId !== 'number' &&
  198. ((_c = clusterTime.signature.keyId) === null || _c === void 0 ? void 0 : _c._bsontype) !== 'Long') // apparently we decode the key to number?
  199. ) {
  200. throw new error_1.MongoInvalidArgumentError('input cluster time must have a valid "signature" property with BSON Binary hash and BSON Long keyId');
  201. }
  202. (0, common_1._advanceClusterTime)(this, clusterTime);
  203. }
  204. /**
  205. * Used to determine if this session equals another
  206. *
  207. * @param session - The session to compare to
  208. */
  209. equals(session) {
  210. if (!(session instanceof ClientSession)) {
  211. return false;
  212. }
  213. if (this.id == null || session.id == null) {
  214. return false;
  215. }
  216. return this.id.id.buffer.equals(session.id.id.buffer);
  217. }
  218. /**
  219. * Increment the transaction number on the internal ServerSession
  220. *
  221. * @privateRemarks
  222. * This helper increments a value stored on the client session that will be
  223. * added to the serverSession's txnNumber upon applying it to a command.
  224. * This is because the serverSession is lazily acquired after a connection is obtained
  225. */
  226. incrementTransactionNumber() {
  227. this[kTxnNumberIncrement] += 1;
  228. }
  229. /** @returns whether this session is currently in a transaction or not */
  230. inTransaction() {
  231. return this.transaction.isActive;
  232. }
  233. /**
  234. * Starts a new transaction with the given options.
  235. *
  236. * @param options - Options for the transaction
  237. */
  238. startTransaction(options) {
  239. var _b, _c, _d, _e, _f, _g, _h, _j, _k, _l;
  240. if (this[kSnapshotEnabled]) {
  241. throw new error_1.MongoCompatibilityError('Transactions are not allowed with snapshot sessions');
  242. }
  243. if (this.inTransaction()) {
  244. throw new error_1.MongoTransactionError('Transaction already in progress');
  245. }
  246. if (this.isPinned && this.transaction.isCommitted) {
  247. this.unpin();
  248. }
  249. const topologyMaxWireVersion = (0, utils_1.maxWireVersion)(this.client.topology);
  250. if ((0, shared_1.isSharded)(this.client.topology) &&
  251. topologyMaxWireVersion != null &&
  252. topologyMaxWireVersion < minWireVersionForShardedTransactions) {
  253. throw new error_1.MongoCompatibilityError('Transactions are not supported on sharded clusters in MongoDB < 4.2.');
  254. }
  255. // increment txnNumber
  256. this.incrementTransactionNumber();
  257. // create transaction state
  258. this.transaction = new transactions_1.Transaction({
  259. readConcern: (_c = (_b = options === null || options === void 0 ? void 0 : options.readConcern) !== null && _b !== void 0 ? _b : this.defaultTransactionOptions.readConcern) !== null && _c !== void 0 ? _c : (_d = this.clientOptions) === null || _d === void 0 ? void 0 : _d.readConcern,
  260. writeConcern: (_f = (_e = options === null || options === void 0 ? void 0 : options.writeConcern) !== null && _e !== void 0 ? _e : this.defaultTransactionOptions.writeConcern) !== null && _f !== void 0 ? _f : (_g = this.clientOptions) === null || _g === void 0 ? void 0 : _g.writeConcern,
  261. readPreference: (_j = (_h = options === null || options === void 0 ? void 0 : options.readPreference) !== null && _h !== void 0 ? _h : this.defaultTransactionOptions.readPreference) !== null && _j !== void 0 ? _j : (_k = this.clientOptions) === null || _k === void 0 ? void 0 : _k.readPreference,
  262. maxCommitTimeMS: (_l = options === null || options === void 0 ? void 0 : options.maxCommitTimeMS) !== null && _l !== void 0 ? _l : this.defaultTransactionOptions.maxCommitTimeMS
  263. });
  264. this.transaction.transition(transactions_1.TxnState.STARTING_TRANSACTION);
  265. }
  266. commitTransaction(callback) {
  267. return (0, utils_1.maybePromise)(callback, cb => endTransaction(this, 'commitTransaction', cb));
  268. }
  269. abortTransaction(callback) {
  270. return (0, utils_1.maybePromise)(callback, cb => endTransaction(this, 'abortTransaction', cb));
  271. }
  272. /**
  273. * This is here to ensure that ClientSession is never serialized to BSON.
  274. */
  275. toBSON() {
  276. throw new error_1.MongoRuntimeError('ClientSession cannot be serialized to BSON.');
  277. }
  278. /**
  279. * Runs a provided callback within a transaction, retrying either the commitTransaction operation
  280. * or entire transaction as needed (and when the error permits) to better ensure that
  281. * the transaction can complete successfully.
  282. *
  283. * **IMPORTANT:** This method requires the user to return a Promise, and `await` all operations.
  284. * Any callbacks that do not return a Promise will result in undefined behavior.
  285. *
  286. * @remarks
  287. * This function:
  288. * - Will return the command response from the final commitTransaction if every operation is successful (can be used as a truthy object)
  289. * - Will return `undefined` if the transaction is explicitly aborted with `await session.abortTransaction()`
  290. * - Will throw if one of the operations throws or `throw` statement is used inside the `withTransaction` callback
  291. *
  292. * Checkout a descriptive example here:
  293. * @see https://www.mongodb.com/developer/quickstart/node-transactions/
  294. *
  295. * @param fn - callback to run within a transaction
  296. * @param options - optional settings for the transaction
  297. * @returns A raw command response or undefined
  298. */
  299. withTransaction(fn, options) {
  300. const startTime = (0, utils_1.now)();
  301. return attemptTransaction(this, startTime, fn, options);
  302. }
  303. }
  304. exports.ClientSession = ClientSession;
  305. _a = kSnapshotEnabled;
/** Retry budget for withTransaction, in milliseconds (compared against calculateDurationInMs) */
const MAX_WITH_TRANSACTION_TIMEOUT = 120000;
/**
 * Server error code names for which isUnknownTransactionCommitResult reports `false`
 * (unless the error is also a MaxTimeMSExpired error).
 */
const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set([
    'CannotSatisfyWriteConcern',
    'UnknownReplWriteConcern',
    'UnsatisfiableWriteConcern'
]);
  312. function hasNotTimedOut(startTime, max) {
  313. return (0, utils_1.calculateDurationInMs)(startTime) < max;
  314. }
  315. function isUnknownTransactionCommitResult(err) {
  316. const isNonDeterministicWriteConcernError = err instanceof error_1.MongoServerError &&
  317. err.codeName &&
  318. NON_DETERMINISTIC_WRITE_CONCERN_ERRORS.has(err.codeName);
  319. return (isMaxTimeMSExpiredError(err) ||
  320. (!isNonDeterministicWriteConcernError &&
  321. err.code !== error_1.MONGODB_ERROR_CODES.UnsatisfiableWriteConcern &&
  322. err.code !== error_1.MONGODB_ERROR_CODES.UnknownReplWriteConcern));
  323. }
  324. function maybeClearPinnedConnection(session, options) {
  325. // unpin a connection if it has been pinned
  326. const conn = session[kPinnedConnection];
  327. const error = options === null || options === void 0 ? void 0 : options.error;
  328. if (session.inTransaction() &&
  329. error &&
  330. error instanceof error_1.MongoError &&
  331. error.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError)) {
  332. return;
  333. }
  334. const topology = session.client.topology;
  335. // NOTE: the spec talks about what to do on a network error only, but the tests seem to
  336. // to validate that we don't unpin on _all_ errors?
  337. if (conn && topology != null) {
  338. const servers = Array.from(topology.s.servers.values());
  339. const loadBalancer = servers[0];
  340. if ((options === null || options === void 0 ? void 0 : options.error) == null || (options === null || options === void 0 ? void 0 : options.force)) {
  341. loadBalancer.s.pool.checkIn(conn);
  342. conn.emit(constants_1.UNPINNED, session.transaction.state !== transactions_1.TxnState.NO_TRANSACTION
  343. ? metrics_1.ConnectionPoolMetrics.TXN
  344. : metrics_1.ConnectionPoolMetrics.CURSOR);
  345. if (options === null || options === void 0 ? void 0 : options.forceClear) {
  346. loadBalancer.s.pool.clear(conn.serviceId);
  347. }
  348. }
  349. session[kPinnedConnection] = undefined;
  350. }
  351. }
  352. exports.maybeClearPinnedConnection = maybeClearPinnedConnection;
  353. function isMaxTimeMSExpiredError(err) {
  354. if (err == null || !(err instanceof error_1.MongoServerError)) {
  355. return false;
  356. }
  357. return (err.code === error_1.MONGODB_ERROR_CODES.MaxTimeMSExpired ||
  358. (err.writeConcernError && err.writeConcernError.code === error_1.MONGODB_ERROR_CODES.MaxTimeMSExpired));
  359. }
  360. function attemptTransactionCommit(session, startTime, fn, options) {
  361. return session.commitTransaction().catch((err) => {
  362. if (err instanceof error_1.MongoError &&
  363. hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) &&
  364. !isMaxTimeMSExpiredError(err)) {
  365. if (err.hasErrorLabel(error_1.MongoErrorLabel.UnknownTransactionCommitResult)) {
  366. return attemptTransactionCommit(session, startTime, fn, options);
  367. }
  368. if (err.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError)) {
  369. return attemptTransaction(session, startTime, fn, options);
  370. }
  371. }
  372. throw err;
  373. });
  374. }
/**
 * Transaction states that userExplicitlyEndedTransaction treats as "already ended";
 * attemptTransaction skips its own commit when the callback leaves the session in one of them.
 */
const USER_EXPLICIT_TXN_END_STATES = new Set([
    transactions_1.TxnState.NO_TRANSACTION,
    transactions_1.TxnState.TRANSACTION_COMMITTED,
    transactions_1.TxnState.TRANSACTION_ABORTED
]);
  380. function userExplicitlyEndedTransaction(session) {
  381. return USER_EXPLICIT_TXN_END_STATES.has(session.transaction.state);
  382. }
  383. function attemptTransaction(session, startTime, fn, options) {
  384. const Promise = promise_provider_1.PromiseProvider.get();
  385. session.startTransaction(options);
  386. let promise;
  387. try {
  388. promise = fn(session);
  389. }
  390. catch (err) {
  391. promise = Promise.reject(err);
  392. }
  393. if (!(0, utils_1.isPromiseLike)(promise)) {
  394. session.abortTransaction();
  395. throw new error_1.MongoInvalidArgumentError('Function provided to `withTransaction` must return a Promise');
  396. }
  397. return promise.then(() => {
  398. if (userExplicitlyEndedTransaction(session)) {
  399. return;
  400. }
  401. return attemptTransactionCommit(session, startTime, fn, options);
  402. }, err => {
  403. function maybeRetryOrThrow(err) {
  404. if (err instanceof error_1.MongoError &&
  405. err.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError) &&
  406. hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT)) {
  407. return attemptTransaction(session, startTime, fn, options);
  408. }
  409. if (isMaxTimeMSExpiredError(err)) {
  410. err.addErrorLabel(error_1.MongoErrorLabel.UnknownTransactionCommitResult);
  411. }
  412. throw err;
  413. }
  414. if (session.inTransaction()) {
  415. return session.abortTransaction().then(() => maybeRetryOrThrow(err));
  416. }
  417. return maybeRetryOrThrow(err);
  418. });
  419. }
  420. function endTransaction(session, commandName, callback) {
  421. // handle any initial problematic cases
  422. const txnState = session.transaction.state;
  423. if (txnState === transactions_1.TxnState.NO_TRANSACTION) {
  424. callback(new error_1.MongoTransactionError('No transaction started'));
  425. return;
  426. }
  427. if (commandName === 'commitTransaction') {
  428. if (txnState === transactions_1.TxnState.STARTING_TRANSACTION ||
  429. txnState === transactions_1.TxnState.TRANSACTION_COMMITTED_EMPTY) {
  430. // the transaction was never started, we can safely exit here
  431. session.transaction.transition(transactions_1.TxnState.TRANSACTION_COMMITTED_EMPTY);
  432. callback();
  433. return;
  434. }
  435. if (txnState === transactions_1.TxnState.TRANSACTION_ABORTED) {
  436. callback(new error_1.MongoTransactionError('Cannot call commitTransaction after calling abortTransaction'));
  437. return;
  438. }
  439. }
  440. else {
  441. if (txnState === transactions_1.TxnState.STARTING_TRANSACTION) {
  442. // the transaction was never started, we can safely exit here
  443. session.transaction.transition(transactions_1.TxnState.TRANSACTION_ABORTED);
  444. callback();
  445. return;
  446. }
  447. if (txnState === transactions_1.TxnState.TRANSACTION_ABORTED) {
  448. callback(new error_1.MongoTransactionError('Cannot call abortTransaction twice'));
  449. return;
  450. }
  451. if (txnState === transactions_1.TxnState.TRANSACTION_COMMITTED ||
  452. txnState === transactions_1.TxnState.TRANSACTION_COMMITTED_EMPTY) {
  453. callback(new error_1.MongoTransactionError('Cannot call abortTransaction after calling commitTransaction'));
  454. return;
  455. }
  456. }
  457. // construct and send the command
  458. const command = { [commandName]: 1 };
  459. // apply a writeConcern if specified
  460. let writeConcern;
  461. if (session.transaction.options.writeConcern) {
  462. writeConcern = Object.assign({}, session.transaction.options.writeConcern);
  463. }
  464. else if (session.clientOptions && session.clientOptions.writeConcern) {
  465. writeConcern = { w: session.clientOptions.writeConcern.w };
  466. }
  467. if (txnState === transactions_1.TxnState.TRANSACTION_COMMITTED) {
  468. writeConcern = Object.assign({ wtimeout: 10000 }, writeConcern, { w: 'majority' });
  469. }
  470. if (writeConcern) {
  471. Object.assign(command, { writeConcern });
  472. }
  473. if (commandName === 'commitTransaction' && session.transaction.options.maxTimeMS) {
  474. Object.assign(command, { maxTimeMS: session.transaction.options.maxTimeMS });
  475. }
  476. function commandHandler(error, result) {
  477. if (commandName !== 'commitTransaction') {
  478. session.transaction.transition(transactions_1.TxnState.TRANSACTION_ABORTED);
  479. if (session.loadBalanced) {
  480. maybeClearPinnedConnection(session, { force: false });
  481. }
  482. // The spec indicates that we should ignore all errors on `abortTransaction`
  483. return callback();
  484. }
  485. session.transaction.transition(transactions_1.TxnState.TRANSACTION_COMMITTED);
  486. if (error instanceof error_1.MongoError) {
  487. if (error.hasErrorLabel(error_1.MongoErrorLabel.RetryableWriteError) ||
  488. error instanceof error_1.MongoWriteConcernError ||
  489. isMaxTimeMSExpiredError(error)) {
  490. if (isUnknownTransactionCommitResult(error)) {
  491. error.addErrorLabel(error_1.MongoErrorLabel.UnknownTransactionCommitResult);
  492. // per txns spec, must unpin session in this case
  493. session.unpin({ error });
  494. }
  495. }
  496. else if (error.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError)) {
  497. session.unpin({ error });
  498. }
  499. }
  500. callback(error, result);
  501. }
  502. if (session.transaction.recoveryToken) {
  503. command.recoveryToken = session.transaction.recoveryToken;
  504. }
  505. // send the command
  506. (0, execute_operation_1.executeOperation)(session.client, new run_command_1.RunAdminCommandOperation(undefined, command, {
  507. session,
  508. readPreference: read_preference_1.ReadPreference.primary,
  509. bypassPinningCheck: true
  510. }), (error, result) => {
  511. if (command.abortTransaction) {
  512. // always unpin on abort regardless of command outcome
  513. session.unpin();
  514. }
  515. if (error instanceof error_1.MongoError && error.hasErrorLabel(error_1.MongoErrorLabel.RetryableWriteError)) {
  516. // SPEC-1185: apply majority write concern when retrying commitTransaction
  517. if (command.commitTransaction) {
  518. // per txns spec, must unpin session in this case
  519. session.unpin({ force: true });
  520. command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, {
  521. w: 'majority'
  522. });
  523. }
  524. return (0, execute_operation_1.executeOperation)(session.client, new run_command_1.RunAdminCommandOperation(undefined, command, {
  525. session,
  526. readPreference: read_preference_1.ReadPreference.primary,
  527. bypassPinningCheck: true
  528. }), commandHandler);
  529. }
  530. commandHandler(error, result);
  531. });
  532. }
  533. /**
  534. * Reflects the existence of a session on the server. Can be reused by the session pool.
  535. * WARNING: not meant to be instantiated directly. For internal use only.
  536. * @public
  537. */
  538. class ServerSession {
  539. /** @internal */
  540. constructor() {
  541. this.id = { id: new bson_1.Binary((0, utils_1.uuidV4)(), bson_1.Binary.SUBTYPE_UUID) };
  542. this.lastUse = (0, utils_1.now)();
  543. this.txnNumber = 0;
  544. this.isDirty = false;
  545. }
  546. /**
  547. * Determines if the server session has timed out.
  548. *
  549. * @param sessionTimeoutMinutes - The server's "logicalSessionTimeoutMinutes"
  550. */
  551. hasTimedOut(sessionTimeoutMinutes) {
  552. // Take the difference of the lastUse timestamp and now, which will result in a value in
  553. // milliseconds, and then convert milliseconds to minutes to compare to `sessionTimeoutMinutes`
  554. const idleTimeMinutes = Math.round((((0, utils_1.calculateDurationInMs)(this.lastUse) % 86400000) % 3600000) / 60000);
  555. return idleTimeMinutes > sessionTimeoutMinutes - 1;
  556. }
  557. /**
  558. * @internal
  559. * Cloning meant to keep a readable reference to the server session data
  560. * after ClientSession has ended
  561. */
  562. static clone(serverSession) {
  563. const arrayBuffer = new ArrayBuffer(16);
  564. const idBytes = Buffer.from(arrayBuffer);
  565. idBytes.set(serverSession.id.id.buffer);
  566. const id = new bson_1.Binary(idBytes, serverSession.id.id.sub_type);
  567. // Manual prototype construction to avoid modifying the constructor of this class
  568. return Object.setPrototypeOf({
  569. id: { id },
  570. lastUse: serverSession.lastUse,
  571. txnNumber: serverSession.txnNumber,
  572. isDirty: serverSession.isDirty
  573. }, ServerSession.prototype);
  574. }
  575. }
  576. exports.ServerSession = ServerSession;
  577. /**
  578. * Maintains a pool of Server Sessions.
  579. * For internal use only
  580. * @internal
  581. */
  582. class ServerSessionPool {
  583. constructor(topology) {
  584. if (topology == null) {
  585. throw new error_1.MongoRuntimeError('ServerSessionPool requires a topology');
  586. }
  587. this.topology = topology;
  588. this.sessions = [];
  589. }
  590. /** Ends all sessions in the session pool */
  591. endAllPooledSessions(callback) {
  592. if (this.sessions.length) {
  593. this.topology.endSessions(this.sessions.map((session) => session.id), () => {
  594. this.sessions = [];
  595. if (typeof callback === 'function') {
  596. callback();
  597. }
  598. });
  599. return;
  600. }
  601. if (typeof callback === 'function') {
  602. callback();
  603. }
  604. }
  605. /**
  606. * Acquire a Server Session from the pool.
  607. * Iterates through each session in the pool, removing any stale sessions
  608. * along the way. The first non-stale session found is removed from the
  609. * pool and returned. If no non-stale session is found, a new ServerSession is created.
  610. */
  611. acquire() {
  612. const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes || 10;
  613. while (this.sessions.length) {
  614. const session = this.sessions.shift();
  615. if (session && (this.topology.loadBalanced || !session.hasTimedOut(sessionTimeoutMinutes))) {
  616. return session;
  617. }
  618. }
  619. return new ServerSession();
  620. }
  621. /**
  622. * Release a session to the session pool
  623. * Adds the session back to the session pool if the session has not timed out yet.
  624. * This method also removes any stale sessions from the pool.
  625. *
  626. * @param session - The session to release to the pool
  627. */
  628. release(session) {
  629. const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes;
  630. if (this.topology.loadBalanced && !sessionTimeoutMinutes) {
  631. this.sessions.unshift(session);
  632. }
  633. if (!sessionTimeoutMinutes) {
  634. return;
  635. }
  636. while (this.sessions.length) {
  637. const pooledSession = this.sessions[this.sessions.length - 1];
  638. if (pooledSession.hasTimedOut(sessionTimeoutMinutes)) {
  639. this.sessions.pop();
  640. }
  641. else {
  642. break;
  643. }
  644. }
  645. if (!session.hasTimedOut(sessionTimeoutMinutes)) {
  646. if (session.isDirty) {
  647. return;
  648. }
  649. // otherwise, readd this session to the session pool
  650. this.sessions.unshift(session);
  651. }
  652. }
  653. }
  654. exports.ServerSessionPool = ServerSessionPool;
  655. /**
  656. * Optionally decorate a command with sessions specific keys
  657. *
  658. * @param session - the session tracking transaction state
  659. * @param command - the command to decorate
  660. * @param options - Optional settings passed to calling operation
  661. *
  662. * @internal
  663. */
  664. function applySession(session, command, options) {
  665. var _b, _c;
  666. if (session.hasEnded) {
  667. return new error_1.MongoExpiredSessionError();
  668. }
  669. // May acquire serverSession here
  670. const serverSession = session.serverSession;
  671. if (serverSession == null) {
  672. return new error_1.MongoRuntimeError('Unable to acquire server session');
  673. }
  674. if (((_b = options.writeConcern) === null || _b === void 0 ? void 0 : _b.w) === 0) {
  675. if (session && session.explicit) {
  676. // Error if user provided an explicit session to an unacknowledged write (SPEC-1019)
  677. return new error_1.MongoAPIError('Cannot have explicit session with unacknowledged writes');
  678. }
  679. return;
  680. }
  681. // mark the last use of this session, and apply the `lsid`
  682. serverSession.lastUse = (0, utils_1.now)();
  683. command.lsid = serverSession.id;
  684. const inTxnOrTxnCommand = session.inTransaction() || (0, transactions_1.isTransactionCommand)(command);
  685. const isRetryableWrite = !!options.willRetryWrite;
  686. if (isRetryableWrite || inTxnOrTxnCommand) {
  687. serverSession.txnNumber += session[kTxnNumberIncrement];
  688. session[kTxnNumberIncrement] = 0;
  689. command.txnNumber = bson_1.Long.fromNumber(serverSession.txnNumber);
  690. }
  691. if (!inTxnOrTxnCommand) {
  692. if (session.transaction.state !== transactions_1.TxnState.NO_TRANSACTION) {
  693. session.transaction.transition(transactions_1.TxnState.NO_TRANSACTION);
  694. }
  695. if (session.supports.causalConsistency &&
  696. session.operationTime &&
  697. (0, utils_1.commandSupportsReadConcern)(command, options)) {
  698. command.readConcern = command.readConcern || {};
  699. Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
  700. }
  701. else if (session[kSnapshotEnabled]) {
  702. command.readConcern = command.readConcern || { level: read_concern_1.ReadConcernLevel.snapshot };
  703. if (session[kSnapshotTime] != null) {
  704. Object.assign(command.readConcern, { atClusterTime: session[kSnapshotTime] });
  705. }
  706. }
  707. return;
  708. }
  709. // now attempt to apply transaction-specific sessions data
  710. // `autocommit` must always be false to differentiate from retryable writes
  711. command.autocommit = false;
  712. if (session.transaction.state === transactions_1.TxnState.STARTING_TRANSACTION) {
  713. session.transaction.transition(transactions_1.TxnState.TRANSACTION_IN_PROGRESS);
  714. command.startTransaction = true;
  715. const readConcern = session.transaction.options.readConcern || ((_c = session === null || session === void 0 ? void 0 : session.clientOptions) === null || _c === void 0 ? void 0 : _c.readConcern);
  716. if (readConcern) {
  717. command.readConcern = readConcern;
  718. }
  719. if (session.supports.causalConsistency && session.operationTime) {
  720. command.readConcern = command.readConcern || {};
  721. Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
  722. }
  723. }
  724. return;
  725. }
  726. exports.applySession = applySession;
  727. function updateSessionFromResponse(session, document) {
  728. var _b;
  729. if (document.$clusterTime) {
  730. (0, common_1._advanceClusterTime)(session, document.$clusterTime);
  731. }
  732. if (document.operationTime && session && session.supports.causalConsistency) {
  733. session.advanceOperationTime(document.operationTime);
  734. }
  735. if (document.recoveryToken && session && session.inTransaction()) {
  736. session.transaction._recoveryToken = document.recoveryToken;
  737. }
  738. if ((session === null || session === void 0 ? void 0 : session[kSnapshotEnabled]) && session[kSnapshotTime] == null) {
  739. // find and aggregate commands return atClusterTime on the cursor
  740. // distinct includes it in the response body
  741. const atClusterTime = ((_b = document.cursor) === null || _b === void 0 ? void 0 : _b.atClusterTime) || document.atClusterTime;
  742. if (atClusterTime) {
  743. session[kSnapshotTime] = atClusterTime;
  744. }
  745. }
  746. }
  747. exports.updateSessionFromResponse = updateSessionFromResponse;
  748. //# sourceMappingURL=sessions.js.map