12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780 |
- 'use strict';
- const retrieveBSON = require('./connection/utils').retrieveBSON;
- const EventEmitter = require('events');
- const BSON = retrieveBSON();
- const Binary = BSON.Binary;
- const uuidV4 = require('./utils').uuidV4;
- const MongoError = require('./error').MongoError;
- const isRetryableError = require('././error').isRetryableError;
- const MongoNetworkError = require('./error').MongoNetworkError;
- const MongoWriteConcernError = require('./error').MongoWriteConcernError;
- const Transaction = require('./transactions').Transaction;
- const TxnState = require('./transactions').TxnState;
- const isPromiseLike = require('./utils').isPromiseLike;
- const ReadPreference = require('./topologies/read_preference');
- const maybePromise = require('../utils').maybePromise;
- const isTransactionCommand = require('./transactions').isTransactionCommand;
- const resolveClusterTime = require('./topologies/shared').resolveClusterTime;
- const isSharded = require('./wireprotocol/shared').isSharded;
- const maxWireVersion = require('./utils').maxWireVersion;
- const now = require('./../utils').now;
- const calculateDurationInMs = require('./../utils').calculateDurationInMs;
// Lowest max wire version for which `startTransaction` accepts a sharded topology;
// anything below this is rejected with a "MongoDB < 4.2" error.
const minWireVersionForShardedTransactions = 8;
/**
 * Checks that a session still has a server session attached.
 *
 * @param {ClientSession} session The session to inspect
 * @param {Function} [callback] When provided, receives the error instead of it being thrown
 * @returns {boolean} true when the session is usable; false when the error was handed to `callback`
 * @throws {MongoError} when the session is unusable and no callback function was provided
 */
function assertAlive(session, callback) {
  if (session.serverSession != null) {
    return true;
  }

  const endedError = new MongoError('Cannot use a session that has ended');
  if (typeof callback !== 'function') {
    throw endedError;
  }

  callback(endedError, null);
  return false;
}
- /**
- * Options to pass when creating a Client Session
- * @typedef {Object} SessionOptions
- * @property {boolean} [causalConsistency=true] Whether causal consistency should be enabled on this session
- * @property {TransactionOptions} [defaultTransactionOptions] The default TransactionOptions to use for transactions started on this session.
- */
- /**
- * A BSON document reflecting the lsid of a {@link ClientSession}
- * @typedef {Object} SessionId
- */
// Private key under which a ClientSession stores its lazily acquired ServerSession.
const kServerSession = Symbol('serverSession');

/**
 * A class representing a client session on the server
 * WARNING: not meant to be instantiated directly.
 * @class
 * @hideconstructor
 */
class ClientSession extends EventEmitter {
  /**
   * Create a client session.
   * WARNING: not meant to be instantiated directly
   *
   * @param {Topology} topology The current client's topology (Internal Class)
   * @param {ServerSessionPool} sessionPool The server session pool (Internal Class)
   * @param {SessionOptions} [options] Optional settings
   * @param {Object} [clientOptions] Optional settings provided when creating a client in the porcelain driver
   */
  constructor(topology, sessionPool, options, clientOptions) {
    super();

    if (topology == null) {
      throw new Error('ClientSession requires a topology');
    }

    // `instanceof` is false for null/undefined, so this also covers a missing pool
    if (!(sessionPool instanceof ServerSessionPool)) {
      throw new Error('ClientSession requires a ServerSessionPool');
    }

    options = options || {};
    clientOptions = clientOptions || {};

    this.topology = topology;
    this.sessionPool = sessionPool;
    this.hasEnded = false;
    this.clientOptions = clientOptions;
    // the server session is only acquired on first access of `serverSession`
    this[kServerSession] = undefined;

    // causal consistency is on unless the caller supplied a value for it
    this.supports = {
      causalConsistency:
        options.causalConsistency === undefined ? true : options.causalConsistency
    };

    this.clusterTime = options.initialClusterTime;
    this.operationTime = null;
    this.explicit = Boolean(options.explicit);
    this.owner = options.owner;
    this.defaultTransactionOptions = Object.assign({}, options.defaultTransactionOptions);
    this.transaction = new Transaction();
  }

  /**
   * The server id associated with this session
   * @type {SessionId}
   */
  get id() {
    return this.serverSession.id;
  }

