123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366 |
- "use strict";
- Object.defineProperty(exports, "__esModule", { value: true });
- exports.Server = void 0;
- const connection_1 = require("../cmap/connection");
- const connection_pool_1 = require("../cmap/connection_pool");
- const constants_1 = require("../constants");
- const error_1 = require("../error");
- const logger_1 = require("../logger");
- const mongo_types_1 = require("../mongo_types");
- const transactions_1 = require("../transactions");
- const utils_1 = require("../utils");
- const common_1 = require("./common");
- const monitor_1 = require("./monitor");
- const server_description_1 = require("./server_description");
// Server lifecycle state machine: each key is a current state and its array
// lists the states that may be entered from it.
const stateTransition = (() => {
    const { STATE_CLOSED, STATE_CLOSING, STATE_CONNECTED, STATE_CONNECTING } = common_1;
    const { makeStateMachine } = utils_1;
    return makeStateMachine({
        [STATE_CLOSED]: [STATE_CLOSED, STATE_CONNECTING],
        [STATE_CONNECTING]: [STATE_CONNECTING, STATE_CLOSING, STATE_CONNECTED, STATE_CLOSED],
        [STATE_CONNECTED]: [STATE_CONNECTED, STATE_CLOSING, STATE_CLOSED],
        [STATE_CLOSING]: [STATE_CLOSING, STATE_CLOSED]
    });
})();
/**
 * Symbol under which a Server stores its monitor instance.
 * @internal
 */
const kMonitor = Symbol('monitor');
/**
 * One server of a topology. It owns a connection pool and, outside load
 * balanced mode, a monitor; it re-emits their events and runs operations on
 * connections taken from the pool.
 * @internal
 */
