server.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.Server = void 0;
  4. const connection_1 = require("../cmap/connection");
  5. const connection_pool_1 = require("../cmap/connection_pool");
  6. const constants_1 = require("../constants");
  7. const error_1 = require("../error");
  8. const logger_1 = require("../logger");
  9. const mongo_types_1 = require("../mongo_types");
  10. const transactions_1 = require("../transactions");
  11. const utils_1 = require("../utils");
  12. const common_1 = require("./common");
  13. const monitor_1 = require("./monitor");
  14. const server_description_1 = require("./server_description");
  15. const stateTransition = (0, utils_1.makeStateMachine)({
  16. [common_1.STATE_CLOSED]: [common_1.STATE_CLOSED, common_1.STATE_CONNECTING],
  17. [common_1.STATE_CONNECTING]: [common_1.STATE_CONNECTING, common_1.STATE_CLOSING, common_1.STATE_CONNECTED, common_1.STATE_CLOSED],
  18. [common_1.STATE_CONNECTED]: [common_1.STATE_CONNECTED, common_1.STATE_CLOSING, common_1.STATE_CLOSED],
  19. [common_1.STATE_CLOSING]: [common_1.STATE_CLOSING, common_1.STATE_CLOSED]
  20. });
  21. /** @internal */
  22. const kMonitor = Symbol('monitor');
  23. /** @internal */
  24. class Server extends mongo_types_1.TypedEventEmitter {
  25. /**
  26. * Create a server
  27. */
  28. constructor(topology, description, options) {
  29. super();
  30. this.serverApi = options.serverApi;
  31. const poolOptions = { hostAddress: description.hostAddress, ...options };
  32. this.s = {
  33. description,
  34. options,
  35. logger: new logger_1.Logger('Server'),
  36. state: common_1.STATE_CLOSED,
  37. topology,
  38. pool: new connection_pool_1.ConnectionPool(poolOptions),
  39. operationCount: 0
  40. };
  41. for (const event of [...constants_1.CMAP_EVENTS, ...constants_1.APM_EVENTS]) {
  42. this.s.pool.on(event, (e) => this.emit(event, e));
  43. }
  44. this.s.pool.on(connection_1.Connection.CLUSTER_TIME_RECEIVED, (clusterTime) => {
  45. this.clusterTime = clusterTime;
  46. });
  47. if (this.loadBalanced) {
  48. this[kMonitor] = null;
  49. // monitoring is disabled in load balancing mode
  50. return;
  51. }
  52. // create the monitor
  53. // TODO(NODE-4144): Remove new variable for type narrowing
  54. const monitor = new monitor_1.Monitor(this, this.s.options);
  55. this[kMonitor] = monitor;
  56. for (const event of constants_1.HEARTBEAT_EVENTS) {
  57. monitor.on(event, (e) => this.emit(event, e));
  58. }
  59. monitor.on('resetConnectionPool', () => {
  60. this.s.pool.clear();
  61. });
  62. monitor.on('resetServer', (error) => markServerUnknown(this, error));
  63. monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event) => {
  64. this.emit(Server.DESCRIPTION_RECEIVED, new server_description_1.ServerDescription(this.description.hostAddress, event.reply, {
  65. roundTripTime: calculateRoundTripTime(this.description.roundTripTime, event.duration)
  66. }));
  67. if (this.s.state === common_1.STATE_CONNECTING) {
  68. stateTransition(this, common_1.STATE_CONNECTED);
  69. this.emit(Server.CONNECT, this);
  70. }
  71. });
  72. }
  73. get clusterTime() {
  74. return this.s.topology.clusterTime;
  75. }
  76. set clusterTime(clusterTime) {
  77. this.s.topology.clusterTime = clusterTime;
  78. }
  79. get description() {
  80. return this.s.description;
  81. }
  82. get name() {
  83. return this.s.description.address;
  84. }
  85. get autoEncrypter() {
  86. if (this.s.options && this.s.options.autoEncrypter) {
  87. return this.s.options.autoEncrypter;
  88. }
  89. return;
  90. }
  91. get loadBalanced() {
  92. return this.s.topology.description.type === common_1.TopologyType.LoadBalanced;
  93. }
  94. /**
  95. * Initiate server connect
  96. */
  97. connect() {
  98. var _a;
  99. if (this.s.state !== common_1.STATE_CLOSED) {
  100. return;
  101. }
  102. stateTransition(this, common_1.STATE_CONNECTING);
  103. // If in load balancer mode we automatically set the server to
  104. // a load balancer. It never transitions out of this state and
  105. // has no monitor.
  106. if (!this.loadBalanced) {
  107. (_a = this[kMonitor]) === null || _a === void 0 ? void 0 : _a.connect();
  108. }
  109. else {
  110. stateTransition(this, common_1.STATE_CONNECTED);
  111. this.emit(Server.CONNECT, this);
  112. }
  113. }
  114. /** Destroy the server connection */
  115. destroy(options, callback) {
  116. var _a;
  117. if (typeof options === 'function')
  118. (callback = options), (options = {});
  119. options = Object.assign({}, { force: false }, options);
  120. if (this.s.state === common_1.STATE_CLOSED) {
  121. if (typeof callback === 'function') {
  122. callback();
  123. }
  124. return;
  125. }
  126. stateTransition(this, common_1.STATE_CLOSING);
  127. if (!this.loadBalanced) {
  128. (_a = this[kMonitor]) === null || _a === void 0 ? void 0 : _a.close();
  129. }
  130. this.s.pool.close(options, err => {
  131. stateTransition(this, common_1.STATE_CLOSED);
  132. this.emit('closed');
  133. if (typeof callback === 'function') {
  134. callback(err);
  135. }
  136. });
  137. }
  138. /**
  139. * Immediately schedule monitoring of this server. If there already an attempt being made
  140. * this will be a no-op.
  141. */
  142. requestCheck() {
  143. var _a;
  144. if (!this.loadBalanced) {
  145. (_a = this[kMonitor]) === null || _a === void 0 ? void 0 : _a.requestCheck();
  146. }
  147. }
  148. /**
  149. * Execute a command
  150. * @internal
  151. */
  152. command(ns, cmd, options, callback) {
  153. if (callback == null) {
  154. throw new error_1.MongoInvalidArgumentError('Callback must be provided');
  155. }
  156. if (ns.db == null || typeof ns === 'string') {
  157. throw new error_1.MongoInvalidArgumentError('Namespace must not be a string');
  158. }
  159. if (this.s.state === common_1.STATE_CLOSING || this.s.state === common_1.STATE_CLOSED) {
  160. callback(new error_1.MongoServerClosedError());
  161. return;
  162. }
  163. // Clone the options
  164. const finalOptions = Object.assign({}, options, { wireProtocolCommand: false });
  165. // There are cases where we need to flag the read preference not to get sent in
  166. // the command, such as pre-5.0 servers attempting to perform an aggregate write
  167. // with a non-primary read preference. In this case the effective read preference
  168. // (primary) is not the same as the provided and must be removed completely.
  169. if (finalOptions.omitReadPreference) {
  170. delete finalOptions.readPreference;
  171. }
  172. // error if collation not supported
  173. if ((0, utils_1.collationNotSupported)(this, cmd)) {
  174. callback(new error_1.MongoCompatibilityError(`Server ${this.name} does not support collation`));
  175. return;
  176. }
  177. const session = finalOptions.session;
  178. const conn = session === null || session === void 0 ? void 0 : session.pinnedConnection;
  179. // NOTE: This is a hack! We can't retrieve the connections used for executing an operation
  180. // (and prevent them from being checked back in) at the point of operation execution.
  181. // This should be considered as part of the work for NODE-2882
  182. // NOTE:
  183. // When incrementing operation count, it's important that we increment it before we
  184. // attempt to check out a connection from the pool. This ensures that operations that
  185. // are waiting for a connection are included in the operation count. Load balanced
  186. // mode will only ever have a single server, so the operation count doesn't matter.
  187. // Incrementing the operation count above the logic to handle load balanced mode would
  188. // require special logic to decrement it again, or would double increment (the load
  189. // balanced code makes a recursive call). Instead, we increment the count after this
  190. // check.
  191. if (this.loadBalanced && session && conn == null && isPinnableCommand(cmd, session)) {
  192. this.s.pool.checkOut((err, checkedOut) => {
  193. if (err || checkedOut == null) {
  194. if (callback)
  195. return callback(err);
  196. return;
  197. }
  198. session.pin(checkedOut);
  199. this.command(ns, cmd, finalOptions, callback);
  200. });
  201. return;
  202. }
  203. this.s.operationCount += 1;
  204. this.s.pool.withConnection(conn, (err, conn, cb) => {
  205. if (err || !conn) {
  206. this.s.operationCount -= 1;
  207. markServerUnknown(this, err);
  208. return cb(err);
  209. }
  210. conn.command(ns, cmd, finalOptions, makeOperationHandler(this, conn, cmd, finalOptions, (error, response) => {
  211. this.s.operationCount -= 1;
  212. cb(error, response);
  213. }));
  214. }, callback);
  215. }
  216. /**
  217. * Execute a `getMore` against the server
  218. * @internal
  219. */
  220. getMore(ns, cursorId, options, callback) {
  221. var _a;
  222. if (this.s.state === common_1.STATE_CLOSING || this.s.state === common_1.STATE_CLOSED) {
  223. callback(new error_1.MongoServerClosedError());
  224. return;
  225. }
  226. this.s.operationCount += 1;
  227. this.s.pool.withConnection((_a = options.session) === null || _a === void 0 ? void 0 : _a.pinnedConnection, (err, conn, cb) => {
  228. if (err || !conn) {
  229. this.s.operationCount -= 1;
  230. markServerUnknown(this, err);
  231. return cb(err);
  232. }
  233. conn.getMore(ns, cursorId, options, makeOperationHandler(this, conn, {}, options, (error, response) => {
  234. this.s.operationCount -= 1;
  235. cb(error, response);
  236. }));
  237. }, callback);
  238. }
  239. /**
  240. * Execute a `killCursors` command against the server
  241. * @internal
  242. */
  243. killCursors(ns, cursorIds, options, callback) {
  244. var _a;
  245. if (this.s.state === common_1.STATE_CLOSING || this.s.state === common_1.STATE_CLOSED) {
  246. if (typeof callback === 'function') {
  247. callback(new error_1.MongoServerClosedError());
  248. }
  249. return;
  250. }
  251. this.s.operationCount += 1;
  252. this.s.pool.withConnection((_a = options.session) === null || _a === void 0 ? void 0 : _a.pinnedConnection, (err, conn, cb) => {
  253. if (err || !conn) {
  254. this.s.operationCount -= 1;
  255. markServerUnknown(this, err);
  256. return cb(err);
  257. }
  258. conn.killCursors(ns, cursorIds, options, makeOperationHandler(this, conn, {}, undefined, (error, response) => {
  259. this.s.operationCount -= 1;
  260. cb(error, response);
  261. }));
  262. }, callback);
  263. }
  264. }
  265. exports.Server = Server;
  266. /** @event */
  267. Server.SERVER_HEARTBEAT_STARTED = constants_1.SERVER_HEARTBEAT_STARTED;
  268. /** @event */
  269. Server.SERVER_HEARTBEAT_SUCCEEDED = constants_1.SERVER_HEARTBEAT_SUCCEEDED;
  270. /** @event */
  271. Server.SERVER_HEARTBEAT_FAILED = constants_1.SERVER_HEARTBEAT_FAILED;
  272. /** @event */
  273. Server.CONNECT = constants_1.CONNECT;
  274. /** @event */
  275. Server.DESCRIPTION_RECEIVED = constants_1.DESCRIPTION_RECEIVED;
  276. /** @event */
  277. Server.CLOSED = constants_1.CLOSED;
  278. /** @event */
  279. Server.ENDED = constants_1.ENDED;
  280. function calculateRoundTripTime(oldRtt, duration) {
  281. if (oldRtt === -1) {
  282. return duration;
  283. }
  284. const alpha = 0.2;
  285. return alpha * duration + (1 - alpha) * oldRtt;
  286. }
  287. function markServerUnknown(server, error) {
  288. var _a;
  289. // Load balancer servers can never be marked unknown.
  290. if (server.loadBalanced) {
  291. return;
  292. }
  293. if (error instanceof error_1.MongoNetworkError && !(error instanceof error_1.MongoNetworkTimeoutError)) {
  294. (_a = server[kMonitor]) === null || _a === void 0 ? void 0 : _a.reset();
  295. }
  296. server.emit(Server.DESCRIPTION_RECEIVED, new server_description_1.ServerDescription(server.description.hostAddress, undefined, {
  297. error,
  298. topologyVersion: error && error.topologyVersion ? error.topologyVersion : server.description.topologyVersion
  299. }));
  300. }
  301. function isPinnableCommand(cmd, session) {
  302. if (session) {
  303. return (session.inTransaction() ||
  304. 'aggregate' in cmd ||
  305. 'find' in cmd ||
  306. 'getMore' in cmd ||
  307. 'listCollections' in cmd ||
  308. 'listIndexes' in cmd);
  309. }
  310. return false;
  311. }
  312. function connectionIsStale(pool, connection) {
  313. if (connection.serviceId) {
  314. return (connection.generation !== pool.serviceGenerations.get(connection.serviceId.toHexString()));
  315. }
  316. return connection.generation !== pool.generation;
  317. }
  318. function shouldHandleStateChangeError(server, err) {
  319. const etv = err.topologyVersion;
  320. const stv = server.description.topologyVersion;
  321. return (0, server_description_1.compareTopologyVersion)(stv, etv) < 0;
  322. }
  323. function inActiveTransaction(session, cmd) {
  324. return session && session.inTransaction() && !(0, transactions_1.isTransactionCommand)(cmd);
  325. }
  326. /** this checks the retryWrites option passed down from the client options, it
  327. * does not check if the server supports retryable writes */
  328. function isRetryableWritesEnabled(topology) {
  329. return topology.s.options.retryWrites !== false;
  330. }
  331. function makeOperationHandler(server, connection, cmd, options, callback) {
  332. const session = options === null || options === void 0 ? void 0 : options.session;
  333. return function handleOperationResult(error, result) {
  334. if (result != null) {
  335. return callback(undefined, result);
  336. }
  337. if (!error) {
  338. return callback(new error_1.MongoUnexpectedServerResponseError('Empty response with no error'));
  339. }
  340. if (!(error instanceof error_1.MongoError)) {
  341. // Node.js or some other error we have not special handling for
  342. return callback(error);
  343. }
  344. if (connectionIsStale(server.s.pool, connection)) {
  345. return callback(error);
  346. }
  347. if (error instanceof error_1.MongoNetworkError) {
  348. if (session && !session.hasEnded && session.serverSession) {
  349. session.serverSession.isDirty = true;
  350. }
  351. // inActiveTransaction check handles commit and abort.
  352. if (inActiveTransaction(session, cmd) &&
  353. !error.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError)) {
  354. error.addErrorLabel(error_1.MongoErrorLabel.TransientTransactionError);
  355. }
  356. if ((isRetryableWritesEnabled(server.s.topology) || (0, transactions_1.isTransactionCommand)(cmd)) &&
  357. (0, utils_1.supportsRetryableWrites)(server) &&
  358. !inActiveTransaction(session, cmd)) {
  359. error.addErrorLabel(error_1.MongoErrorLabel.RetryableWriteError);
  360. }
  361. if (!(error instanceof error_1.MongoNetworkTimeoutError) || (0, error_1.isNetworkErrorBeforeHandshake)(error)) {
  362. // In load balanced mode we never mark the server as unknown and always
  363. // clear for the specific service id.
  364. server.s.pool.clear(connection.serviceId);
  365. if (!server.loadBalanced) {
  366. markServerUnknown(server, error);
  367. }
  368. }
  369. }
  370. else {
  371. if ((isRetryableWritesEnabled(server.s.topology) || (0, transactions_1.isTransactionCommand)(cmd)) &&
  372. (0, error_1.needsRetryableWriteLabel)(error, (0, utils_1.maxWireVersion)(server)) &&
  373. !inActiveTransaction(session, cmd)) {
  374. error.addErrorLabel(error_1.MongoErrorLabel.RetryableWriteError);
  375. }
  376. if ((0, error_1.isSDAMUnrecoverableError)(error)) {
  377. if (shouldHandleStateChangeError(server, error)) {
  378. if ((0, utils_1.maxWireVersion)(server) <= 7 || (0, error_1.isNodeShuttingDownError)(error)) {
  379. server.s.pool.clear(connection.serviceId);
  380. }
  381. if (!server.loadBalanced) {
  382. markServerUnknown(server, error);
  383. process.nextTick(() => server.requestCheck());
  384. }
  385. }
  386. }
  387. }
  388. if (session &&
  389. session.isPinned &&
  390. error.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError)) {
  391. session.unpin({ force: true });
  392. }
  393. return callback(error);
  394. };
  395. }
  396. //# sourceMappingURL=server.js.map