  /**
   * The underlying server session, acquired from the pool on first access.
   */
  get serverSession() {
    if (this[kServerSession] == null) {
      this[kServerSession] = this.sessionPool.acquire();
    }

    return this[kServerSession];
  }

  /**
   * Ends this session on the server
   *
   * If a transaction is active it is aborted first; an error from that abort is
   * reported and the session is left un-ended.
   *
   * @param {Object} [options] Optional settings. Currently reserved for future use
   * @param {Function} [callback] Optional callback for completion of this operation
   */
  endSession(options, callback) {
    if (typeof options === 'function') {
      callback = options;
      options = {};
    }
    options = options || {};

    return maybePromise(this, callback, done => {
      if (this.hasEnded) {
        return done();
      }

      const finish = () => {
        // hand the server session back to the pool
        this.sessionPool.release(this.serverSession);
        this[kServerSession] = undefined;

        // flag the session as ended and notify listeners
        this.hasEnded = true;
        this.emit('ended', this);

        // spec indicates that we should ignore all errors for `endSessions`
        done();
      };

      if (!(this.serverSession && this.inTransaction())) {
        finish();
        return;
      }

      this.abortTransaction(err => {
        if (err) return done(err);
        finish();
      });
    });
  }

  /**
   * Advances the operationTime for a ClientSession.
   *
   * The stored value is replaced when none is set yet, or when the supplied
   * timestamp reports itself `greaterThan` the stored one.
   *
   * @param {Timestamp} operationTime the `BSON.Timestamp` of the operation type it is desired to advance to
   */
  advanceOperationTime(operationTime) {
    if (this.operationTime == null || operationTime.greaterThan(this.operationTime)) {
      this.operationTime = operationTime;
    }
  }

  /**
   * Used to determine if this session equals another
   * @param {ClientSession} session
   * @return {boolean} true if the sessions are equal
   */
  equals(session) {
    return session instanceof ClientSession && this.id.id.buffer.equals(session.id.id.buffer);
  }

  /**
   * Increment the transaction number on the internal ServerSession
   */
  incrementTransactionNumber() {
    const serverSession = this.serverSession;
    serverSession.txnNumber++;
  }

  /**
   * @returns {boolean} whether this session is currently in a transaction or not
   */
  inTransaction() {
    return this.transaction.isActive;
  }

  /**
   * Starts a new transaction with the given options.
   *
   * @param {TransactionOptions} options Options for the transaction
   */
  startTransaction(options) {
    assertAlive(this);

    if (this.inTransaction()) {
      throw new MongoError('Transaction already in progress');
    }

    const wireVersion = maxWireVersion(this.topology);
    const shardedWithoutSupport =
      isSharded(this.topology) &&
      wireVersion != null &&
      wireVersion < minWireVersionForShardedTransactions;
    if (shardedWithoutSupport) {
      throw new MongoError('Transactions are not supported on sharded clusters in MongoDB < 4.2.');
    }

    // every transaction gets its own txnNumber
    this.incrementTransactionNumber();

    // explicit options replace (not merge with) the session's default transaction options
    const transactionOptions = Object.assign(
      {},
      this.clientOptions,
      options || this.defaultTransactionOptions
    );
    this.transaction = new Transaction(transactionOptions);
    this.transaction.transition(TxnState.STARTING_TRANSACTION);
  }

  /**
   * Commits the currently active transaction in this session.
   *
   * @param {Function} [callback] optional callback for completion of this operation
   * @return {Promise} A promise is returned if no callback is provided
   */
  commitTransaction(callback) {
    return maybePromise(this, callback, done => {
      return endTransaction(this, 'commitTransaction', done);
    });
  }

  /**
   * Aborts the currently active transaction in this session.
   *
   * @param {Function} [callback] optional callback for completion of this operation
   * @return {Promise} A promise is returned if no callback is provided
   */
  abortTransaction(callback) {
    return maybePromise(this, callback, done => {
      return endTransaction(this, 'abortTransaction', done);
    });
  }

  /**
   * This is here to ensure that ClientSession is never serialized to BSON.
   * @ignore
   */
  toBSON() {
    throw new Error('ClientSession cannot be serialized to BSON.');
  }

  /**
   * A user provided function to be run within a transaction
   *
   * @callback WithTransactionCallback
   * @param {ClientSession} session The parent session of the transaction running the operation. This should be passed into each operation within the lambda.
   * @returns {Promise} The resulting Promise of operations run within this transaction
   */