class Server extends mongo_types_1.TypedEventEmitter {
    /**
     * Create a server
     *
     * @param topology - owning topology (read for cluster time and topology type)
     * @param description - initial server description; supplies the host address
     * @param options - server options, also handed to the pool and the monitor
     */
    constructor(topology, description, options) {
        super();
        this.serverApi = options.serverApi;
        const { hostAddress } = description;
        this.s = {
            description,
            options,
            logger: new logger_1.Logger('Server'),
            state: common_1.STATE_CLOSED,
            topology,
            // `options` is spread last, so its keys win over the description's host address
            pool: new connection_pool_1.ConnectionPool({ hostAddress, ...options })
        };
        // Re-emit from this server every event in `names` raised by `source`.
        const relay = (source, names) => {
            for (const name of names) {
                source.on(name, (payload) => this.emit(name, payload));
            }
        };
        relay(this.s.pool, [...constants_1.CMAP_EVENTS, ...constants_1.APM_EVENTS]);
        this.s.pool.on(connection_1.Connection.CLUSTER_TIME_RECEIVED, (clusterTime) => {
            this.clusterTime = clusterTime;
        });
        // monitoring is disabled in load balancing mode
        if (this.loadBalanced) {
            return;
        }
        // create the monitor
        const monitor = new monitor_1.Monitor(this, this.s.options);
        this[kMonitor] = monitor;
        relay(monitor, constants_1.HEARTBEAT_EVENTS);
        monitor.on('resetConnectionPool', () => {
            this.s.pool.clear();
        });
        monitor.on('resetServer', (error) => markServerUnknown(this, error));
        monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event) => {
            // Publish a fresh description built from the heartbeat reply.
            const refreshed = new server_description_1.ServerDescription(this.description.hostAddress, event.reply, {
                roundTripTime: calculateRoundTripTime(this.description.roundTripTime, event.duration)
            });
            this.emit(Server.DESCRIPTION_RECEIVED, refreshed);
            // The first successful heartbeat completes a pending connect().
            if (this.s.state !== common_1.STATE_CONNECTING) {
                return;
            }
            stateTransition(this, common_1.STATE_CONNECTED);
            this.emit(Server.CONNECT, this);
        });
    }
    /** Cluster time, stored on the owning topology. */
    get clusterTime() {
        return this.s.topology.clusterTime;
    }
    set clusterTime(clusterTime) {
        this.s.topology.clusterTime = clusterTime;
    }
    /** The server description held in this server's state. */
    get description() {
        return this.s.description;
    }
    /** The `address` of the current description. */
    get name() {
        return this.s.description.address;
    }
    /** The `autoEncrypter` option when it is set to a truthy value, otherwise `undefined`. */
    get autoEncrypter() {
        const { options } = this.s;
        return options && options.autoEncrypter ? options.autoEncrypter : undefined;
    }
    /** Whether the owning topology's description type is `LoadBalanced`. */
    get loadBalanced() {
        const { type } = this.s.topology.description;
        return type === common_1.TopologyType.LoadBalanced;
    }
    /**
     * Initiate server connect
     */
    connect() {
        // Only a closed server may start connecting.
        if (this.s.state !== common_1.STATE_CLOSED) {
            return;
        }
        stateTransition(this, common_1.STATE_CONNECTING);
        // If in load balancer mode we automatically set the server to
        // a load balancer. It never transitions out of this state and
        // has no monitor.
        if (this.loadBalanced) {
            stateTransition(this, common_1.STATE_CONNECTED);
            this.emit(Server.CONNECT, this);
            return;
        }
        this[kMonitor].connect();
    }
    /**
     * Destroy the server connection
     *
     * @param options - close options passed to the pool (`force` defaults to false);
     *   may be omitted, in which case the first argument is the callback
     * @param callback - optional; invoked once the pool has closed, or immediately
     *   when the server is already closed
     */
    destroy(options, callback) {
        if (typeof options === 'function') {
            callback = options;
            options = {};
        }
        const closeOptions = Object.assign({ force: false }, options);
        const hasCallback = typeof callback === 'function';
        if (this.s.state === common_1.STATE_CLOSED) {
            if (hasCallback) {
                callback();
            }
            return;
        }
        stateTransition(this, common_1.STATE_CLOSING);
        // There is no monitor in load balanced mode.
        if (!this.loadBalanced) {
            this[kMonitor].close();
        }
        this.s.pool.close(closeOptions, (err) => {
            stateTransition(this, common_1.STATE_CLOSED);
            this.emit('closed');
            if (hasCallback) {
                callback(err);
            }
        });
    }
    /**
     * Ask the monitor to check this server right away. Does nothing in load
     * balanced mode, where no monitor exists.
     */
    requestCheck() {
        if (this.loadBalanced) {
            return;
        }
        this[kMonitor].requestCheck();
    }
    /**
     * Execute a command against the server.
     *
     * @param ns - namespace object; must expose a `db` property (strings are rejected)
     * @param cmd - the command document
     * @param options - command options; may be omitted, in which case the third
     *   argument is the callback
     * @param callback - required; receives `(err, result)`
     * @throws MongoInvalidArgumentError when no callback is given or the namespace
     *   is a string / has no `db`
     */
    command(ns, cmd, options, callback) {
        if (typeof options === 'function') {
            callback = options;
            options = {};
        }
        if (callback == null) {
            throw new error_1.MongoInvalidArgumentError('Callback must be provided');
        }
        if (ns.db == null || typeof ns === 'string') {
            throw new error_1.MongoInvalidArgumentError('Namespace must not be a string');
        }
        const { state } = this.s;
        if (state === common_1.STATE_CLOSING || state === common_1.STATE_CLOSED) {
            callback(new error_1.MongoServerClosedError());
            return;
        }
        // Work on a copy so the caller's options object is never modified.
        const finalOptions = Object.assign({}, options, { wireProtocolCommand: false });
        // There are cases where we need to flag the read preference not to get sent in
        // the command, such as pre-5.0 servers attempting to perform an aggregate write
        // with a non-primary read preference. In this case the effective read preference
        // (primary) is not the same as the provided and must be removed completely.
        if (finalOptions.omitReadPreference) {
            delete finalOptions.readPreference;
        }
        // error if collation not supported
        if ((0, utils_1.collationNotSupported)(this, cmd)) {
            callback(new error_1.MongoCompatibilityError(`Server ${this.name} does not support collation`));
            return;
        }
        const session = finalOptions.session;
        const pinned = session == null ? undefined : session.pinnedConnection;
        // NOTE: This is a hack! We can't retrieve the connections used for executing an operation
        // (and prevent them from being checked back in) at the point of operation execution.
        // This should be considered as part of the work for NODE-2882
        const mustPinFirst = this.loadBalanced && session && pinned == null && isPinnableCommand(cmd, session);
        if (mustPinFirst) {
            this.s.pool.checkOut((err, checkedOut) => {
                if (err || checkedOut == null) {
                    return callback ? callback(err) : undefined;
                }
                // Pin the connection, then re-enter with the session now pinned.
                session.pin(checkedOut);
                this.command(ns, cmd, finalOptions, callback);
            });
            return;
        }
        this.s.pool.withConnection(pinned, (err, conn, cb) => {
            if (err || !conn) {
                markServerUnknown(this, err);
                return cb(err);
            }
            const onResult = makeOperationHandler(this, conn, cmd, finalOptions, cb);
            conn.command(ns, cmd, finalOptions, onResult);
        }, callback);
    }
    /**
     * Execute a query against the server
     * @internal
     */
    query(ns, cmd, options, callback) {
        const { state } = this.s;
        if (state === common_1.STATE_CLOSING || state === common_1.STATE_CLOSED) {
            callback(new error_1.MongoServerClosedError());
            return;
        }
        this.s.pool.withConnection(undefined, (err, conn, cb) => {
            if (err || !conn) {
                markServerUnknown(this, err);
                return cb(err);
            }
            const onResult = makeOperationHandler(this, conn, cmd, options, cb);
            conn.query(ns, cmd, options, onResult);
        }, callback);
    }
    /**
     * Execute a `getMore` against the server
     * @internal
     */
    getMore(ns, cursorId, options, callback) {
        const { state } = this.s;
        if (state === common_1.STATE_CLOSING || state === common_1.STATE_CLOSED) {
            callback(new error_1.MongoServerClosedError());
            return;
        }
        // Reuse the session's pinned connection when there is one.
        const { session } = options;
        const pinned = session == null ? undefined : session.pinnedConnection;
        this.s.pool.withConnection(pinned, (err, conn, cb) => {
            if (err || !conn) {
                markServerUnknown(this, err);
                return cb(err);
            }
            const onResult = makeOperationHandler(this, conn, {}, options, cb);
            conn.getMore(ns, cursorId, options, onResult);
        }, callback);
    }
    /**
     * Execute a `killCursors` command against the server
     * @internal
     */
    killCursors(ns, cursorIds, options, callback) {
        const { state } = this.s;
        if (state === common_1.STATE_CLOSING || state === common_1.STATE_CLOSED) {
            // The callback is optional on this path.
            if (typeof callback === 'function') {
                callback(new error_1.MongoServerClosedError());
            }
            return;
        }
        // Reuse the session's pinned connection when there is one.
        const { session } = options;
        const pinned = session == null ? undefined : session.pinnedConnection;
        this.s.pool.withConnection(pinned, (err, conn, cb) => {
            if (err || !conn) {
                markServerUnknown(this, err);
                return cb(err);
            }
            // The result handler is built without options, so it sees no session.
            const onResult = makeOperationHandler(this, conn, {}, undefined, cb);
            conn.killCursors(ns, cursorIds, options, onResult);
        }, callback);
    }
}
exports.Server = Server;
// Event names exposed as static members of Server. Each member is copied from
// the constant of the same name, in this order.
for (const eventName of [
    'SERVER_HEARTBEAT_STARTED',
    'SERVER_HEARTBEAT_SUCCEEDED',
    'SERVER_HEARTBEAT_FAILED',
    'CONNECT',
    'DESCRIPTION_RECEIVED',
    'CLOSED',
    'ENDED'
]) {
    /** @event */
    Server[eventName] = constants_1[eventName];
}
/**
 * Fold a new heartbeat duration into the running round trip time.
 *
 * @param oldRtt - previous round trip time; `-1` means no previous sample
 * @param duration - duration of the latest heartbeat
 * @returns `duration` when there is no previous sample, otherwise the weighted
 *   average giving the new sample a weight of 0.2
 */
