// sessions.js (31 KB)
  1. "use strict";
  2. var _a;
  3. Object.defineProperty(exports, "__esModule", { value: true });
  4. exports.updateSessionFromResponse = exports.applySession = exports.ServerSessionPool = exports.ServerSession = exports.maybeClearPinnedConnection = exports.ClientSession = void 0;
  5. const bson_1 = require("./bson");
  6. const metrics_1 = require("./cmap/metrics");
  7. const shared_1 = require("./cmap/wire_protocol/shared");
  8. const constants_1 = require("./constants");
  9. const error_1 = require("./error");
  10. const mongo_types_1 = require("./mongo_types");
  11. const execute_operation_1 = require("./operations/execute_operation");
  12. const run_command_1 = require("./operations/run_command");
  13. const promise_provider_1 = require("./promise_provider");
  14. const read_concern_1 = require("./read_concern");
  15. const read_preference_1 = require("./read_preference");
  16. const common_1 = require("./sdam/common");
  17. const transactions_1 = require("./transactions");
  18. const utils_1 = require("./utils");
  19. const minWireVersionForShardedTransactions = 8;
  /**
   * Checks that a session still exposes a server session.
   *
   * @param session - the session to inspect; only its `serverSession` property is read
   * @param callback - optional; when it is a function, the expiry error is passed to it instead of being thrown
   * @returns `true` when `session.serverSession` is set, `false` when the error was handed to `callback`
   * @throws MongoExpiredSessionError when `session.serverSession` is nullish and `callback` is not a function
   */
  function assertAlive(session, callback) {
      if (session.serverSession != null) {
          return true;
      }
      const expired = new error_1.MongoExpiredSessionError();
      if (typeof callback !== 'function') {
          throw expired;
      }
      callback(expired);
      return false;
  }
  31. /** @internal */
  32. const kServerSession = Symbol('serverSession');
  33. /** @internal */
  34. const kSnapshotTime = Symbol('snapshotTime');
  35. /** @internal */
  36. const kSnapshotEnabled = Symbol('snapshotEnabled');
  37. /** @internal */
  38. const kPinnedConnection = Symbol('pinnedConnection');
  /**
   * A class representing a client session on the server
   *
   * NOTE: not meant to be instantiated directly.
   * @public
   */
  class ClientSession extends mongo_types_1.TypedEventEmitter {
      /**
       * Create a client session.
       * @internal
       * @param topology - The current client's topology (Internal Class)
       * @param sessionPool - The server session pool (Internal Class)
       * @param options - Optional settings
       * @param clientOptions - Optional settings provided when creating a MongoClient
       * @throws MongoRuntimeError when `topology` is nullish or `sessionPool` is not a ServerSessionPool
       * @throws MongoInvalidArgumentError when both `snapshot` and `causalConsistency` are `true`
       */
      constructor(topology, sessionPool, options, clientOptions) {
          super();
          /** @internal */
          this[kSnapshotEnabled] = false;
          if (topology == null) {
              // TODO(NODE-3483)
              throw new error_1.MongoRuntimeError('ClientSession requires a topology');
          }
          if (sessionPool == null || !(sessionPool instanceof ServerSessionPool)) {
              // TODO(NODE-3483)
              throw new error_1.MongoRuntimeError('ClientSession requires a ServerSessionPool');
          }
          options = options != null ? options : {};
          if (options.snapshot === true) {
              this[kSnapshotEnabled] = true;
              if (options.causalConsistency === true) {
                  throw new error_1.MongoInvalidArgumentError('Properties "causalConsistency" and "snapshot" are mutually exclusive');
              }
          }
          this.topology = topology;
          this.sessionPool = sessionPool;
          this.hasEnded = false;
          this.clientOptions = clientOptions;
          // The server session is acquired lazily by the `serverSession` getter.
          this[kServerSession] = undefined;
          this.supports = {
              // causal consistency is on unless explicitly disabled or snapshot reads are requested
              causalConsistency: options.snapshot !== true && options.causalConsistency !== false
          };
          this.clusterTime = options.initialClusterTime;
          this.operationTime = undefined;
          this.explicit = !!options.explicit;
          this.owner = options.owner;
          this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions);
          this.transaction = new transactions_1.Transaction();
      }
      /** The server id associated with this session */
      get id() {
          const serverSession = this.serverSession;
          return serverSession == null ? undefined : serverSession.id;
      }
      /** The underlying server session; acquired from the pool on first access. */
      get serverSession() {
          if (this[kServerSession] == null) {
              this[kServerSession] = this.sessionPool.acquire();
          }
          return this[kServerSession];
      }
      /** Whether or not this session is configured for snapshot reads */
      get snapshotEnabled() {
          return this[kSnapshotEnabled];
      }
      /** Whether the topology description type is LoadBalanced */
      get loadBalanced() {
          return this.topology.description.type === common_1.TopologyType.LoadBalanced;
      }
      /** @internal */
      get pinnedConnection() {
          return this[kPinnedConnection];
      }
      /**
       * Pins a connection to this session and emits PINNED on the connection.
       * @internal
       * @throws TypeError when a connection is already pinned
       */
      pin(conn) {
          if (this[kPinnedConnection]) {
              throw new TypeError('Cannot pin multiple connections to the same session');
          }
          this[kPinnedConnection] = conn;
          const metric = this.inTransaction()
              ? metrics_1.ConnectionPoolMetrics.TXN
              : metrics_1.ConnectionPoolMetrics.CURSOR;
          conn.emit(constants_1.PINNED, metric);
      }
      /**
       * Releases the pinned connection (load-balanced mode) or the pinned server (otherwise).
       * @internal
       */
      unpin(options) {
          if (this.loadBalanced) {
              return maybeClearPinnedConnection(this, options);
          }
          this.transaction.unpinServer();
      }
      /** Whether a connection (load-balanced mode) or a server (otherwise) is pinned */
      get isPinned() {
          return this.loadBalanced ? !!this[kPinnedConnection] : this.transaction.isPinned;
      }
      /**
       * Ends this session, aborting an active transaction first.
       *
       * @param options - Optional settings; merged over `{ force: true }` and forwarded to the pinned-connection cleanup
       * @param callback - Optional callback; when omitted the result of `maybePromise` is returned
       */
      endSession(options, callback) {
          if (typeof options === 'function') {
              callback = options;
              options = {};
          }
          const finalOptions = { force: true, ...options };
          return (0, utils_1.maybePromise)(callback, done => {
              if (this.hasEnded) {
                  maybeClearPinnedConnection(this, finalOptions);
                  return done();
              }
              const completeEndSession = () => {
                  maybeClearPinnedConnection(this, finalOptions);
                  // Read the slot directly: going through the `serverSession` getter would
                  // acquire a fresh server session only to hand it straight back to the pool.
                  const acquired = this[kServerSession];
                  if (acquired != null) {
                      // release the server session back to the pool
                      this.sessionPool.release(acquired);
                      this[kServerSession] = undefined;
                  }
                  // mark the session as ended, and emit a signal
                  this.hasEnded = true;
                  this.emit('ended', this);
                  // spec indicates that we should ignore all errors for `endSessions`
                  done();
              };
              if (this.inTransaction()) {
                  this.abortTransaction(err => {
                      if (err) {
                          return done(err);
                      }
                      completeEndSession();
                  });
                  return;
              }
              completeEndSession();
          });
      }
      /**
       * Advances the operationTime for a ClientSession.
       *
       * @param operationTime - the `BSON.Timestamp` of the operation type it is desired to advance to
       */
      advanceOperationTime(operationTime) {
          if (this.operationTime == null || operationTime.greaterThan(this.operationTime)) {
              this.operationTime = operationTime;
          }
      }
      /**
       * Advances the clusterTime for a ClientSession to the provided clusterTime of another ClientSession
       *
       * @param clusterTime - the $clusterTime returned by the server from another session in the form of a document containing the `BSON.Timestamp` clusterTime and signature
       * @throws MongoInvalidArgumentError when the argument, its `clusterTime` or its `signature` has the wrong shape
       */
      advanceClusterTime(clusterTime) {
          const bsonTypeOf = value => (value == null ? undefined : value._bsontype);
          if (!clusterTime || typeof clusterTime !== 'object') {
              throw new error_1.MongoInvalidArgumentError('input cluster time must be an object');
          }
          if (!clusterTime.clusterTime || clusterTime.clusterTime._bsontype !== 'Timestamp') {
              throw new error_1.MongoInvalidArgumentError('input cluster time "clusterTime" property must be a valid BSON Timestamp');
          }
          const signature = clusterTime.signature;
          // keyId is accepted either as a plain number or as a BSON Long
          const hasValidSignature = Boolean(signature) &&
              bsonTypeOf(signature.hash) === 'Binary' &&
              (typeof signature.keyId === 'number' || bsonTypeOf(signature.keyId) === 'Long');
          if (!hasValidSignature) {
              throw new error_1.MongoInvalidArgumentError('input cluster time must have a valid "signature" property with BSON Binary hash and BSON Long keyId');
          }
          (0, common_1._advanceClusterTime)(this, clusterTime);
      }
      /**
       * Used to determine if this session equals another
       *
       * @param session - The session to compare to
       */
      equals(session) {
          if (!(session instanceof ClientSession)) {
              return false;
          }
          const ownId = this.id;
          const otherId = session.id;
          if (ownId == null || otherId == null) {
              return false;
          }
          return ownId.id.buffer.equals(otherId.id.buffer);
      }
      /** Increment the transaction number on the internal ServerSession */
      incrementTransactionNumber() {
          const serverSession = this.serverSession;
          if (serverSession) {
              serverSession.txnNumber =
                  typeof serverSession.txnNumber === 'number' ? serverSession.txnNumber + 1 : 0;
          }
      }
      /** @returns whether this session is currently in a transaction or not */
      inTransaction() {
          return this.transaction.isActive;
      }
      /**
       * Starts a new transaction with the given options.
       *
       * Each option falls back from the call's `options` to the session's
       * `defaultTransactionOptions` and then (except `maxCommitTimeMS`) to the client options.
       *
       * @param options - Options for the transaction
       * @throws MongoCompatibilityError on snapshot sessions, or on sharded topologies below the required wire version
       * @throws MongoTransactionError when a transaction is already in progress
       */
      startTransaction(options) {
          if (this[kSnapshotEnabled]) {
              throw new error_1.MongoCompatibilityError('Transactions are not allowed with snapshot sessions');
          }
          assertAlive(this);
          if (this.inTransaction()) {
              throw new error_1.MongoTransactionError('Transaction already in progress');
          }
          if (this.isPinned && this.transaction.isCommitted) {
              this.unpin();
          }
          const topologyMaxWireVersion = (0, utils_1.maxWireVersion)(this.topology);
          if ((0, shared_1.isSharded)(this.topology) &&
              topologyMaxWireVersion != null &&
              topologyMaxWireVersion < minWireVersionForShardedTransactions) {
              throw new error_1.MongoCompatibilityError('Transactions are not supported on sharded clusters in MongoDB < 4.2.');
          }
          // increment txnNumber
          this.incrementTransactionNumber();
          // Same result as `a ?? b ?? c`: the first non-nullish value, otherwise the last value as-is.
          const firstSet = (...candidates) => {
              const lastIndex = candidates.length - 1;
              for (let i = 0; i < lastIndex; i++) {
                  if (candidates[i] != null) {
                      return candidates[i];
                  }
              }
              return candidates[lastIndex];
          };
          const requested = options == null ? {} : options;
          const defaults = this.defaultTransactionOptions;
          const client = this.clientOptions == null ? {} : this.clientOptions;
          // create transaction state
          this.transaction = new transactions_1.Transaction({
              readConcern: firstSet(requested.readConcern, defaults.readConcern, client.readConcern),
              writeConcern: firstSet(requested.writeConcern, defaults.writeConcern, client.writeConcern),
              readPreference: firstSet(requested.readPreference, defaults.readPreference, client.readPreference),
              maxCommitTimeMS: firstSet(requested.maxCommitTimeMS, defaults.maxCommitTimeMS)
          });
          this.transaction.transition(transactions_1.TxnState.STARTING_TRANSACTION);
      }
      /** Commits the current transaction; callback-or-promise via `maybePromise`. */
      commitTransaction(callback) {
          return (0, utils_1.maybePromise)(callback, cb => endTransaction(this, 'commitTransaction', cb));
      }
      /** Aborts the current transaction; callback-or-promise via `maybePromise`. */
      abortTransaction(callback) {
          return (0, utils_1.maybePromise)(callback, cb => endTransaction(this, 'abortTransaction', cb));
      }
      /**
       * This is here to ensure that ClientSession is never serialized to BSON.
       */
      toBSON() {
          throw new error_1.MongoRuntimeError('ClientSession cannot be serialized to BSON.');
      }
      /**
       * Runs a provided lambda within a transaction, retrying either the commit operation
       * or entire transaction as needed (and when the error permits) to better ensure that
       * the transaction can complete successfully.
       *
       * IMPORTANT: This method requires the user to return a Promise, all lambdas that do not
       * return a Promise will result in undefined behavior.
       *
       * @param fn - A lambda to run within a transaction
       * @param options - Optional settings for the transaction
       */
      withTransaction(fn, options) {
          const startTime = (0, utils_1.now)();
          return attemptTransaction(this, startTime, fn, options);
      }
  }
  283. exports.ClientSession = ClientSession;
  284. _a = kSnapshotEnabled;
  285. const MAX_WITH_TRANSACTION_TIMEOUT = 120000;
  286. const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set([
  287. 'CannotSatisfyWriteConcern',
  288. 'UnknownReplWriteConcern',
  289. 'UnsatisfiableWriteConcern'
  290. ]);
  /**
   * Reports whether less than `max` milliseconds have elapsed since `startTime`.
   *
   * @param startTime - a timestamp understood by `calculateDurationInMs`
   * @param max - the time budget in milliseconds
   */
  function hasNotTimedOut(startTime, max) {
      const elapsedMs = (0, utils_1.calculateDurationInMs)(startTime);
      return elapsedMs < max;
  }
  /**
   * Decides whether a commit error leaves the transaction outcome unknown.
   *
   * A max-time-expired error always counts as unknown. Otherwise the result is unknown
   * unless the error is a server error whose `codeName` is in
   * NON_DETERMINISTIC_WRITE_CONCERN_ERRORS, or its `code` is UnsatisfiableWriteConcern
   * or UnknownReplWriteConcern.
   *
   * @param err - the error returned by the commit
   * @returns `true` when the commit result is unknown
   */
  function isUnknownTransactionCommitResult(err) {
      const hasNonDeterministicCodeName = err instanceof error_1.MongoServerError &&
          err.codeName &&
          NON_DETERMINISTIC_WRITE_CONCERN_ERRORS.has(err.codeName);
      if (isMaxTimeMSExpiredError(err)) {
          return true;
      }
      if (hasNonDeterministicCodeName) {
          return false;
      }
      const codes = error_1.MONGODB_ERROR_CODES;
      return err.code !== codes.UnsatisfiableWriteConcern && err.code !== codes.UnknownReplWriteConcern;
  }
  /**
   * Unpins the connection pinned to a session, if there is one.
   *
   * Nothing happens while the session is in a transaction and `options.error` is a
   * MongoError labelled `TransientTransactionError`. Otherwise the pin is always cleared;
   * the connection is checked back into the pool of the first server in the topology
   * (and UNPINNED is emitted) only when there is no error or `options.force` is set.
   *
   * @param session - the session whose pinned connection should be released
   * @param options - optional `error`, `force` and `forceClear` settings
   */
  function maybeClearPinnedConnection(session, options) {
      const settings = options == null ? {} : options;
      const pinned = session[kPinnedConnection];
      const error = settings.error;
      const keepPinned = session.inTransaction() &&
          error &&
          error instanceof error_1.MongoError &&
          error.hasErrorLabel('TransientTransactionError');
      if (keepPinned || !pinned) {
          return;
      }
      // NOTE: the spec talks about what to do on a network error only, but the tests seem
      // to validate that we don't unpin on _all_ errors?
      const [loadBalancer] = Array.from(session.topology.s.servers.values());
      if (error == null || settings.force) {
          const pool = loadBalancer.s.pool;
          pool.checkIn(pinned);
          const wasTransaction = session.transaction.state !== transactions_1.TxnState.NO_TRANSACTION;
          pinned.emit(constants_1.UNPINNED, wasTransaction ? metrics_1.ConnectionPoolMetrics.TXN : metrics_1.ConnectionPoolMetrics.CURSOR);
          if (settings.forceClear) {
              pool.clear(pinned.serviceId);
          }
      }
      session[kPinnedConnection] = undefined;
  }
  330. exports.maybeClearPinnedConnection = maybeClearPinnedConnection;
  /**
   * Tests whether an error is a server error reporting MaxTimeMSExpired, either as its
   * own `code` or as the `code` of its `writeConcernError`.
   *
   * @param err - any value
   * @returns `false` for anything that is not a MongoServerError; otherwise truthy only
   * when one of the two codes matches (a missing `writeConcernError` is returned as-is)
   */
  function isMaxTimeMSExpiredError(err) {
      if (!(err instanceof error_1.MongoServerError)) {
          return false;
      }
      const expiredCode = error_1.MONGODB_ERROR_CODES.MaxTimeMSExpired;
      if (err.code === expiredCode) {
          return true;
      }
      const writeConcernError = err.writeConcernError;
      return writeConcernError && writeConcernError.code === expiredCode;
  }
  /**
   * Commits the session's transaction, retrying on labelled errors.
   *
   * While the error is a MongoError, the overall time budget has not run out and the
   * error is not a max-time-expired error: `UnknownTransactionCommitResult` retries the
   * commit, `TransientTransactionError` restarts the whole transaction. Every other
   * failure is rethrown.
   *
   * @param session - the session owning the transaction
   * @param startTime - when the `withTransaction` call began
   * @param fn - the user callback, needed to restart the transaction
   * @param options - transaction options, forwarded on restart
   */
  function attemptTransactionCommit(session, startTime, fn, options) {
      const onCommitFailure = (err) => {
          const mayRetry = err instanceof error_1.MongoError &&
              hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) &&
              !isMaxTimeMSExpiredError(err);
          if (mayRetry && err.hasErrorLabel('UnknownTransactionCommitResult')) {
              return attemptTransactionCommit(session, startTime, fn, options);
          }
          if (mayRetry && err.hasErrorLabel('TransientTransactionError')) {
              return attemptTransaction(session, startTime, fn, options);
          }
          throw err;
      };
      return session.commitTransaction().catch(onCommitFailure);
  }
  353. const USER_EXPLICIT_TXN_END_STATES = new Set([
  354. transactions_1.TxnState.NO_TRANSACTION,
  355. transactions_1.TxnState.TRANSACTION_COMMITTED,
  356. transactions_1.TxnState.TRANSACTION_ABORTED
  357. ]);
  /**
   * Reports whether the session's transaction state is one of
   * USER_EXPLICIT_TXN_END_STATES, i.e. the transaction is no longer open.
   *
   * @param session - the session to inspect
   */
  function userExplicitlyEndedTransaction(session) {
      const { state } = session.transaction;
      return USER_EXPLICIT_TXN_END_STATES.has(state);
  }
  /**
   * Starts a transaction, runs the user callback inside it and commits the result.
   *
   * On success the transaction is committed via `attemptTransactionCommit`, unless the
   * callback already ended it. On failure an active transaction is aborted first; the
   * whole attempt is then retried when the error is a MongoError labelled
   * `TransientTransactionError` and the time budget has not run out, otherwise the
   * error is rethrown.
   *
   * @param session - the session to run the transaction on
   * @param startTime - when the `withTransaction` call began
   * @param fn - user callback; receives the session and must return a promise
   * @param options - options forwarded to `session.startTransaction`
   * @returns a promise for the outcome of the transaction
   * @throws MongoInvalidArgumentError synchronously when `fn` does not return a promise
   */
  function attemptTransaction(session, startTime, fn, options) {
      const PromiseImpl = promise_provider_1.PromiseProvider.get();
      session.startTransaction(options);
      let promise;
      try {
          promise = fn(session);
      }
      catch (err) {
          // a synchronous throw is handled like a rejected promise
          promise = PromiseImpl.reject(err);
      }
      if (!(0, utils_1.isPromiseLike)(promise)) {
          // The abort is best-effort and nobody awaits it, so swallow its rejection
          // instead of letting it surface as an unhandled promise rejection.
          session.abortTransaction().catch(() => null);
          throw new error_1.MongoInvalidArgumentError('Function provided to `withTransaction` must return a Promise');
      }
      return promise.then(() => {
          if (userExplicitlyEndedTransaction(session)) {
              return;
          }
          return attemptTransactionCommit(session, startTime, fn, options);
      }, err => {
          function maybeRetryOrThrow(err) {
              if (err instanceof error_1.MongoError &&
                  err.hasErrorLabel('TransientTransactionError') &&
                  hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT)) {
                  return attemptTransaction(session, startTime, fn, options);
              }
              if (isMaxTimeMSExpiredError(err)) {
                  err.addErrorLabel('UnknownTransactionCommitResult');
              }
              throw err;
          }
          if (session.transaction.isActive) {
              return session.abortTransaction().then(() => maybeRetryOrThrow(err));
          }
          return maybeRetryOrThrow(err);
      });
  }
  /**
   * Runs `commitTransaction` or `abortTransaction` for a session and reports through `callback`.
   *
   * State checks come first: transactions that never sent a command are finished locally,
   * and invalid sequences (no transaction, abort twice, commit after abort, abort after
   * commit) are reported as MongoTransactionError. Otherwise the admin command is built
   * and sent; one retry is made when the error is a retryable end-transaction error.
   *
   * @param session - the session owning the transaction
   * @param commandName - assumed to be `'commitTransaction'` or `'abortTransaction'`
   * @param callback - receives `(error, reply)` for commits; called without arguments after an abort
   */
  function endTransaction(session, commandName, callback) {
      if (!assertAlive(session, callback)) {
          // assertAlive has already invoked the callback with the error
          return;
      }
      const { TxnState } = transactions_1;
      const isCommit = commandName === 'commitTransaction';
      const txnState = session.transaction.state;
      const fail = message => {
          callback(new error_1.MongoTransactionError(message));
      };
      // handle any initial problematic cases
      if (txnState === TxnState.NO_TRANSACTION) {
          fail('No transaction started');
          return;
      }
      if (isCommit) {
          if (txnState === TxnState.STARTING_TRANSACTION ||
              txnState === TxnState.TRANSACTION_COMMITTED_EMPTY) {
              // the transaction was never started, we can safely exit here
              session.transaction.transition(TxnState.TRANSACTION_COMMITTED_EMPTY);
              callback();
              return;
          }
          if (txnState === TxnState.TRANSACTION_ABORTED) {
              fail('Cannot call commitTransaction after calling abortTransaction');
              return;
          }
      }
      else {
          if (txnState === TxnState.STARTING_TRANSACTION) {
              // the transaction was never started, we can safely exit here
              session.transaction.transition(TxnState.TRANSACTION_ABORTED);
              callback();
              return;
          }
          if (txnState === TxnState.TRANSACTION_ABORTED) {
              fail('Cannot call abortTransaction twice');
              return;
          }
          if (txnState === TxnState.TRANSACTION_COMMITTED ||
              txnState === TxnState.TRANSACTION_COMMITTED_EMPTY) {
              fail('Cannot call abortTransaction after calling commitTransaction');
              return;
          }
      }
      // construct the command
      const command = { [commandName]: 1 };
      const txnOptions = session.transaction.options;
      // apply a writeConcern if specified: the transaction's own one wins over the client's `w`
      let writeConcern;
      if (txnOptions.writeConcern) {
          writeConcern = Object.assign({}, txnOptions.writeConcern);
      }
      else if (session.clientOptions && session.clientOptions.writeConcern) {
          writeConcern = { w: session.clientOptions.writeConcern.w };
      }
      if (txnState === TxnState.TRANSACTION_COMMITTED) {
          // committing again after a commit: force majority, defaulting wtimeout
          writeConcern = Object.assign({ wtimeout: 10000 }, writeConcern, { w: 'majority' });
      }
      if (writeConcern) {
          command.writeConcern = writeConcern;
      }
      if (isCommit && txnOptions.maxTimeMS) {
          command.maxTimeMS = txnOptions.maxTimeMS;
      }
      // Applies the final outcome to the session state and reports to the caller.
      const finish = (e, r) => {
          if (!isCommit) {
              session.transaction.transition(TxnState.TRANSACTION_ABORTED);
              if (session.loadBalanced) {
                  maybeClearPinnedConnection(session, { force: false });
              }
              // The spec indicates that we should ignore all errors on `abortTransaction`
              return callback();
          }
          session.transaction.transition(TxnState.TRANSACTION_COMMITTED);
          if (e) {
              const outcomeMayBeUnknown = e instanceof error_1.MongoNetworkError ||
                  e instanceof error_1.MongoWriteConcernError ||
                  (0, error_1.isRetryableError)(e) ||
                  isMaxTimeMSExpiredError(e);
              if (outcomeMayBeUnknown) {
                  if (isUnknownTransactionCommitResult(e)) {
                      e.addErrorLabel('UnknownTransactionCommitResult');
                      // per txns spec, must unpin session in this case
                      session.unpin({ error: e });
                  }
              }
              else if (e.hasErrorLabel('TransientTransactionError')) {
                  session.unpin({ error: e });
              }
          }
          callback(e, r);
      };
      // Assumption here that commandName is "commitTransaction" or "abortTransaction"
      if (session.transaction.recoveryToken) {
          command.recoveryToken = session.transaction.recoveryToken;
      }
      // Builds a fresh operation from the current (possibly updated) command and runs it.
      const send = done => (0, execute_operation_1.executeOperation)(session.topology, new run_command_1.RunAdminCommandOperation(undefined, command, {
          session,
          readPreference: read_preference_1.ReadPreference.primary,
          bypassPinningCheck: true
      }), done);
      send((err, reply) => {
          if (command.abortTransaction) {
              // always unpin on abort regardless of command outcome
              session.unpin();
          }
          if (!(err && (0, error_1.isRetryableEndTransactionError)(err))) {
              finish(err, reply);
              return;
          }
          // SPEC-1185: apply majority write concern when retrying commitTransaction
          if (command.commitTransaction) {
              // per txns spec, must unpin session in this case
              session.unpin({ force: true });
              command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, {
                  w: 'majority'
              });
          }
          return send((retryErr, retryReply) => finish(retryErr, retryReply));
      });
  }
  /**
   * Reflects the existence of a session on the server. Can be reused by the session pool.
   * WARNING: not meant to be instantiated directly. For internal use only.
   * @public
   */
  class ServerSession {
      /** @internal */
      constructor() {
          this.id = { id: new bson_1.Binary((0, utils_1.uuidV4)(), bson_1.Binary.SUBTYPE_UUID) };
          this.lastUse = (0, utils_1.now)();
          this.txnNumber = 0;
          this.isDirty = false;
      }
      /**
       * Determines if the server session has timed out.
       *
       * A session counts as timed out once its idle time, rounded to whole minutes,
       * exceeds `sessionTimeoutMinutes - 1`, i.e. one minute before the timeout itself.
       *
       * @param sessionTimeoutMinutes - The server's "logicalSessionTimeoutMinutes"
       * @returns `true` when the session should no longer be used
       */
      hasTimedOut(sessionTimeoutMinutes) {
          // Convert the complete idle duration to minutes. Reducing it modulo an hour or a
          // day first would make a session idle for 61 minutes look idle for 1 minute.
          const idleTimeMs = (0, utils_1.calculateDurationInMs)(this.lastUse);
          const idleTimeMinutes = Math.round(idleTimeMs / 60000);
          return idleTimeMinutes > sessionTimeoutMinutes - 1;
      }
  }
  542. exports.ServerSession = ServerSession;
  /**
   * Maintains a pool of Server Sessions.
   * For internal use only
   * @internal
   */
  class ServerSessionPool {
      /**
       * @param topology - the topology this pool belongs to
       * @throws MongoRuntimeError when `topology` is nullish
       */
      constructor(topology) {
          if (topology == null) {
              throw new error_1.MongoRuntimeError('ServerSessionPool requires a topology');
          }
          this.topology = topology;
          this.sessions = [];
      }
      /**
       * Ends all sessions in the session pool.
       *
       * @param callback - optional; invoked without arguments once the pool has been emptied
       */
      endAllPooledSessions(callback) {
          const notify = () => {
              if (typeof callback === 'function') {
                  callback();
              }
          };
          if (!this.sessions.length) {
              notify();
              return;
          }
          const sessionIds = this.sessions.map((pooled) => pooled.id);
          // the outcome of `endSessions` is not inspected; the pool is emptied regardless
          this.topology.endSessions(sessionIds, () => {
              this.sessions = [];
              notify();
          });
      }
      /**
       * Acquire a Server Session from the pool.
       * Takes sessions from the front of the pool, discarding stale ones, and returns the
       * first usable session. In load-balanced mode every pooled session is usable. When
       * the pool runs empty a new ServerSession is created. The timeout defaults to 10
       * minutes when the topology reports none.
       */
      acquire() {
          const timeoutMinutes = this.topology.logicalSessionTimeoutMinutes || 10;
          while (this.sessions.length > 0) {
              const candidate = this.sessions.shift();
              if (!candidate) {
                  continue;
              }
              if (this.topology.loadBalanced || !candidate.hasTimedOut(timeoutMinutes)) {
                  return candidate;
              }
          }
          return new ServerSession();
      }
      /**
       * Release a session to the session pool
       * Adds the session to the front of the pool if it has not timed out and is not dirty.
       * Stale sessions at the back of the pool are removed first. Without a known session
       * timeout the session is only pooled in load-balanced mode.
       *
       * @param session - The session to release to the pool
       */
      release(session) {
          const timeoutMinutes = this.topology.logicalSessionTimeoutMinutes;
          if (!timeoutMinutes) {
              if (this.topology.loadBalanced) {
                  this.sessions.unshift(session);
              }
              return;
          }
          // the oldest sessions live at the back; drop them until a fresh one is found
          while (this.sessions.length > 0 &&
              this.sessions[this.sessions.length - 1].hasTimedOut(timeoutMinutes)) {
              this.sessions.pop();
          }
          if (session.hasTimedOut(timeoutMinutes) || session.isDirty) {
              return;
          }
          this.sessions.unshift(session);
      }
  }
  620. exports.ServerSessionPool = ServerSessionPool;
  /**
   * Optionally decorate a command with sessions specific keys
   *
   * Problems are reported by returning an error object, never by throwing. On success
   * nothing is returned and `command` has been mutated in place.
   *
   * @param session - the session tracking transaction state
   * @param command - the command to decorate
   * @param options - Optional settings passed to calling operation
   * @returns an error when the session has ended, no server session is available, or an
   * explicit session is combined with an unacknowledged write; otherwise `undefined`
   */
  function applySession(session, command, options) {
      // TODO: merge this with `assertAlive`, did not want to throw a try/catch here
      if (session.hasEnded) {
          return new error_1.MongoExpiredSessionError();
      }
      const serverSession = session.serverSession;
      if (serverSession == null) {
          return new error_1.MongoRuntimeError('Unable to acquire server session');
      }
      // SPEC-1019: unacknowledged writes get no session data; an explicit session is an error,
      // an implicit one is silently ignored for backwards compatibility
      // FIXME: NODE-2781, this check for write concern shouldn't be happening here, but instead during command construction
      const requestedWriteConcern = options && options.writeConcern;
      if (requestedWriteConcern && requestedWriteConcern.w === 0) {
          if (session && session.explicit) {
              return new error_1.MongoAPIError('Cannot have explicit session with unacknowledged writes');
          }
          return;
      }
      // mark the last use of this session, and apply the `lsid`
      serverSession.lastUse = (0, utils_1.now)();
      command.lsid = serverSession.id;
      const { TxnState } = transactions_1;
      const inTransaction = session.inTransaction() || (0, transactions_1.isTransactionCommand)(command);
      const isRetryableWrite = (options == null ? undefined : options.willRetryWrite) || false;
      // a txnNumber of 0 is never sent
      if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) {
          command.txnNumber = bson_1.Long.fromNumber(serverSession.txnNumber);
      }
      if (inTransaction) {
          // `autocommit` must always be false to differentiate from retryable writes
          command.autocommit = false;
          if (session.transaction.state !== TxnState.STARTING_TRANSACTION) {
              return;
          }
          // first command of the transaction: flag it and attach the read concern
          session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
          command.startTransaction = true;
          const clientOptions = session.clientOptions;
          const readConcern = session.transaction.options.readConcern ||
              (clientOptions == null ? undefined : clientOptions.readConcern);
          if (readConcern) {
              command.readConcern = readConcern;
          }
          if (session.supports.causalConsistency && session.operationTime) {
              command.readConcern = command.readConcern || {};
              Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
          }
          return;
      }
      // outside a transaction: reset any leftover transaction state
      if (session.transaction.state !== TxnState.NO_TRANSACTION) {
          session.transaction.transition(TxnState.NO_TRANSACTION);
      }
      const wantsCausalRead = session.supports.causalConsistency &&
          session.operationTime &&
          (0, utils_1.commandSupportsReadConcern)(command, options);
      if (wantsCausalRead) {
          command.readConcern = command.readConcern || {};
          Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
      }
      else if (session[kSnapshotEnabled]) {
          command.readConcern = command.readConcern || { level: read_concern_1.ReadConcernLevel.snapshot };
          if (session[kSnapshotTime] != null) {
              Object.assign(command.readConcern, { atClusterTime: session[kSnapshotTime] });
          }
      }
  }
  689. exports.applySession = applySession;
  /**
   * Copies session-related fields of a server reply onto the session.
   *
   * Handles, in this order: `$clusterTime`, `operationTime` (only for sessions with causal
   * consistency), `recoveryToken` (only while in a transaction) and, for snapshot
   * sessions that have no snapshot time yet, `atClusterTime`.
   *
   * @param session - the session to update
   * @param document - the server reply
   */
  function updateSessionFromResponse(session, document) {
      const { $clusterTime, operationTime, recoveryToken } = document;
      if ($clusterTime) {
          (0, common_1._advanceClusterTime)(session, $clusterTime);
      }
      if (operationTime && session && session.supports.causalConsistency) {
          session.advanceOperationTime(operationTime);
      }
      if (recoveryToken && session && session.inTransaction()) {
          session.transaction._recoveryToken = recoveryToken;
      }
      const needsSnapshotTime = session != null && session[kSnapshotEnabled] && session[kSnapshotTime] == null;
      if (!needsSnapshotTime) {
          return;
      }
      // find and aggregate commands return atClusterTime on the cursor
      // distinct includes it in the response body
      const cursor = document.cursor;
      const atClusterTime = (cursor == null ? undefined : cursor.atClusterTime) || document.atClusterTime;
      if (atClusterTime) {
          session[kSnapshotTime] = atClusterTime;
      }
  }
  710. exports.updateSessionFromResponse = updateSessionFromResponse;
  711. //# sourceMappingURL=sessions.js.map