  /**
   * Runs a provided lambda within a transaction, retrying either the commit operation
   * or entire transaction as needed (and when the error permits) to better ensure that
   * the transaction can complete successfully.
   *
   * IMPORTANT: This method requires the user to return a Promise, all lambdas that do not
   * return a Promise will result in undefined behavior.
   *
   * @param {WithTransactionCallback} fn
   * @param {TransactionOptions} [options] Optional settings for the transaction
   */
  withTransaction(fn, options) {
    // the start time is captured once and shared by every retry
    return attemptTransaction(this, now(), fn, options);
  }
}
// Upper bound, in milliseconds, on how long `withTransaction` keeps retrying.
const MAX_WITH_TRANSACTION_TIMEOUT = 120 * 1000;

// Numeric error codes inspected when classifying a failed commit.
const UNSATISFIABLE_WRITE_CONCERN_CODE = 100;
const UNKNOWN_REPL_WRITE_CONCERN_CODE = 79;
const MAX_TIME_MS_EXPIRED_CODE = 50;

// Error code names for which a commit failure is NOT treated as an unknown commit result.
const NON_DETERMINISTIC_WRITE_CONCERN_ERRORS = new Set()
  .add('CannotSatisfyWriteConcern')
  .add('UnknownReplWriteConcern')
  .add('UnsatisfiableWriteConcern');
/**
 * Reports whether less than `max` milliseconds have elapsed since `startTime`.
 *
 * @param {*} startTime A start marker understood by `calculateDurationInMs`
 * @param {number} max The time budget to compare the elapsed duration against
 * @returns {boolean} true while the elapsed duration is strictly below `max`
 */
function hasNotTimedOut(startTime, max) {
  const elapsedMs = calculateDurationInMs(startTime);
  return elapsedMs < max;
}
/**
 * Decides whether a commit error leaves the outcome of the commit unknown.
 *
 * A maxTimeMS expiry always counts as unknown. Otherwise the result is unknown
 * unless the error's `codeName` or `code` identifies one of the excluded
 * write concern failures.
 *
 * @param {Object} err The error produced by `commitTransaction`
 * @returns {boolean} true when the commit result should be considered unknown
 */
function isUnknownTransactionCommitResult(err) {
  if (isMaxTimeMSExpiredError(err)) {
    return true;
  }

  if (NON_DETERMINISTIC_WRITE_CONCERN_ERRORS.has(err.codeName)) {
    return false;
  }

  const excludedCodes = [UNSATISFIABLE_WRITE_CONCERN_CODE, UNKNOWN_REPL_WRITE_CONCERN_CODE];
  return !excludedCodes.includes(err.code);
}
/**
 * Tests whether an error, or its nested `writeConcernError`, carries the
 * maxTimeMS-expired error code.
 *
 * @param {Object} [err] The error to inspect
 * @returns {*} false for a missing error, true when the top-level code matches,
 *   otherwise the (possibly non-boolean) result of checking `err.writeConcernError`
 */
function isMaxTimeMSExpiredError(err) {
  if (err == null) {
    return false;
  }

  if (err.code === MAX_TIME_MS_EXPIRED_CODE) {
    return true;
  }

  // kept as a short-circuit expression so a missing writeConcernError is returned as-is
  return err.writeConcernError && err.writeConcernError.code === MAX_TIME_MS_EXPIRED_CODE;
}
/**
 * Commits the session's transaction on behalf of `withTransaction`, retrying when allowed.
 *
 * A failed commit is retried only for a `MongoError` that is not a maxTimeMS expiry,
 * while the overall time budget has not run out:
 *  - label `UnknownTransactionCommitResult`: the commit alone is attempted again
 *  - label `TransientTransactionError`: the whole transaction is attempted again
 * Every other failure is rethrown unchanged.
 *
 * @param {ClientSession} session The session owning the transaction
 * @param {*} startTime Start marker of the enclosing `withTransaction` call
 * @param {Function} fn The user callback, needed if the whole transaction is re-run
 * @param {Object} [options] Transaction options, forwarded on retry
 * @returns {Promise} Settles with the outcome of the commit or of the retry
 */