function calculateRoundTripTime(oldRtt, duration) {
    const alpha = 0.2;
    return oldRtt === -1 ? duration : alpha * duration + (1 - alpha) * oldRtt;
}
/**
 * Publish a description for `server` that carries no server reply, recording
 * the triggering error.
 *
 * @param server - the server to mark
 * @param error - the error that caused the change (may be undefined)
 */
function markServerUnknown(server, error) {
    // Load balancer servers can never be marked unknown.
    if (server.loadBalanced) {
        return;
    }
    // Network errors other than timeouts also reset the monitor.
    const isNetworkError = error instanceof error_1.MongoNetworkError;
    if (isNetworkError && !(error instanceof error_1.MongoNetworkTimeoutError)) {
        server[kMonitor].reset();
    }
    const { hostAddress } = server.description;
    // Prefer the topology version reported by the error, else keep the current one.
    const topologyVersion = error && error.topologyVersion
        ? error.topologyVersion
        : server.description.topologyVersion;
    const unknown = new server_description_1.ServerDescription(hostAddress, undefined, {
        error,
        topologyVersion
    });
    server.emit(Server.DESCRIPTION_RECEIVED, unknown);
}
/**
 * Decide whether running `cmd` should pin a connection to `session`.
 *
 * @param cmd - the command document
 * @param session - the session, if any
 * @returns `false` without a session; otherwise truthy when the session is in
 *   a transaction or the command has one of the cursor-related keys below
 */
function isPinnableCommand(cmd, session) {
    if (!session) {
        return false;
    }
    const cursorCommands = ['aggregate', 'find', 'getMore', 'listCollections', 'listIndexes'];
    return session.inTransaction() || cursorCommands.some((key) => key in cmd);
}
/**
 * Tell whether `connection` belongs to an older generation than `pool`.
 *
 * A connection with a `serviceId` is compared against the generation the pool
 * tracks for that service (keyed by the id's hex string); any other connection
 * is compared against the pool's own generation.
 */
