sessions.js 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780
  1. 'use strict';
  2. const retrieveBSON = require('./connection/utils').retrieveBSON;
  3. const EventEmitter = require('events');
  4. const BSON = retrieveBSON();
  5. const Binary = BSON.Binary;
  6. const uuidV4 = require('./utils').uuidV4;
  7. const MongoError = require('./error').MongoError;
  8. const isRetryableError = require('././error').isRetryableError;
  9. const MongoNetworkError = require('./error').MongoNetworkError;
  10. const MongoWriteConcernError = require('./error').MongoWriteConcernError;
  11. const Transaction = require('./transactions').Transaction;
  12. const TxnState = require('./transactions').TxnState;
  13. const isPromiseLike = require('./utils').isPromiseLike;
  14. const ReadPreference = require('./topologies/read_preference');
  15. const maybePromise = require('../utils').maybePromise;
  16. const isTransactionCommand = require('./transactions').isTransactionCommand;
  17. const resolveClusterTime = require('./topologies/shared').resolveClusterTime;
  18. const isSharded = require('./wireprotocol/shared').isSharded;
  19. const maxWireVersion = require('./utils').maxWireVersion;
  20. const now = require('./../utils').now;
  21. const calculateDurationInMs = require('./../utils').calculateDurationInMs;
  22. const minWireVersionForShardedTransactions = 8;
  23. function assertAlive(session, callback) {
  24. if (session.serverSession == null) {
  25. const error = new MongoError('Cannot use a session that has ended');
  26. if (typeof callback === 'function') {
  27. callback(error, null);
  28. return false;
  29. }
  30. throw error;
  31. }
  32. return true;
  33. }
  34. /**
  35. * Options to pass when creating a Client Session
  36. * @typedef {Object} SessionOptions
  37. * @property {boolean} [causalConsistency=true] Whether causal consistency should be enabled on this session
  38. * @property {TransactionOptions} [defaultTransactionOptions] The default TransactionOptions to use for transactions started on this session.
  39. */
  40. /**
  41. * A BSON document reflecting the lsid of a {@link ClientSession}
  42. * @typedef {Object} SessionId
  43. */
  44. const kServerSession = Symbol('serverSession');
  45. /**
  46. * A class representing a client session on the server
  47. * WARNING: not meant to be instantiated directly.
  48. * @class
  49. * @hideconstructor
  50. */
  51. class ClientSession extends EventEmitter {
  52. /**
  53. * Create a client session.
  54. * WARNING: not meant to be instantiated directly
  55. *
  56. * @param {Topology} topology The current client's topology (Internal Class)
  57. * @param {ServerSessionPool} sessionPool The server session pool (Internal Class)
  58. * @param {SessionOptions} [options] Optional settings
  59. * @param {Object} [clientOptions] Optional settings provided when creating a client in the porcelain driver
  60. */
  61. constructor(topology, sessionPool, options, clientOptions) {
  62. super();
  63. if (topology == null) {
  64. throw new Error('ClientSession requires a topology');
  65. }
  66. if (sessionPool == null || !(sessionPool instanceof ServerSessionPool)) {
  67. throw new Error('ClientSession requires a ServerSessionPool');
  68. }
  69. options = options || {};
  70. clientOptions = clientOptions || {};
  71. this.topology = topology;
  72. this.sessionPool = sessionPool;
  73. this.hasEnded = false;
  74. this.clientOptions = clientOptions;
  75. this[kServerSession] = undefined;
  76. this.supports = {
  77. causalConsistency:
  78. typeof options.causalConsistency !== 'undefined' ? options.causalConsistency : true
  79. };
  80. this.clusterTime = options.initialClusterTime;
  81. this.operationTime = null;
  82. this.explicit = !!options.explicit;
  83. this.owner = options.owner;
  84. this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions);
  85. this.transaction = new Transaction();
  86. }
  87. /**
  88. * The server id associated with this session
  89. * @type {SessionId}
  90. */
  91. get id() {
  92. return this.serverSession.id;
  93. }
  94. get serverSession() {
  95. if (this[kServerSession] == null) {
  96. this[kServerSession] = this.sessionPool.acquire();
  97. }
  98. return this[kServerSession];
  99. }
  100. /**
  101. * Ends this session on the server
  102. *
  103. * @param {Object} [options] Optional settings. Currently reserved for future use
  104. * @param {Function} [callback] Optional callback for completion of this operation
  105. */
  106. endSession(options, callback) {
  107. if (typeof options === 'function') (callback = options), (options = {});
  108. options = options || {};
  109. const session = this;
  110. return maybePromise(this, callback, done => {
  111. if (session.hasEnded) {
  112. return done();
  113. }
  114. function completeEndSession() {
  115. // release the server session back to the pool
  116. session.sessionPool.release(session.serverSession);
  117. session[kServerSession] = undefined;
  118. // mark the session as ended, and emit a signal
  119. session.hasEnded = true;
  120. session.emit('ended', session);
  121. // spec indicates that we should ignore all errors for `endSessions`
  122. done();
  123. }
  124. if (session.serverSession && session.inTransaction()) {
  125. session.abortTransaction(err => {
  126. if (err) return done(err);
  127. completeEndSession();
  128. });
  129. return;
  130. }
  131. completeEndSession();
  132. });
  133. }
  134. /**
  135. * Advances the operationTime for a ClientSession.
  136. *
  137. * @param {Timestamp} operationTime the `BSON.Timestamp` of the operation type it is desired to advance to
  138. */
  139. advanceOperationTime(operationTime) {
  140. if (this.operationTime == null) {
  141. this.operationTime = operationTime;
  142. return;
  143. }
  144. if (operationTime.greaterThan(this.operationTime)) {
  145. this.operationTime = operationTime;
  146. }
  147. }
  148. /**
  149. * Used to determine if this session equals another
  150. * @param {ClientSession} session
  151. * @return {boolean} true if the sessions are equal
  152. */
  153. equals(session) {
  154. if (!(session instanceof ClientSession)) {
  155. return false;
  156. }
  157. return this.id.id.buffer.equals(session.id.id.buffer);
  158. }
  159. /**
  160. * Increment the transaction number on the internal ServerSession
  161. */
  162. incrementTransactionNumber() {
  163. this.serverSession.txnNumber++;
  164. }
  165. /**
  166. * @returns {boolean} whether this session is currently in a transaction or not
  167. */
  168. inTransaction() {
  169. return this.transaction.isActive;
  170. }
  171. /**
  172. * Starts a new transaction with the given options.
  173. *
  174. * @param {TransactionOptions} options Options for the transaction
  175. */
  176. startTransaction(options) {
  177. assertAlive(this);
  178. if (this.inTransaction()) {
  179. throw new MongoError('Transaction already in progress');
  180. }
  181. const topologyMaxWireVersion = maxWireVersion(this.topology);
  182. if (
  183. isSharded(this.topology) &&
  184. topologyMaxWireVersion != null &&
  185. topologyMaxWireVersion < minWireVersionForShardedTransactions
  186. ) {
  187. throw new MongoError('Transactions are not supported on sharded clusters in MongoDB < 4.2.');
  188. }
  189. // increment txnNumber
  190. this.incrementTransactionNumber();
  191. // create transaction state
  192. this.transaction = new Transaction(
  193. Object.assign({}, this.clientOptions, options || this.defaultTransactionOptions)
  194. );
  195. this.transaction.transition(TxnState.STARTING_TRANSACTION);
  196. }
  197. /**
  198. * Commits the currently active transaction in this session.
  199. *
  200. * @param {Function} [callback] optional callback for completion of this operation
  201. * @return {Promise} A promise is returned if no callback is provided
  202. */
  203. commitTransaction(callback) {
  204. return maybePromise(this, callback, done => endTransaction(this, 'commitTransaction', done));
  205. }
  206. /**
  207. * Aborts the currently active transaction in this session.
  208. *
  209. * @param {Function} [callback] optional callback for completion of this operation
  210. * @return {Promise} A promise is returned if no callback is provided
  211. */
  212. abortTransaction(callback) {
  213. return maybePromise(this, callback, done => endTransaction(this, 'abortTransaction', done));
  214. }
  215. /**
  216. * This is here to ensure that ClientSession is never serialized to BSON.
  217. * @ignore
  218. */
  219. toBSON() {
  220. throw new Error('ClientSession cannot be serialized to BSON.');
  221. }
  222. /**
  223. * A user provided function to be run within a transaction
  224. *
  225. * @callback WithTransactionCallback
  226. * @param {ClientSession} session The parent session of the transaction running the operation. This should be passed into each operation within the lambda.
  227. * @returns {Promise} The resulting Promise of operations run within this transaction
  228. */
  229. /**
  230. * Runs a provided lambda within a transaction, retrying either the commit operation
  231. * or entire transaction as needed (and when the error permits) to better ensure that
  232. * the transaction can complete successfully.
  233. *
  234. * IMPORTANT: This method requires the user to return a Promise, all lambdas that do not
  235. * return a Promise will result in undefined behavior.
  236. *
  237. * @param {WithTransactionCallback} fn
  238. * @param {TransactionOptions} [options] Optional settings for the transaction
  239. */
  240. withTransaction(fn, options) {
  241. const startTime = now();
  242. return attemptTransaction(this, startTime, fn, options);
  243. }
  244. }
  245. const MAX_WITH_TRANSACTION_TIMEOUT = 120000;
  246. const UNSATISFIABLE_WRITE_CONCERN_CODE = 100;
  247. const UNKNOWN_REPL_WRITE_CONCERN_CODE = 79;
  248. const MAX_TIME_MS_EXPIRED_CODE = 50;
  249. const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set([
  250. 'CannotSatisfyWriteConcern',
  251. 'UnknownReplWriteConcern',
  252. 'UnsatisfiableWriteConcern'
  253. ]);
  254. function hasNotTimedOut(startTime, max) {
  255. return calculateDurationInMs(startTime) < max;
  256. }
  257. function isUnknownTransactionCommitResult(err) {
  258. return (
  259. isMaxTimeMSExpiredError(err) ||
  260. (!NON_DETERMINISTIC_WRITE_CONCERN_ERRORS.has(err.codeName) &&
  261. err.code !== UNSATISFIABLE_WRITE_CONCERN_CODE &&
  262. err.code !== UNKNOWN_REPL_WRITE_CONCERN_CODE)
  263. );
  264. }
  265. function isMaxTimeMSExpiredError(err) {
  266. if (err == null) return false;
  267. return (
  268. err.code === MAX_TIME_MS_EXPIRED_CODE ||
  269. (err.writeConcernError && err.writeConcernError.code === MAX_TIME_MS_EXPIRED_CODE)
  270. );
  271. }
  272. function attemptTransactionCommit(session, startTime, fn, options) {
  273. return session.commitTransaction().catch(err => {
  274. if (
  275. err instanceof MongoError &&
  276. hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) &&
  277. !isMaxTimeMSExpiredError(err)
  278. ) {
  279. if (err.hasErrorLabel('UnknownTransactionCommitResult')) {
  280. return attemptTransactionCommit(session, startTime, fn, options);
  281. }
  282. if (err.hasErrorLabel('TransientTransactionError')) {
  283. return attemptTransaction(session, startTime, fn, options);
  284. }
  285. }
  286. throw err;
  287. });
  288. }
  289. const USER_EXPLICIT_TXN_END_STATES = new Set([
  290. TxnState.NO_TRANSACTION,
  291. TxnState.TRANSACTION_COMMITTED,
  292. TxnState.TRANSACTION_ABORTED
  293. ]);
  294. function userExplicitlyEndedTransaction(session) {
  295. return USER_EXPLICIT_TXN_END_STATES.has(session.transaction.state);
  296. }
  297. function attemptTransaction(session, startTime, fn, options) {
  298. session.startTransaction(options);
  299. let promise;
  300. try {
  301. promise = fn(session);
  302. } catch (err) {
  303. promise = Promise.reject(err);
  304. }
  305. if (!isPromiseLike(promise)) {
  306. session.abortTransaction();
  307. throw new TypeError('Function provided to `withTransaction` must return a Promise');
  308. }
  309. return promise
  310. .then(() => {
  311. if (userExplicitlyEndedTransaction(session)) {
  312. return;
  313. }
  314. return attemptTransactionCommit(session, startTime, fn, options);
  315. })
  316. .catch(err => {
  317. function maybeRetryOrThrow(err) {
  318. if (
  319. err instanceof MongoError &&
  320. err.hasErrorLabel('TransientTransactionError') &&
  321. hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT)
  322. ) {
  323. return attemptTransaction(session, startTime, fn, options);
  324. }
  325. if (isMaxTimeMSExpiredError(err)) {
  326. err.addErrorLabel('UnknownTransactionCommitResult');
  327. }
  328. throw err;
  329. }
  330. if (session.transaction.isActive) {
  331. return session.abortTransaction().then(() => maybeRetryOrThrow(err));
  332. }
  333. return maybeRetryOrThrow(err);
  334. });
  335. }
  336. function endTransaction(session, commandName, callback) {
  337. if (!assertAlive(session, callback)) {
  338. // checking result in case callback was called
  339. return;
  340. }
  341. // handle any initial problematic cases
  342. let txnState = session.transaction.state;
  343. if (txnState === TxnState.NO_TRANSACTION) {
  344. callback(new MongoError('No transaction started'));
  345. return;
  346. }
  347. if (commandName === 'commitTransaction') {
  348. if (
  349. txnState === TxnState.STARTING_TRANSACTION ||
  350. txnState === TxnState.TRANSACTION_COMMITTED_EMPTY
  351. ) {
  352. // the transaction was never started, we can safely exit here
  353. session.transaction.transition(TxnState.TRANSACTION_COMMITTED_EMPTY);
  354. callback(null, null);
  355. return;
  356. }
  357. if (txnState === TxnState.TRANSACTION_ABORTED) {
  358. callback(new MongoError('Cannot call commitTransaction after calling abortTransaction'));
  359. return;
  360. }
  361. } else {
  362. if (txnState === TxnState.STARTING_TRANSACTION) {
  363. // the transaction was never started, we can safely exit here
  364. session.transaction.transition(TxnState.TRANSACTION_ABORTED);
  365. callback(null, null);
  366. return;
  367. }
  368. if (txnState === TxnState.TRANSACTION_ABORTED) {
  369. callback(new MongoError('Cannot call abortTransaction twice'));
  370. return;
  371. }
  372. if (
  373. txnState === TxnState.TRANSACTION_COMMITTED ||
  374. txnState === TxnState.TRANSACTION_COMMITTED_EMPTY
  375. ) {
  376. callback(new MongoError('Cannot call abortTransaction after calling commitTransaction'));
  377. return;
  378. }
  379. }
  380. // construct and send the command
  381. const command = { [commandName]: 1 };
  382. // apply a writeConcern if specified
  383. let writeConcern;
  384. if (session.transaction.options.writeConcern) {
  385. writeConcern = Object.assign({}, session.transaction.options.writeConcern);
  386. } else if (session.clientOptions && session.clientOptions.w) {
  387. writeConcern = { w: session.clientOptions.w };
  388. }
  389. if (txnState === TxnState.TRANSACTION_COMMITTED) {
  390. writeConcern = Object.assign({ wtimeout: 10000 }, writeConcern, { w: 'majority' });
  391. }
  392. if (writeConcern) {
  393. Object.assign(command, { writeConcern });
  394. }
  395. if (commandName === 'commitTransaction' && session.transaction.options.maxTimeMS) {
  396. Object.assign(command, { maxTimeMS: session.transaction.options.maxTimeMS });
  397. }
  398. function commandHandler(e, r) {
  399. if (commandName === 'commitTransaction') {
  400. session.transaction.transition(TxnState.TRANSACTION_COMMITTED);
  401. if (
  402. e &&
  403. (e instanceof MongoNetworkError ||
  404. e instanceof MongoWriteConcernError ||
  405. isRetryableError(e) ||
  406. isMaxTimeMSExpiredError(e))
  407. ) {
  408. if (isUnknownTransactionCommitResult(e)) {
  409. e.addErrorLabel('UnknownTransactionCommitResult');
  410. // per txns spec, must unpin session in this case
  411. session.transaction.unpinServer();
  412. }
  413. }
  414. } else {
  415. session.transaction.transition(TxnState.TRANSACTION_ABORTED);
  416. }
  417. callback(e, r);
  418. }
  419. // The spec indicates that we should ignore all errors on `abortTransaction`
  420. function transactionError(err) {
  421. return commandName === 'commitTransaction' ? err : null;
  422. }
  423. if (
  424. // Assumption here that commandName is "commitTransaction" or "abortTransaction"
  425. session.transaction.recoveryToken &&
  426. supportsRecoveryToken(session)
  427. ) {
  428. command.recoveryToken = session.transaction.recoveryToken;
  429. }
  430. // send the command
  431. session.topology.command('admin.$cmd', command, { session }, (err, reply) => {
  432. if (err && isRetryableError(err)) {
  433. // SPEC-1185: apply majority write concern when retrying commitTransaction
  434. if (command.commitTransaction) {
  435. // per txns spec, must unpin session in this case
  436. session.transaction.unpinServer();
  437. command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, {
  438. w: 'majority'
  439. });
  440. }
  441. return session.topology.command('admin.$cmd', command, { session }, (_err, _reply) =>
  442. commandHandler(transactionError(_err), _reply)
  443. );
  444. }
  445. commandHandler(transactionError(err), reply);
  446. });
  447. }
  448. function supportsRecoveryToken(session) {
  449. const topology = session.topology;
  450. return !!topology.s.options.useRecoveryToken;
  451. }
  452. /**
  453. * Reflects the existence of a session on the server. Can be reused by the session pool.
  454. * WARNING: not meant to be instantiated directly. For internal use only.
  455. * @ignore
  456. */
  457. class ServerSession {
  458. constructor() {
  459. this.id = { id: new Binary(uuidV4(), Binary.SUBTYPE_UUID) };
  460. this.lastUse = now();
  461. this.txnNumber = 0;
  462. this.isDirty = false;
  463. }
  464. /**
  465. * Determines if the server session has timed out.
  466. * @ignore
  467. * @param {Date} sessionTimeoutMinutes The server's "logicalSessionTimeoutMinutes"
  468. * @return {boolean} true if the session has timed out.
  469. */
  470. hasTimedOut(sessionTimeoutMinutes) {
  471. // Take the difference of the lastUse timestamp and now, which will result in a value in
  472. // milliseconds, and then convert milliseconds to minutes to compare to `sessionTimeoutMinutes`
  473. const idleTimeMinutes = Math.round(
  474. ((calculateDurationInMs(this.lastUse) % 86400000) % 3600000) / 60000
  475. );
  476. return idleTimeMinutes > sessionTimeoutMinutes - 1;
  477. }
  478. }
  479. /**
  480. * Maintains a pool of Server Sessions.
  481. * For internal use only
  482. * @ignore
  483. */
  484. class ServerSessionPool {
  485. constructor(topology) {
  486. if (topology == null) {
  487. throw new Error('ServerSessionPool requires a topology');
  488. }
  489. this.topology = topology;
  490. this.sessions = [];
  491. }
  492. /**
  493. * Ends all sessions in the session pool.
  494. * @ignore
  495. */
  496. endAllPooledSessions(callback) {
  497. if (this.sessions.length) {
  498. this.topology.endSessions(
  499. this.sessions.map(session => session.id),
  500. () => {
  501. this.sessions = [];
  502. if (typeof callback === 'function') {
  503. callback();
  504. }
  505. }
  506. );
  507. return;
  508. }
  509. if (typeof callback === 'function') {
  510. callback();
  511. }
  512. }
  513. /**
  514. * Acquire a Server Session from the pool.
  515. * Iterates through each session in the pool, removing any stale sessions
  516. * along the way. The first non-stale session found is removed from the
  517. * pool and returned. If no non-stale session is found, a new ServerSession
  518. * is created.
  519. * @ignore
  520. * @returns {ServerSession}
  521. */
  522. acquire() {
  523. const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes;
  524. while (this.sessions.length) {
  525. const session = this.sessions.shift();
  526. if (!session.hasTimedOut(sessionTimeoutMinutes)) {
  527. return session;
  528. }
  529. }
  530. return new ServerSession();
  531. }
  532. /**
  533. * Release a session to the session pool
  534. * Adds the session back to the session pool if the session has not timed out yet.
  535. * This method also removes any stale sessions from the pool.
  536. * @ignore
  537. * @param {ServerSession} session The session to release to the pool
  538. */
  539. release(session) {
  540. const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes;
  541. while (this.sessions.length) {
  542. const pooledSession = this.sessions[this.sessions.length - 1];
  543. if (pooledSession.hasTimedOut(sessionTimeoutMinutes)) {
  544. this.sessions.pop();
  545. } else {
  546. break;
  547. }
  548. }
  549. if (!session.hasTimedOut(sessionTimeoutMinutes)) {
  550. if (session.isDirty) {
  551. return;
  552. }
  553. // otherwise, readd this session to the session pool
  554. this.sessions.unshift(session);
  555. }
  556. }
  557. }
  558. // TODO: this should be codified in command construction
  559. // @see https://github.com/mongodb/specifications/blob/master/source/read-write-concern/read-write-concern.rst#read-concern
  560. function commandSupportsReadConcern(command, options) {
  561. if (
  562. command.aggregate ||
  563. command.count ||
  564. command.distinct ||
  565. command.find ||
  566. command.parallelCollectionScan ||
  567. command.geoNear ||
  568. command.geoSearch
  569. ) {
  570. return true;
  571. }
  572. if (
  573. command.mapReduce &&
  574. options &&
  575. options.out &&
  576. (options.out.inline === 1 || options.out === 'inline')
  577. ) {
  578. return true;
  579. }
  580. return false;
  581. }
  582. /**
  583. * Optionally decorate a command with sessions specific keys
  584. *
  585. * @ignore
  586. * @param {ClientSession} session the session tracking transaction state
  587. * @param {Object} command the command to decorate
  588. * @param {Object} topology the topology for tracking the cluster time
  589. * @param {Object} [options] Optional settings passed to calling operation
  590. * @return {MongoError|null} An error, if some error condition was met
  591. */
  592. function applySession(session, command, options) {
  593. if (session.hasEnded) {
  594. // TODO: merge this with `assertAlive`, did not want to throw a try/catch here
  595. return new MongoError('Cannot use a session that has ended');
  596. }
  597. // SPEC-1019: silently ignore explicit session with unacknowledged write for backwards compatibility
  598. if (options && options.writeConcern && options.writeConcern.w === 0) {
  599. return;
  600. }
  601. const serverSession = session.serverSession;
  602. serverSession.lastUse = now();
  603. command.lsid = serverSession.id;
  604. // first apply non-transaction-specific sessions data
  605. const inTransaction = session.inTransaction() || isTransactionCommand(command);
  606. const isRetryableWrite = options.willRetryWrite;
  607. const shouldApplyReadConcern = commandSupportsReadConcern(command, options);
  608. if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) {
  609. command.txnNumber = BSON.Long.fromNumber(serverSession.txnNumber);
  610. }
  611. // now attempt to apply transaction-specific sessions data
  612. if (!inTransaction) {
  613. if (session.transaction.state !== TxnState.NO_TRANSACTION) {
  614. session.transaction.transition(TxnState.NO_TRANSACTION);
  615. }
  616. // TODO: the following should only be applied to read operation per spec.
  617. // for causal consistency
  618. if (session.supports.causalConsistency && session.operationTime && shouldApplyReadConcern) {
  619. command.readConcern = command.readConcern || {};
  620. Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
  621. }
  622. return;
  623. }
  624. if (options.readPreference && !options.readPreference.equals(ReadPreference.primary)) {
  625. return new MongoError(
  626. `Read preference in a transaction must be primary, not: ${options.readPreference.mode}`
  627. );
  628. }
  629. // `autocommit` must always be false to differentiate from retryable writes
  630. command.autocommit = false;
  631. if (session.transaction.state === TxnState.STARTING_TRANSACTION) {
  632. session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
  633. command.startTransaction = true;
  634. const readConcern =
  635. session.transaction.options.readConcern || session.clientOptions.readConcern;
  636. if (readConcern) {
  637. command.readConcern = readConcern;
  638. }
  639. if (session.supports.causalConsistency && session.operationTime) {
  640. command.readConcern = command.readConcern || {};
  641. Object.assign(command.readConcern, { afterClusterTime: session.operationTime });
  642. }
  643. }
  644. }
  645. function updateSessionFromResponse(session, document) {
  646. if (document.$clusterTime) {
  647. resolveClusterTime(session, document.$clusterTime);
  648. }
  649. if (document.operationTime && session && session.supports.causalConsistency) {
  650. session.advanceOperationTime(document.operationTime);
  651. }
  652. if (document.recoveryToken && session && session.inTransaction()) {
  653. session.transaction._recoveryToken = document.recoveryToken;
  654. }
  655. }
  656. module.exports = {
  657. ClientSession,
  658. ServerSession,
  659. ServerSessionPool,
  660. TxnState,
  661. applySession,
  662. updateSessionFromResponse,
  663. commandSupportsReadConcern
  664. };