function attemptTransactionCommit(session, startTime, fn, options) {
  const onCommitFailure = err => {
    const mayRetry =
      err instanceof MongoError &&
      hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT) &&
      !isMaxTimeMSExpiredError(err);

    if (!mayRetry) {
      throw err;
    }

    if (err.hasErrorLabel('UnknownTransactionCommitResult')) {
      return attemptTransactionCommit(session, startTime, fn, options);
    }

    if (err.hasErrorLabel('TransientTransactionError')) {
      return attemptTransaction(session, startTime, fn, options);
    }

    throw err;
  };

  return session.commitTransaction().catch(onCommitFailure);
}
// Transaction states that, once the `withTransaction` callback has resolved, mean
// there is nothing left for the helper to commit.
const USER_EXPLICIT_TXN_END_STATES = new Set()
  .add(TxnState.NO_TRANSACTION)
  .add(TxnState.TRANSACTION_COMMITTED)
  .add(TxnState.TRANSACTION_ABORTED);
/**
 * Reports whether the session's transaction is already in one of the states
 * listed in `USER_EXPLICIT_TXN_END_STATES`.
 *
 * @param {ClientSession} session The session whose transaction state is examined
 * @returns {boolean} true when the current state is one of the listed end states
 */
function userExplicitlyEndedTransaction(session) {
  const currentState = session.transaction.state;
  return USER_EXPLICIT_TXN_END_STATES.has(currentState);
}
/**
 * Runs one attempt of a `withTransaction` call: starts a transaction, invokes the
 * user callback, then commits unless the callback already ended the transaction.
 *
 * On failure an active transaction is aborted first. The attempt is then repeated
 * when the error is a `MongoError` labelled `TransientTransactionError` and the time
 * budget has not run out; otherwise the error is rethrown, gaining the
 * `UnknownTransactionCommitResult` label when it is a maxTimeMS expiry.
 *
 * @param {ClientSession} session The session to run the transaction on
 * @param {*} startTime Start marker of the enclosing `withTransaction` call
 * @param {Function} fn User callback; it must return a Promise
 * @param {Object} [options] Options passed to `startTransaction`
 * @returns {Promise} Settles with the outcome of the commit (or of a retry)
 * @throws {TypeError} synchronously, when `fn` does not return a promise-like value
 */
function attemptTransaction(session, startTime, fn, options) {
  session.startTransaction(options);

  let userPromise;
  try {
    userPromise = fn(session);
  } catch (syncError) {
    // a synchronous throw is handled exactly like a rejected promise
    userPromise = Promise.reject(syncError);
  }

  if (!isPromiseLike(userPromise)) {
    session.abortTransaction();
    throw new TypeError('Function provided to `withTransaction` must return a Promise');
  }

  const retryOrRethrow = err => {
    const isTransientAndInTime =
      err instanceof MongoError &&
      err.hasErrorLabel('TransientTransactionError') &&
      hasNotTimedOut(startTime, MAX_WITH_TRANSACTION_TIMEOUT);

    if (isTransientAndInTime) {
      return attemptTransaction(session, startTime, fn, options);
    }

    if (isMaxTimeMSExpiredError(err)) {
      err.addErrorLabel('UnknownTransactionCommitResult');
    }

    throw err;
  };

  const commitUnlessAlreadyEnded = () => {
    return userExplicitlyEndedTransaction(session)
      ? undefined
      : attemptTransactionCommit(session, startTime, fn, options);
  };

  const abortThenRetryOrRethrow = err => {
    return session.transaction.isActive
      ? session.abortTransaction().then(() => retryOrRethrow(err))
      : retryOrRethrow(err);
  };

  return userPromise.then(commitUnlessAlreadyEnded).catch(abortThenRetryOrRethrow);
}
/**
 * Shared implementation of `commitTransaction` and `abortTransaction`.
 *
 * Validates the current transaction state, short-circuits when no command needs
 * to be sent, otherwise builds the command (write concern, maxTimeMS, recovery
 * token), sends it to `admin.$cmd` and retries it once on a retryable error.
 * Errors from `abortTransaction` are never surfaced to the callback.
 *
 * @param {ClientSession} session The session whose transaction is being ended
 * @param {string} commandName Assumed to be "commitTransaction" or "abortTransaction"
 * @param {Function} callback Invoked with `(error, reply)` once the operation settles
 */