function connectionIsStale(pool, connection) {
    const { serviceId, generation } = connection;
    const currentGeneration = serviceId
        ? pool.serviceGenerations.get(serviceId.toHexString())
        : pool.generation;
    return generation !== currentGeneration;
}
/**
 * Tell whether a state change error should be acted upon: true only when
 * comparing the server's current topology version against the error's yields
 * a negative result.
 */
function shouldHandleStateChangeError(server, err) {
    const { compareTopologyVersion } = server_description_1;
    const errorVersion = err.topologyVersion;
    const serverVersion = server.description.topologyVersion;
    return compareTopologyVersion(serverVersion, errorVersion) < 0;
}
/**
 * Tell whether `cmd` runs inside a transaction of `session` without being a
 * transaction command itself. A falsy session, or a falsy `inTransaction()`
 * result, is returned as-is.
 */
function inActiveTransaction(session, cmd) {
    if (!session) {
        return session;
    }
    const active = session.inTransaction();
    if (!active) {
        return active;
    }
    const { isTransactionCommand } = transactions_1;
    return !isTransactionCommand(cmd);
}
/**
 * Reads the `retryWrites` option handed down from the client options. Retryable
 * writes count as enabled unless that option is exactly `false`. Whether the
 * server itself supports retryable writes is not checked here.
 */
function isRetryableWritesEnabled(topology) {
    const { retryWrites } = topology.s.options;
    return retryWrites !== false;
}
/**
 * Build the result handler for one operation executed on `connection`.
 *
 * The returned function inspects a failed result before passing it on. Unless
 * the connection is already stale, it labels the error for retry / transaction
 * handling, clears the pool, marks the server unknown and unpins the session
 * as the branches below dictate. `callback` is then always invoked with the
 * untouched `(err, result)` pair.
 *
 * @param server - the server the operation ran against
 * @param connection - the connection the operation ran on
 * @param cmd - the command document (used for transaction checks)
 * @param options - operation options; only `session` is read, and it may be undefined
 * @param callback - receives `(err, result)`
 * @returns the `(err, result)` handler
 */
function makeOperationHandler(server, connection, cmd, options, callback) {
    const session = options == null ? undefined : options.session;
    return function handleOperationResult(err, result) {
        // Errors from stale connections say nothing about the current pool generation.
        if (err && !connectionIsStale(server.s.pool, connection)) {
            if (err instanceof error_1.MongoNetworkError) {
                if (session && !session.hasEnded && session.serverSession) {
                    session.serverSession.isDirty = true;
                }
                // inActiveTransaction check handles commit and abort.
                if (inActiveTransaction(session, cmd) && !err.hasErrorLabel('TransientTransactionError')) {
                    err.addErrorLabel('TransientTransactionError');
                }
                if ((isRetryableWritesEnabled(server.s.topology) || (0, transactions_1.isTransactionCommand)(cmd)) &&
                    (0, utils_1.supportsRetryableWrites)(server) &&
                    !inActiveTransaction(session, cmd)) {
                    err.addErrorLabel('RetryableWriteError');
                }
                if (!(err instanceof error_1.MongoNetworkTimeoutError) || (0, error_1.isNetworkErrorBeforeHandshake)(err)) {
                    // In load balanced mode we never mark the server as unknown and always
                    // clear for the specific service id.
                    server.s.pool.clear(connection.serviceId);
                    if (!server.loadBalanced) {
                        markServerUnknown(server, err);
                    }
                }
            }
            else {
                // if pre-4.4 server, then add error label if its a retryable write error
                if ((isRetryableWritesEnabled(server.s.topology) || (0, transactions_1.isTransactionCommand)(cmd)) &&
                    (0, utils_1.maxWireVersion)(server) < 9 &&
                    (0, error_1.isRetryableWriteError)(err) &&
                    !inActiveTransaction(session, cmd)) {
                    err.addErrorLabel('RetryableWriteError');
                }
                if ((0, error_1.isSDAMUnrecoverableError)(err)) {
                    if (shouldHandleStateChangeError(server, err)) {
                        if ((0, utils_1.maxWireVersion)(server) <= 7 || (0, error_1.isNodeShuttingDownError)(err)) {
                            server.s.pool.clear(connection.serviceId);
                        }
                        if (!server.loadBalanced) {
                            markServerUnknown(server, err);
                            process.nextTick(() => server.requestCheck());
                        }
                    }
                }
            }
            // Errors that are not driver error objects (plain Error, system errors)
            // have no label API. Calling it blindly would throw here and the
            // operation callback would never run, so check for the method first.
            if (session &&
                session.isPinned &&
                typeof err.hasErrorLabel === 'function' &&
                err.hasErrorLabel('TransientTransactionError')) {
                session.unpin({ force: true });
            }
        }
        callback(err, result);
    };
}
- //# sourceMappingURL=server.js.map
|