'use strict'; const retrieveBSON = require('./connection/utils').retrieveBSON; const EventEmitter = require('events'); const BSON = retrieveBSON(); const Binary = BSON.Binary; const uuidV4 = require('./utils').uuidV4; const MongoError = require('./error').MongoError; const isRetryableError = require('././error').isRetryableError; const MongoNetworkError = require('./error').MongoNetworkError; const MongoWriteConcernError = require('./error').MongoWriteConcernError; const Transaction = require('./transactions').Transaction; const TxnState = require('./transactions').TxnState; const isPromiseLike = require('./utils').isPromiseLike; const ReadPreference = require('./topologies/read_preference'); const maybePromise = require('../utils').maybePromise; const isTransactionCommand = require('./transactions').isTransactionCommand; const resolveClusterTime = require('./topologies/shared').resolveClusterTime; const isSharded = require('./wireprotocol/shared').isSharded; const maxWireVersion = require('./utils').maxWireVersion; const now = require('./../utils').now; const calculateDurationInMs = require('./../utils').calculateDurationInMs; const minWireVersionForShardedTransactions = 8; function assertAlive(session, callback) { if (session.serverSession == null) { const error = new MongoError('Cannot use a session that has ended'); if (typeof callback === 'function') { callback(error, null); return false; } throw error; } return true; } /** * Options to pass when creating a Client Session * @typedef {Object} SessionOptions * @property {boolean} [causalConsistency=true] Whether causal consistency should be enabled on this session * @property {TransactionOptions} [defaultTransactionOptions] The default TransactionOptions to use for transactions started on this session. */ /** * A BSON document reflecting the lsid of a {@link ClientSession} * @typedef {Object} SessionId */ const kServerSession = Symbol('serverSession'); /** * A class representing a client session on the server * WARNING: not meant to be instantiated directly. * @class * @hideconstructor */ class ClientSession extends EventEmitter { /** * Create a client session. * WARNING: not meant to be instantiated directly * * @param {Topology} topology The current client's topology (Internal Class) * @param {ServerSessionPool} sessionPool The server session pool (Internal Class) * @param {SessionOptions} [options] Optional settings * @param {Object} [clientOptions] Optional settings provided when creating a client in the porcelain driver */ constructor(topology, sessionPool, options, clientOptions) { super(); if (topology == null) { throw new Error('ClientSession requires a topology'); } if (sessionPool == null || !(sessionPool instanceof ServerSessionPool)) { throw new Error('ClientSession requires a ServerSessionPool'); } options = options || {}; clientOptions = clientOptions || {}; this.topology = topology; this.sessionPool = sessionPool; this.hasEnded = false; this.clientOptions = clientOptions; this[kServerSession] = undefined; this.supports = { causalConsistency: typeof options.causalConsistency !== 'undefined' ? options.causalConsistency : true }; this.clusterTime = options.initialClusterTime; this.operationTime = null; this.explicit = !!options.explicit; this.owner = options.owner; this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions); this.transaction = new Transaction(); } /** * The server id associated with this session * @type {SessionId} */ get id() { return this.serverSession.id; } get serverSession() { if (this[kServerSession] == null) { this[kServerSession] = this.sessionPool.acquire(); } return this[kServerSession]; } /** * Ends this session on the server * * @param {Object} [options] Optional settings. Currently reserved for future use * @param {Function} [callback] Optional callback for completion of this operation */ endSession(options, callback) { if (typeof options === 'function') (callback = options), (options = {}); options = options || {}; const session = this; return maybePromise(this, callback, done => { if (session.hasEnded) { return done(); } function completeEndSession() { // release the server session back to the pool session.sessionPool.release(session.serverSession); session[kServerSession] = undefined; // mark the session as ended, and emit a signal session.hasEnded = true; session.emit('ended', session); // spec indicates that we should ignore all errors for `endSessions` done(); } if (session.serverSession && session.inTransaction()) { session.abortTransaction(err => { if (err) return done(err); completeEndSession(); }); return; } completeEndSession(); }); } /** * Advances the operationTime for a ClientSession. * * @param {Timestamp} operationTime the `BSON.Timestamp` of the operation type it is desired to advance to */ advanceOperationTime(operationTime) { if (this.operationTime == null) { this.operationTime = operationTime; return; } if (operationTime.greaterThan(this.operationTime)) { this.operationTime = operationTime; } } /** * Used to determine if this session equals another * @param {ClientSession} session * @return {boolean} true if the sessions are equal */ equals(session) { if (!(session instanceof ClientSession)) { return false; } return this.id.id.buffer.equals(session.id.id.buffer); } /** * Increment the transaction number on the internal ServerSession */ incrementTransactionNumber() { this.serverSession.txnNumber++; } /** * @returns {boolean} whether this session is currently in a transaction or not */ inTransaction() { return this.transaction.isActive; } /** * Starts a new transaction with the given options. * * @param {TransactionOptions} options Options for the transaction */ startTransaction(options) { assertAlive(this); if (this.inTransaction()) { throw new MongoError('Transaction already in progress'); } const topologyMaxWireVersion = maxWireVersion(this.topology); if ( isSharded(this.topology) && topologyMaxWireVersion != null && topologyMaxWireVersion < minWireVersionForShardedTransactions ) { throw new MongoError('Transactions are not supported on sharded clusters in MongoDB < 4.2.'); } // increment txnNumber this.incrementTransactionNumber(); // create transaction state this.transaction = new Transaction( Object.assign({}, this.clientOptions, options || this.defaultTransactionOptions) ); this.transaction.transition(TxnState.STARTING_TRANSACTION); } /** * Commits the currently active transaction in this session. * * @param {Function} [callback] optional callback for completion of this operation * @return {Promise} A promise is returned if no callback is provided */ commitTransaction(callback) { return maybePromise(this, callback, done => endTransaction(this, 'commitTransaction', done)); } /** * Aborts the currently active transaction in this session. * * @param {Function} [callback] optional callback for completion of this operation * @return {Promise} A promise is returned if no callback is provided */ abortTransaction(callback) { return maybePromise(this, callback, done => endTransaction(this, 'abortTransaction', done)); } /** * This is here to ensure that ClientSession is never serialized to BSON. * @ignore */ toBSON() { throw new Error('ClientSession cannot be serialized to BSON.'); } /** * A user provided function to be run within a transaction * * @callback WithTransactionCallback * @param {ClientSession} session The parent session of the transaction running the operation. This should be passed into each operation within the lambda. * @returns {Promise} The resulting Promise of operations run within this transaction */ /** * Runs a provided lambda within a transaction, retrying either the commit operation * or entire transaction as needed (and when the error permits) to better ensure that * the transaction can complete successfully. * * IMPORTANT: This method requires the user to return a Promise, all lambdas that do not * return a Promise will result in undefined behavior. * * @param {WithTransactionCallback} fn * @param {TransactionOptions} [options] Optional settings for the transaction */ withTransaction(fn, options) { const startTime = now(); return attemptTransaction(this, startTime, fn, options); } } const MAX_WITH_TRANSACTION_TIMEOUT = 120000; const UNSATISFIABLE_WRITE_CONCERN_CODE = 100; const UNKNOWN_REPL_WRITE_CONCERN_CODE = 79; const MAX_TIME_MS_EXPIRED_CODE = 50; const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set([ 'CannotSatisfyWriteConcern', 'UnknownReplWriteConcern', 'UnsatisfiableWriteConcern' ]); function hasNotTimedOut(startTime, max) { return calculateDurationInMs(startTime) < max; } function isUnknownTransactionCommitResult(err) { return ( isMaxTimeMSExpiredError(err) || (!NON_DETERMINISTIC_WRITE_CONCERN_ERRORS.has(err.codeName) && err.code !== UNSATISFIABLE_WRITE_CONCERN_CODE && err.code !== UNKNOWN_REPL_WRITE_CONCERN_CODE) ); } function isMaxTimeMSExpiredError(err) { if (err == null) return false; return ( err.code === MAX_TIME_MS_EXPIRED_CODE || (err.writeConcernError && err.writeConcernError.code === MAX_TIME_MS_EXPIRED_CODE) ); } function attemptTransactionCommit(session, startTime, fn, options) { return session.commitTransaction().catch(err => { if ( err instanceof MongoError && hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) && !isMaxTimeMSExpiredError(err) ) { if (err.hasErrorLabel('UnknownTransactionCommitResult')) { return attemptTransactionCommit(session, startTime, fn, options); } if (err.hasErrorLabel('TransientTransactionError')) { return attemptTransaction(session, startTime, fn, options); } } throw err; }); } const USER_EXPLICIT_TXN_END_STATES = new Set([ TxnState.NO_TRANSACTION, TxnState.TRANSACTION_COMMITTED, TxnState.TRANSACTION_ABORTED ]); function userExplicitlyEndedTransaction(session) { return USER_EXPLICIT_TXN_END_STATES.has(session.transaction.state); } function attemptTransaction(session, startTime, fn, options) { session.startTransaction(options); let promise; try { promise = fn(session); } catch (err) { promise = Promise.reject(err); } if (!isPromiseLike(promise)) { session.abortTransaction(); throw new TypeError('Function provided to `withTransaction` must return a Promise'); } return promise .then(() => { if (userExplicitlyEndedTransaction(session)) { return; } return attemptTransactionCommit(session, startTime, fn, options); }) .catch(err => { function maybeRetryOrThrow(err) { if ( err instanceof MongoError && err.hasErrorLabel('TransientTransactionError') && hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) ) { return attemptTransaction(session, startTime, fn, options); } if (isMaxTimeMSExpiredError(err)) { err.addErrorLabel('UnknownTransactionCommitResult'); } throw err; } if (session.transaction.isActive) { return session.abortTransaction().then(() => maybeRetryOrThrow(err)); } return maybeRetryOrThrow(err); }); } function endTransaction(session, commandName, callback) { if (!assertAlive(session, callback)) { // checking result in case callback was called return; } // handle any initial problematic cases let txnState = session.transaction.state; if (txnState === TxnState.NO_TRANSACTION) { callback(new MongoError('No transaction started')); return; } if (commandName === 'commitTransaction') { if ( txnState === TxnState.STARTING_TRANSACTION || txnState === TxnState.TRANSACTION_COMMITTED_EMPTY ) { // the transaction was never started, we can safely exit here session.transaction.transition(TxnState.TRANSACTION_COMMITTED_EMPTY); callback(null, null); return; } if (txnState === TxnState.TRANSACTION_ABORTED) { callback(new MongoError('Cannot call commitTransaction after calling abortTransaction')); return; } } else { if (txnState === TxnState.STARTING_TRANSACTION) { // the transaction was never started, we can safely exit here session.transaction.transition(TxnState.TRANSACTION_ABORTED); callback(null, null); return; } if (txnState === TxnState.TRANSACTION_ABORTED) { callback(new MongoError('Cannot call abortTransaction twice')); return; } if ( txnState === TxnState.TRANSACTION_COMMITTED || txnState === TxnState.TRANSACTION_COMMITTED_EMPTY ) { callback(new MongoError('Cannot call abortTransaction after calling commitTransaction')); return; } } // construct and send the command const command = { [commandName]: 1 }; // apply a writeConcern if specified let writeConcern; if (session.transaction.options.writeConcern) { writeConcern = Object.assign({}, session.transaction.options.writeConcern); } else if (session.clientOptions && session.clientOptions.w) { writeConcern = { w: session.clientOptions.w }; } if (txnState === TxnState.TRANSACTION_COMMITTED) { writeConcern = Object.assign({ wtimeout: 10000 }, writeConcern, { w: 'majority' }); } if (writeConcern) { Object.assign(command, { writeConcern }); } if (commandName === 'commitTransaction' && session.transaction.options.maxTimeMS) { Object.assign(command, { maxTimeMS: session.transaction.options.maxTimeMS }); } function commandHandler(e, r) { if (commandName === 'commitTransaction') { session.transaction.transition(TxnState.TRANSACTION_COMMITTED); if ( e && (e instanceof MongoNetworkError || e instanceof MongoWriteConcernError || isRetryableError(e) || isMaxTimeMSExpiredError(e)) ) { if (isUnknownTransactionCommitResult(e)) { e.addErrorLabel('UnknownTransactionCommitResult'); // per txns spec, must unpin session in this case session.transaction.unpinServer(); } } } else { session.transaction.transition(TxnState.TRANSACTION_ABORTED); } callback(e, r); } // The spec indicates that we should ignore all errors on `abortTransaction` function transactionError(err) { return commandName === 'commitTransaction' ? err : null; } if ( // Assumption here that commandName is "commitTransaction" or "abortTransaction" session.transaction.recoveryToken && supportsRecoveryToken(session) ) { command.recoveryToken = session.transaction.recoveryToken; } // send the command session.topology.command('admin.$cmd', command, { session }, (err, reply) => { if (err && isRetryableError(err)) { // SPEC-1185: apply majority write concern when retrying commitTransaction if (command.commitTransaction) { // per txns spec, must unpin session in this case session.transaction.unpinServer(); command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, { w: 'majority' }); } return session.topology.command('admin.$cmd', command, { session }, (_err, _reply) => commandHandler(transactionError(_err), _reply) ); } commandHandler(transactionError(err), reply); }); } function supportsRecoveryToken(session) { const topology = session.topology; return !!topology.s.options.useRecoveryToken; } /** * Reflects the existence of a session on the server. Can be reused by the session pool. * WARNING: not meant to be instantiated directly. For internal use only. * @ignore */ class ServerSession { constructor() { this.id = { id: new Binary(uuidV4(), Binary.SUBTYPE_UUID) }; this.lastUse = now(); this.txnNumber = 0; this.isDirty = false; } /** * Determines if the server session has timed out. * @ignore * @param {Date} sessionTimeoutMinutes The server's "logicalSessionTimeoutMinutes" * @return {boolean} true if the session has timed out. */ hasTimedOut(sessionTimeoutMinutes) { // Take the difference of the lastUse timestamp and now, which will result in a value in // milliseconds, and then convert milliseconds to minutes to compare to `sessionTimeoutMinutes` const idleTimeMinutes = Math.round( ((calculateDurationInMs(this.lastUse) % 86400000) % 3600000) / 60000 ); return idleTimeMinutes > sessionTimeoutMinutes - 1; } } /** * Maintains a pool of Server Sessions. * For internal use only * @ignore */ class ServerSessionPool { constructor(topology) { if (topology == null) { throw new Error('ServerSessionPool requires a topology'); } this.topology = topology; this.sessions = []; } /** * Ends all sessions in the session pool. * @ignore */ endAllPooledSessions(callback) { if (this.sessions.length) { this.topology.endSessions( this.sessions.map(session => session.id), () => { this.sessions = []; if (typeof callback === 'function') { callback(); } } ); return; } if (typeof callback === 'function') { callback(); } } /** * Acquire a Server Session from the pool. * Iterates through each session in the pool, removing any stale sessions * along the way. The first non-stale session found is removed from the * pool and returned. If no non-stale session is found, a new ServerSession * is created. * @ignore * @returns {ServerSession} */ acquire() { const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes; while (this.sessions.length) { const session = this.sessions.shift(); if (!session.hasTimedOut(sessionTimeoutMinutes)) { return session; } } return new ServerSession(); } /** * Release a session to the session pool * Adds the session back to the session pool if the session has not timed out yet. * This method also removes any stale sessions from the pool. * @ignore * @param {ServerSession} session The session to release to the pool */ release(session) { const sessionTimeoutMinutes = this.topology.logicalSessionTimeoutMinutes; while (this.sessions.length) { const pooledSession = this.sessions[this.sessions.length - 1]; if (pooledSession.hasTimedOut(sessionTimeoutMinutes)) { this.sessions.pop(); } else { break; } } if (!session.hasTimedOut(sessionTimeoutMinutes)) { if (session.isDirty) { return; } // otherwise, readd this session to the session pool this.sessions.unshift(session); } } } // TODO: this should be codified in command construction // @see https://github.com/mongodb/specifications/blob/master/source/read-write-concern/read-write-concern.rst#read-concern function commandSupportsReadConcern(command, options) { if ( command.aggregate || command.count || command.distinct || command.find || command.parallelCollectionScan || command.geoNear || command.geoSearch ) { return true; } if ( command.mapReduce && options && options.out && (options.out.inline === 1 || options.out === 'inline') ) { return true; } return false; } /** * Optionally decorate a command with sessions specific keys * * @ignore * @param {ClientSession} session the session tracking transaction state * @param {Object} command the command to decorate * @param {Object} topology the topology for tracking the cluster time * @param {Object} [options] Optional settings passed to calling operation * @return {MongoError|null} An error, if some error condition was met */ function applySession(session, command, options) { if (session.hasEnded) { // TODO: merge this with `assertAlive`, did not want to throw a try/catch here return new MongoError('Cannot use a session that has ended'); } // SPEC-1019: silently ignore explicit session with unacknowledged write for backwards compatibility if (options && options.writeConcern && options.writeConcern.w === 0) { return; } const serverSession = session.serverSession; serverSession.lastUse = now(); command.lsid = serverSession.id; // first apply non-transaction-specific sessions data const inTransaction = session.inTransaction() || isTransactionCommand(command); const isRetryableWrite = options.willRetryWrite; const shouldApplyReadConcern = commandSupportsReadConcern(command, options); if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) { command.txnNumber = BSON.Long.fromNumber(serverSession.txnNumber); } // now attempt to apply transaction-specific sessions data if (!inTransaction) { if (session.transaction.state !== TxnState.NO_TRANSACTION) { session.transaction.transition(TxnState.NO_TRANSACTION); } // TODO: the following should only be applied to read operation per spec. // for causal consistency if (session.supports.causalConsistency && session.operationTime && shouldApplyReadConcern) { command.readConcern = command.readConcern || {}; Object.assign(command.readConcern, { afterClusterTime: session.operationTime }); } return; } if (options.readPreference && !options.readPreference.equals(ReadPreference.primary)) { return new MongoError( `Read preference in a transaction must be primary, not: ${options.readPreference.mode}` ); } // `autocommit` must always be false to differentiate from retryable writes command.autocommit = false; if (session.transaction.state === TxnState.STARTING_TRANSACTION) { session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS); command.startTransaction = true; const readConcern = session.transaction.options.readConcern || session.clientOptions.readConcern; if (readConcern) { command.readConcern = readConcern; } if (session.supports.causalConsistency && session.operationTime) { command.readConcern = command.readConcern || {}; Object.assign(command.readConcern, { afterClusterTime: session.operationTime }); } } } function updateSessionFromResponse(session, document) { if (document.$clusterTime) { resolveClusterTime(session, document.$clusterTime); } if (document.operationTime && session && session.supports.causalConsistency) { session.advanceOperationTime(document.operationTime); } if (document.recoveryToken && session && session.inTransaction()) { session.transaction._recoveryToken = document.recoveryToken; } } module.exports = { ClientSession, ServerSession, ServerSessionPool, TxnState, applySession, updateSessionFromResponse, commandSupportsReadConcern };