function endTransaction(session, commandName, callback) {
  if (!assertAlive(session, callback)) {
    // assertAlive has already reported the error through the callback
    return;
  }

  const isCommit = commandName === 'commitTransaction';
  const txnState = session.transaction.state;

  const failWith = message => {
    callback(new MongoError(message));
  };
  const finishWithoutCommand = nextState => {
    session.transaction.transition(nextState);
    callback(null, null);
  };

  // --- state validation and early exits -------------------------------------
  if (txnState === TxnState.NO_TRANSACTION) {
    failWith('No transaction started');
    return;
  }

  if (isCommit) {
    const nothingWasSent =
      txnState === TxnState.STARTING_TRANSACTION ||
      txnState === TxnState.TRANSACTION_COMMITTED_EMPTY;
    if (nothingWasSent) {
      // the transaction was never started, we can safely exit here
      finishWithoutCommand(TxnState.TRANSACTION_COMMITTED_EMPTY);
      return;
    }

    if (txnState === TxnState.TRANSACTION_ABORTED) {
      failWith('Cannot call commitTransaction after calling abortTransaction');
      return;
    }
  } else {
    if (txnState === TxnState.STARTING_TRANSACTION) {
      // the transaction was never started, we can safely exit here
      finishWithoutCommand(TxnState.TRANSACTION_ABORTED);
      return;
    }

    if (txnState === TxnState.TRANSACTION_ABORTED) {
      failWith('Cannot call abortTransaction twice');
      return;
    }

    const alreadyCommitted =
      txnState === TxnState.TRANSACTION_COMMITTED ||
      txnState === TxnState.TRANSACTION_COMMITTED_EMPTY;
    if (alreadyCommitted) {
      failWith('Cannot call abortTransaction after calling commitTransaction');
      return;
    }
  }

  // --- command construction (key order: name, writeConcern, maxTimeMS, recoveryToken)
  const txnOptions = session.transaction.options;

  let writeConcern;
  if (txnOptions.writeConcern) {
    writeConcern = Object.assign({}, txnOptions.writeConcern);
  } else if (session.clientOptions && session.clientOptions.w) {
    writeConcern = { w: session.clientOptions.w };
  }

  // re-sending a commit for an already committed transaction forces a majority write concern
  if (txnState === TxnState.TRANSACTION_COMMITTED) {
    writeConcern = Object.assign({ wtimeout: 10000 }, writeConcern, { w: 'majority' });
  }

  const command = { [commandName]: 1 };

  if (writeConcern) {
    command.writeConcern = writeConcern;
  }

  if (isCommit && txnOptions.maxTimeMS) {
    command.maxTimeMS = txnOptions.maxTimeMS;
  }

  // Assumption here that commandName is "commitTransaction" or "abortTransaction"
  if (session.transaction.recoveryToken && supportsRecoveryToken(session)) {
    command.recoveryToken = session.transaction.recoveryToken;
  }

  // --- result handling -------------------------------------------------------
  // The spec indicates that we should ignore all errors on `abortTransaction`
  const surfacedError = err => (isCommit ? err : null);

  const finalize = (error, reply) => {
    if (!isCommit) {
      session.transaction.transition(TxnState.TRANSACTION_ABORTED);
      callback(error, reply);
      return;
    }

    // the state moves to committed regardless of the command's outcome
    session.transaction.transition(TxnState.TRANSACTION_COMMITTED);

    const isCandidateForUnknownResult =
      error &&
      (error instanceof MongoNetworkError ||
        error instanceof MongoWriteConcernError ||
        isRetryableError(error) ||
        isMaxTimeMSExpiredError(error));

    if (isCandidateForUnknownResult && isUnknownTransactionCommitResult(error)) {
      error.addErrorLabel('UnknownTransactionCommitResult');

      // per txns spec, must unpin session in this case
      session.transaction.unpinServer();
    }

    callback(error, reply);
  };

  // --- send, with a single retry on retryable errors -------------------------
  session.topology.command('admin.$cmd', command, { session }, (err, reply) => {
    if (!(err && isRetryableError(err))) {
      finalize(surfacedError(err), reply);
      return;
    }

    // SPEC-1185: apply majority write concern when retrying commitTransaction
    if (command.commitTransaction) {
      // per txns spec, must unpin session in this case
      session.transaction.unpinServer();

      command.writeConcern = Object.assign({ wtimeout: 10000 }, command.writeConcern, {
        w: 'majority'
      });
    }

    return session.topology.command('admin.$cmd', command, { session }, (retryErr, retryReply) =>
      finalize(surfacedError(retryErr), retryReply)
    );
  });
}
/**
 * Reports whether the session's topology has the `useRecoveryToken` option enabled.
 *
 * @param {ClientSession} session The session whose topology options are read
 * @returns {boolean} the option coerced to a boolean
 */
