server.js 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.Server = void 0;
  4. const connection_1 = require("../cmap/connection");
  5. const connection_pool_1 = require("../cmap/connection_pool");
  6. const constants_1 = require("../constants");
  7. const error_1 = require("../error");
  8. const logger_1 = require("../logger");
  9. const mongo_types_1 = require("../mongo_types");
  10. const transactions_1 = require("../transactions");
  11. const utils_1 = require("../utils");
  12. const common_1 = require("./common");
  13. const monitor_1 = require("./monitor");
  14. const server_description_1 = require("./server_description");
  15. const stateTransition = (0, utils_1.makeStateMachine)({
  16. [common_1.STATE_CLOSED]: [common_1.STATE_CLOSED, common_1.STATE_CONNECTING],
  17. [common_1.STATE_CONNECTING]: [common_1.STATE_CONNECTING, common_1.STATE_CLOSING, common_1.STATE_CONNECTED, common_1.STATE_CLOSED],
  18. [common_1.STATE_CONNECTED]: [common_1.STATE_CONNECTED, common_1.STATE_CLOSING, common_1.STATE_CLOSED],
  19. [common_1.STATE_CLOSING]: [common_1.STATE_CLOSING, common_1.STATE_CLOSED]
  20. });
  21. /** @internal */
  22. const kMonitor = Symbol('monitor');
  23. /** @internal */
  24. class Server extends mongo_types_1.TypedEventEmitter {
  25. /**
  26. * Create a server
  27. */
  28. constructor(topology, description, options) {
  29. super();
  30. this.serverApi = options.serverApi;
  31. const poolOptions = { hostAddress: description.hostAddress, ...options };
  32. this.s = {
  33. description,
  34. options,
  35. logger: new logger_1.Logger('Server'),
  36. state: common_1.STATE_CLOSED,
  37. topology,
  38. pool: new connection_pool_1.ConnectionPool(poolOptions)
  39. };
  40. for (const event of [...constants_1.CMAP_EVENTS, ...constants_1.APM_EVENTS]) {
  41. this.s.pool.on(event, (e) => this.emit(event, e));
  42. }
  43. this.s.pool.on(connection_1.Connection.CLUSTER_TIME_RECEIVED, (clusterTime) => {
  44. this.clusterTime = clusterTime;
  45. });
  46. // monitoring is disabled in load balancing mode
  47. if (this.loadBalanced)
  48. return;
  49. // create the monitor
  50. this[kMonitor] = new monitor_1.Monitor(this, this.s.options);
  51. for (const event of constants_1.HEARTBEAT_EVENTS) {
  52. this[kMonitor].on(event, (e) => this.emit(event, e));
  53. }
  54. this[kMonitor].on('resetConnectionPool', () => {
  55. this.s.pool.clear();
  56. });
  57. this[kMonitor].on('resetServer', (error) => markServerUnknown(this, error));
  58. this[kMonitor].on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event) => {
  59. this.emit(Server.DESCRIPTION_RECEIVED, new server_description_1.ServerDescription(this.description.hostAddress, event.reply, {
  60. roundTripTime: calculateRoundTripTime(this.description.roundTripTime, event.duration)
  61. }));
  62. if (this.s.state === common_1.STATE_CONNECTING) {
  63. stateTransition(this, common_1.STATE_CONNECTED);
  64. this.emit(Server.CONNECT, this);
  65. }
  66. });
  67. }
  68. get clusterTime() {
  69. return this.s.topology.clusterTime;
  70. }
  71. set clusterTime(clusterTime) {
  72. this.s.topology.clusterTime = clusterTime;
  73. }
  74. get description() {
  75. return this.s.description;
  76. }
  77. get name() {
  78. return this.s.description.address;
  79. }
  80. get autoEncrypter() {
  81. if (this.s.options && this.s.options.autoEncrypter) {
  82. return this.s.options.autoEncrypter;
  83. }
  84. }
  85. get loadBalanced() {
  86. return this.s.topology.description.type === common_1.TopologyType.LoadBalanced;
  87. }
  88. /**
  89. * Initiate server connect
  90. */
  91. connect() {
  92. if (this.s.state !== common_1.STATE_CLOSED) {
  93. return;
  94. }
  95. stateTransition(this, common_1.STATE_CONNECTING);
  96. // If in load balancer mode we automatically set the server to
  97. // a load balancer. It never transitions out of this state and
  98. // has no monitor.
  99. if (!this.loadBalanced) {
  100. this[kMonitor].connect();
  101. }
  102. else {
  103. stateTransition(this, common_1.STATE_CONNECTED);
  104. this.emit(Server.CONNECT, this);
  105. }
  106. }
  107. /** Destroy the server connection */
  108. destroy(options, callback) {
  109. if (typeof options === 'function')
  110. (callback = options), (options = {});
  111. options = Object.assign({}, { force: false }, options);
  112. if (this.s.state === common_1.STATE_CLOSED) {
  113. if (typeof callback === 'function') {
  114. callback();
  115. }
  116. return;
  117. }
  118. stateTransition(this, common_1.STATE_CLOSING);
  119. if (!this.loadBalanced) {
  120. this[kMonitor].close();
  121. }
  122. this.s.pool.close(options, err => {
  123. stateTransition(this, common_1.STATE_CLOSED);
  124. this.emit('closed');
  125. if (typeof callback === 'function') {
  126. callback(err);
  127. }
  128. });
  129. }
  130. /**
  131. * Immediately schedule monitoring of this server. If there already an attempt being made
  132. * this will be a no-op.
  133. */
  134. requestCheck() {
  135. if (!this.loadBalanced) {
  136. this[kMonitor].requestCheck();
  137. }
  138. }
  139. command(ns, cmd, options, callback) {
  140. if (typeof options === 'function') {
  141. (callback = options), (options = {}), (options = options !== null && options !== void 0 ? options : {});
  142. }
  143. if (callback == null) {
  144. throw new error_1.MongoInvalidArgumentError('Callback must be provided');
  145. }
  146. if (ns.db == null || typeof ns === 'string') {
  147. throw new error_1.MongoInvalidArgumentError('Namespace must not be a string');
  148. }
  149. if (this.s.state === common_1.STATE_CLOSING || this.s.state === common_1.STATE_CLOSED) {
  150. callback(new error_1.MongoServerClosedError());
  151. return;
  152. }
  153. // Clone the options
  154. const finalOptions = Object.assign({}, options, { wireProtocolCommand: false });
  155. // There are cases where we need to flag the read preference not to get sent in
  156. // the command, such as pre-5.0 servers attempting to perform an aggregate write
  157. // with a non-primary read preference. In this case the effective read preference
  158. // (primary) is not the same as the provided and must be removed completely.
  159. if (finalOptions.omitReadPreference) {
  160. delete finalOptions.readPreference;
  161. }
  162. // error if collation not supported
  163. if ((0, utils_1.collationNotSupported)(this, cmd)) {
  164. callback(new error_1.MongoCompatibilityError(`Server ${this.name} does not support collation`));
  165. return;
  166. }
  167. const session = finalOptions.session;
  168. const conn = session === null || session === void 0 ? void 0 : session.pinnedConnection;
  169. // NOTE: This is a hack! We can't retrieve the connections used for executing an operation
  170. // (and prevent them from being checked back in) at the point of operation execution.
  171. // This should be considered as part of the work for NODE-2882
  172. if (this.loadBalanced && session && conn == null && isPinnableCommand(cmd, session)) {
  173. this.s.pool.checkOut((err, checkedOut) => {
  174. if (err || checkedOut == null) {
  175. if (callback)
  176. return callback(err);
  177. return;
  178. }
  179. session.pin(checkedOut);
  180. this.command(ns, cmd, finalOptions, callback);
  181. });
  182. return;
  183. }
  184. this.s.pool.withConnection(conn, (err, conn, cb) => {
  185. if (err || !conn) {
  186. markServerUnknown(this, err);
  187. return cb(err);
  188. }
  189. conn.command(ns, cmd, finalOptions, makeOperationHandler(this, conn, cmd, finalOptions, cb));
  190. }, callback);
  191. }
  192. /**
  193. * Execute a query against the server
  194. * @internal
  195. */
  196. query(ns, cmd, options, callback) {
  197. if (this.s.state === common_1.STATE_CLOSING || this.s.state === common_1.STATE_CLOSED) {
  198. callback(new error_1.MongoServerClosedError());
  199. return;
  200. }
  201. this.s.pool.withConnection(undefined, (err, conn, cb) => {
  202. if (err || !conn) {
  203. markServerUnknown(this, err);
  204. return cb(err);
  205. }
  206. conn.query(ns, cmd, options, makeOperationHandler(this, conn, cmd, options, cb));
  207. }, callback);
  208. }
  209. /**
  210. * Execute a `getMore` against the server
  211. * @internal
  212. */
  213. getMore(ns, cursorId, options, callback) {
  214. var _a;
  215. if (this.s.state === common_1.STATE_CLOSING || this.s.state === common_1.STATE_CLOSED) {
  216. callback(new error_1.MongoServerClosedError());
  217. return;
  218. }
  219. this.s.pool.withConnection((_a = options.session) === null || _a === void 0 ? void 0 : _a.pinnedConnection, (err, conn, cb) => {
  220. if (err || !conn) {
  221. markServerUnknown(this, err);
  222. return cb(err);
  223. }
  224. conn.getMore(ns, cursorId, options, makeOperationHandler(this, conn, {}, options, cb));
  225. }, callback);
  226. }
  227. /**
  228. * Execute a `killCursors` command against the server
  229. * @internal
  230. */
  231. killCursors(ns, cursorIds, options, callback) {
  232. var _a;
  233. if (this.s.state === common_1.STATE_CLOSING || this.s.state === common_1.STATE_CLOSED) {
  234. if (typeof callback === 'function') {
  235. callback(new error_1.MongoServerClosedError());
  236. }
  237. return;
  238. }
  239. this.s.pool.withConnection((_a = options.session) === null || _a === void 0 ? void 0 : _a.pinnedConnection, (err, conn, cb) => {
  240. if (err || !conn) {
  241. markServerUnknown(this, err);
  242. return cb(err);
  243. }
  244. conn.killCursors(ns, cursorIds, options, makeOperationHandler(this, conn, {}, undefined, cb));
  245. }, callback);
  246. }
  247. }
  248. exports.Server = Server;
  249. /** @event */
  250. Server.SERVER_HEARTBEAT_STARTED = constants_1.SERVER_HEARTBEAT_STARTED;
  251. /** @event */
  252. Server.SERVER_HEARTBEAT_SUCCEEDED = constants_1.SERVER_HEARTBEAT_SUCCEEDED;
  253. /** @event */
  254. Server.SERVER_HEARTBEAT_FAILED = constants_1.SERVER_HEARTBEAT_FAILED;
  255. /** @event */
  256. Server.CONNECT = constants_1.CONNECT;
  257. /** @event */
  258. Server.DESCRIPTION_RECEIVED = constants_1.DESCRIPTION_RECEIVED;
  259. /** @event */
  260. Server.CLOSED = constants_1.CLOSED;
  261. /** @event */
  262. Server.ENDED = constants_1.ENDED;
  263. function calculateRoundTripTime(oldRtt, duration) {
  264. if (oldRtt === -1) {
  265. return duration;
  266. }
  267. const alpha = 0.2;
  268. return alpha * duration + (1 - alpha) * oldRtt;
  269. }
  270. function markServerUnknown(server, error) {
  271. // Load balancer servers can never be marked unknown.
  272. if (server.loadBalanced) {
  273. return;
  274. }
  275. if (error instanceof error_1.MongoNetworkError && !(error instanceof error_1.MongoNetworkTimeoutError)) {
  276. server[kMonitor].reset();
  277. }
  278. server.emit(Server.DESCRIPTION_RECEIVED, new server_description_1.ServerDescription(server.description.hostAddress, undefined, {
  279. error,
  280. topologyVersion: error && error.topologyVersion ? error.topologyVersion : server.description.topologyVersion
  281. }));
  282. }
  283. function isPinnableCommand(cmd, session) {
  284. if (session) {
  285. return (session.inTransaction() ||
  286. 'aggregate' in cmd ||
  287. 'find' in cmd ||
  288. 'getMore' in cmd ||
  289. 'listCollections' in cmd ||
  290. 'listIndexes' in cmd);
  291. }
  292. return false;
  293. }
  294. function connectionIsStale(pool, connection) {
  295. if (connection.serviceId) {
  296. return (connection.generation !== pool.serviceGenerations.get(connection.serviceId.toHexString()));
  297. }
  298. return connection.generation !== pool.generation;
  299. }
  300. function shouldHandleStateChangeError(server, err) {
  301. const etv = err.topologyVersion;
  302. const stv = server.description.topologyVersion;
  303. return (0, server_description_1.compareTopologyVersion)(stv, etv) < 0;
  304. }
  305. function inActiveTransaction(session, cmd) {
  306. return session && session.inTransaction() && !(0, transactions_1.isTransactionCommand)(cmd);
  307. }
  308. /** this checks the retryWrites option passed down from the client options, it
  309. * does not check if the server supports retryable writes */
  310. function isRetryableWritesEnabled(topology) {
  311. return topology.s.options.retryWrites !== false;
  312. }
  313. function makeOperationHandler(server, connection, cmd, options, callback) {
  314. const session = options === null || options === void 0 ? void 0 : options.session;
  315. return function handleOperationResult(err, result) {
  316. if (err && !connectionIsStale(server.s.pool, connection)) {
  317. if (err instanceof error_1.MongoNetworkError) {
  318. if (session && !session.hasEnded && session.serverSession) {
  319. session.serverSession.isDirty = true;
  320. }
  321. // inActiveTransaction check handles commit and abort.
  322. if (inActiveTransaction(session, cmd) && !err.hasErrorLabel('TransientTransactionError')) {
  323. err.addErrorLabel('TransientTransactionError');
  324. }
  325. if ((isRetryableWritesEnabled(server.s.topology) || (0, transactions_1.isTransactionCommand)(cmd)) &&
  326. (0, utils_1.supportsRetryableWrites)(server) &&
  327. !inActiveTransaction(session, cmd)) {
  328. err.addErrorLabel('RetryableWriteError');
  329. }
  330. if (!(err instanceof error_1.MongoNetworkTimeoutError) || (0, error_1.isNetworkErrorBeforeHandshake)(err)) {
  331. // In load balanced mode we never mark the server as unknown and always
  332. // clear for the specific service id.
  333. server.s.pool.clear(connection.serviceId);
  334. if (!server.loadBalanced) {
  335. markServerUnknown(server, err);
  336. }
  337. }
  338. }
  339. else {
  340. // if pre-4.4 server, then add error label if its a retryable write error
  341. if ((isRetryableWritesEnabled(server.s.topology) || (0, transactions_1.isTransactionCommand)(cmd)) &&
  342. (0, utils_1.maxWireVersion)(server) < 9 &&
  343. (0, error_1.isRetryableWriteError)(err) &&
  344. !inActiveTransaction(session, cmd)) {
  345. err.addErrorLabel('RetryableWriteError');
  346. }
  347. if ((0, error_1.isSDAMUnrecoverableError)(err)) {
  348. if (shouldHandleStateChangeError(server, err)) {
  349. if ((0, utils_1.maxWireVersion)(server) <= 7 || (0, error_1.isNodeShuttingDownError)(err)) {
  350. server.s.pool.clear(connection.serviceId);
  351. }
  352. if (!server.loadBalanced) {
  353. markServerUnknown(server, err);
  354. process.nextTick(() => server.requestCheck());
  355. }
  356. }
  357. }
  358. }
  359. if (session && session.isPinned && err.hasErrorLabel('TransientTransactionError')) {
  360. session.unpin({ force: true });
  361. }
  362. }
  363. callback(err, result);
  364. };
  365. }
  366. //# sourceMappingURL=server.js.map