function supportsRecoveryToken(session) {
  return Boolean(session.topology.s.options.useRecoveryToken);
}
/**
 * Reflects the existence of a session on the server. Can be reused by the session pool.
 * WARNING: not meant to be instantiated directly. For internal use only.
 * @ignore
 */
class ServerSession {
  constructor() {
    this.id = { id: new Binary(uuidV4(), Binary.SUBTYPE_UUID) };
    this.lastUse = now();
    this.txnNumber = 0;
    this.isDirty = false;
  }

  /**
   * Determines if the server session has timed out.
   *
   * A session counts as timed out once its idle time, rounded to whole minutes,
   * exceeds the timeout minus one minute.
   *
   * @ignore
   * @param {number} sessionTimeoutMinutes The server's "logicalSessionTimeoutMinutes"
   * @return {boolean} true if the session has timed out.
   */
  hasTimedOut(sessionTimeoutMinutes) {
    // Convert the full idle duration to minutes. The duration must not be reduced
    // modulo an hour or a day first: doing so makes a session that has been idle for
    // 65 minutes look like it has been idle for 5, and it would be reused.
    const idleTimeMinutes = Math.round(calculateDurationInMs(this.lastUse) / 60000);

    return idleTimeMinutes > sessionTimeoutMinutes - 1;
  }
}
/**
 * Maintains a pool of Server Sessions.
 * For internal use only
 * @ignore
 */
class ServerSessionPool {
  /**
   * @param {Object} topology Topology providing `logicalSessionTimeoutMinutes` and `endSessions`
   */
  constructor(topology) {
    if (topology == null) {
      throw new Error('ServerSessionPool requires a topology');
    }

    this.topology = topology;
    this.sessions = [];
  }

  /**
   * Ends all sessions in the session pool.
   * The pool is emptied once the topology reports back; whatever it reports is ignored.
   * @ignore
   * @param {Function} [callback] Invoked without arguments once the pool has been cleared
   */
  endAllPooledSessions(callback) {
    const notify = () => {
      if (typeof callback === 'function') {
        callback();
      }
    };

    if (!this.sessions.length) {
      notify();
      return;
    }

    const pooledIds = this.sessions.map(pooled => pooled.id);
    this.topology.endSessions(pooledIds, () => {
      this.sessions = [];
      notify();
    });
  }

  /**
   * Acquire a Server Session from the pool.
   * Sessions are taken from the front of the pool; stale ones are discarded until
   * a usable one is found. If the pool runs dry a new ServerSession is created.
   * @ignore
   * @returns {ServerSession}
   */
  acquire() {
    const timeoutMinutes = this.topology.logicalSessionTimeoutMinutes;

    while (this.sessions.length > 0) {
      const candidate = this.sessions.shift();
      if (candidate.hasTimedOut(timeoutMinutes)) {
        continue;
      }

      return candidate;
    }

    return new ServerSession();
  }

  /**
   * Release a session to the session pool
   * Stale sessions are first trimmed from the back of the pool. The released session
   * is then put at the front unless it has timed out or is marked dirty.
   * @ignore
   * @param {ServerSession} session The session to release to the pool
   */
  release(session) {
    const timeoutMinutes = this.topology.logicalSessionTimeoutMinutes;

    // trim stale sessions from the tail, stopping at the first live one
    while (
      this.sessions.length > 0 &&
      this.sessions[this.sessions.length - 1].hasTimedOut(timeoutMinutes)
    ) {
      this.sessions.pop();
    }

    if (session.hasTimedOut(timeoutMinutes) || session.isDirty) {
      return;
    }

    // otherwise, readd this session to the session pool
    this.sessions.unshift(session);
  }
}
// TODO: this should be codified in command construction
// @see https://github.com/mongodb/specifications/blob/master/source/read-write-concern/read-write-concern.rst#read-concern
/**
 * Decides whether a read concern may be attached to a command.
 *
 * @param {Object} command The command document; a truthy value under one of the listed keys qualifies it
 * @param {Object} [options] Operation options, consulted only for `mapReduce` with inline output
 * @returns {boolean} true when the command accepts a read concern
 */
function commandSupportsReadConcern(command, options) {
  const readCommandKeys = [
    'aggregate',
    'count',
    'distinct',
    'find',
    'parallelCollectionScan',
    'geoNear',
    'geoSearch'
  ];

  if (readCommandKeys.some(key => command[key])) {
    return true;
  }

  // mapReduce only qualifies when its output is inline
  if (!command.mapReduce || !options || !options.out) {
    return false;
  }

  return options.out.inline === 1 || options.out === 'inline';
}
/**
 * Optionally decorate a command with sessions specific keys
 *
 * Adds `lsid`, and where applicable `txnNumber`, `autocommit`, `startTransaction`
 * and `readConcern`, to `command`. Also refreshes the server session's `lastUse`
 * and advances the transaction state when the first command of a transaction is sent.
 *
 * @ignore
 * @param {ClientSession} session the session tracking transaction state
 * @param {Object} command the command to decorate
 * @param {Object} [options] Optional settings passed to calling operation
 * @return {MongoError|undefined} An error, if some error condition was met
 */
function applySession(session, command, options) {
  if (session.hasEnded) {
    // TODO: merge this with `assertAlive`, did not want to throw a try/catch here
    return new MongoError('Cannot use a session that has ended');
  }

  // `options` is optional: normalize once so every later read is safe
  const settings = options || {};

  // SPEC-1019: silently ignore explicit session with unacknowledged write for backwards compatibility
  if (settings.writeConcern && settings.writeConcern.w === 0) {
    return;
  }

  const serverSession = session.serverSession;
  serverSession.lastUse = now();
  command.lsid = serverSession.id;

  // first apply non-transaction-specific sessions data
  const inTransaction = session.inTransaction() || isTransactionCommand(command);
  const isRetryableWrite = settings.willRetryWrite;
  const shouldApplyReadConcern = commandSupportsReadConcern(command, settings);

  if (serverSession.txnNumber && (isRetryableWrite || inTransaction)) {
    command.txnNumber = BSON.Long.fromNumber(serverSession.txnNumber);
  }

  // now attempt to apply transaction-specific sessions data
  if (!inTransaction) {
    if (session.transaction.state !== TxnState.NO_TRANSACTION) {
      session.transaction.transition(TxnState.NO_TRANSACTION);
    }

    // TODO: the following should only be applied to read operation per spec.
    // for causal consistency
    if (session.supports.causalConsistency && session.operationTime && shouldApplyReadConcern) {
      // build a fresh object: the command's readConcern may be shared with its caller
      command.readConcern = Object.assign({}, command.readConcern, {
        afterClusterTime: session.operationTime
      });
    }

    return;
  }

  if (settings.readPreference && !settings.readPreference.equals(ReadPreference.primary)) {
    return new MongoError(
      `Read preference in a transaction must be primary, not: ${settings.readPreference.mode}`
    );
  }

  // `autocommit` must always be false to differentiate from retryable writes
  command.autocommit = false;

  if (session.transaction.state === TxnState.STARTING_TRANSACTION) {
    session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
    command.startTransaction = true;

    const readConcern =
      session.transaction.options.readConcern || session.clientOptions.readConcern;
    if (readConcern) {
      command.readConcern = readConcern;
    }

    if (session.supports.causalConsistency && session.operationTime) {
      // copy before adding afterClusterTime so the transaction/client level
      // readConcern object is not permanently modified
      command.readConcern = Object.assign({}, command.readConcern, {
        afterClusterTime: session.operationTime
      });
    }
  }
}
/**
 * Updates a session from the fields of a server response.
 *
 * - `$clusterTime` is handed to `resolveClusterTime` together with the session
 * - `operationTime` advances the session's operation time when causal consistency is enabled
 * - `recoveryToken` is stored on the transaction while the session is in a transaction
 *
 * Nothing happens when no session is supplied.
 *
 * @ignore
 * @param {ClientSession} [session] The session to update
 * @param {Object} document The server response document
 */
function updateSessionFromResponse(session, document) {
  // every update below needs a session; previously only two of the three checked for one
  if (session == null) {
    return;
  }

  if (document.$clusterTime) {
    resolveClusterTime(session, document.$clusterTime);
  }

  if (document.operationTime && session.supports.causalConsistency) {
    session.advanceOperationTime(document.operationTime);
  }

  if (document.recoveryToken && session.inTransaction()) {
    session.transaction._recoveryToken = document.recoveryToken;
  }
}
- module.exports = {
- ClientSession,
- ServerSession,
- ServerSessionPool,
- TxnState,
- applySession,
- updateSessionFromResponse,
- commandSupportsReadConcern
- };
|