topology.js 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.ServerCapabilities = exports.Topology = void 0;
  4. const Denque = require("denque");
  5. const bson_1 = require("../bson");
  6. const connection_string_1 = require("../connection_string");
  7. const constants_1 = require("../constants");
  8. const error_1 = require("../error");
  9. const mongo_types_1 = require("../mongo_types");
  10. const read_preference_1 = require("../read_preference");
  11. const sessions_1 = require("../sessions");
  12. const utils_1 = require("../utils");
  13. const common_1 = require("./common");
  14. const events_1 = require("./events");
  15. const server_1 = require("./server");
  16. const server_description_1 = require("./server_description");
  17. const server_selection_1 = require("./server_selection");
  18. const srv_polling_1 = require("./srv_polling");
  19. const topology_description_1 = require("./topology_description");
  20. // Global state
  21. let globalTopologyCounter = 0;
  22. const stateTransition = (0, utils_1.makeStateMachine)({
  23. [common_1.STATE_CLOSED]: [common_1.STATE_CLOSED, common_1.STATE_CONNECTING],
  24. [common_1.STATE_CONNECTING]: [common_1.STATE_CONNECTING, common_1.STATE_CLOSING, common_1.STATE_CONNECTED, common_1.STATE_CLOSED],
  25. [common_1.STATE_CONNECTED]: [common_1.STATE_CONNECTED, common_1.STATE_CLOSING, common_1.STATE_CLOSED],
  26. [common_1.STATE_CLOSING]: [common_1.STATE_CLOSING, common_1.STATE_CLOSED]
  27. });
  28. /** @internal */
  29. const kCancelled = Symbol('cancelled');
  30. /** @internal */
  31. const kWaitQueue = Symbol('waitQueue');
  32. /**
  33. * A container of server instances representing a connection to a MongoDB topology.
  34. * @internal
  35. */
  36. class Topology extends mongo_types_1.TypedEventEmitter {
  37. /**
  38. * @param seedlist - a list of HostAddress instances to connect to
  39. */
  40. constructor(seeds, options) {
  41. var _a;
  42. super();
  43. // Legacy CSFLE support
  44. this.bson = Object.create(null);
  45. this.bson.serialize = bson_1.serialize;
  46. this.bson.deserialize = bson_1.deserialize;
  47. // Options should only be undefined in tests, MongoClient will always have defined options
  48. options = options !== null && options !== void 0 ? options : {
  49. hosts: [utils_1.HostAddress.fromString('localhost:27017')],
  50. retryReads: connection_string_1.DEFAULT_OPTIONS.get('retryReads'),
  51. retryWrites: connection_string_1.DEFAULT_OPTIONS.get('retryWrites'),
  52. serverSelectionTimeoutMS: connection_string_1.DEFAULT_OPTIONS.get('serverSelectionTimeoutMS'),
  53. directConnection: connection_string_1.DEFAULT_OPTIONS.get('directConnection'),
  54. loadBalanced: connection_string_1.DEFAULT_OPTIONS.get('loadBalanced'),
  55. metadata: connection_string_1.DEFAULT_OPTIONS.get('metadata'),
  56. monitorCommands: connection_string_1.DEFAULT_OPTIONS.get('monitorCommands'),
  57. tls: connection_string_1.DEFAULT_OPTIONS.get('tls'),
  58. maxPoolSize: connection_string_1.DEFAULT_OPTIONS.get('maxPoolSize'),
  59. minPoolSize: connection_string_1.DEFAULT_OPTIONS.get('minPoolSize'),
  60. waitQueueTimeoutMS: connection_string_1.DEFAULT_OPTIONS.get('waitQueueTimeoutMS'),
  61. connectionType: connection_string_1.DEFAULT_OPTIONS.get('connectionType'),
  62. connectTimeoutMS: connection_string_1.DEFAULT_OPTIONS.get('connectTimeoutMS'),
  63. maxIdleTimeMS: connection_string_1.DEFAULT_OPTIONS.get('maxIdleTimeMS'),
  64. heartbeatFrequencyMS: connection_string_1.DEFAULT_OPTIONS.get('heartbeatFrequencyMS'),
  65. minHeartbeatFrequencyMS: connection_string_1.DEFAULT_OPTIONS.get('minHeartbeatFrequencyMS')
  66. };
  67. if (typeof seeds === 'string') {
  68. seeds = [utils_1.HostAddress.fromString(seeds)];
  69. }
  70. else if (!Array.isArray(seeds)) {
  71. seeds = [seeds];
  72. }
  73. const seedlist = [];
  74. for (const seed of seeds) {
  75. if (typeof seed === 'string') {
  76. seedlist.push(utils_1.HostAddress.fromString(seed));
  77. }
  78. else if (seed instanceof utils_1.HostAddress) {
  79. seedlist.push(seed);
  80. }
  81. else {
  82. // FIXME(NODE-3483): May need to be a MongoParseError
  83. throw new error_1.MongoRuntimeError(`Topology cannot be constructed from ${JSON.stringify(seed)}`);
  84. }
  85. }
  86. const topologyType = topologyTypeFromOptions(options);
  87. const topologyId = globalTopologyCounter++;
  88. const selectedHosts = options.srvMaxHosts == null ||
  89. options.srvMaxHosts === 0 ||
  90. options.srvMaxHosts >= seedlist.length
  91. ? seedlist
  92. : (0, utils_1.shuffle)(seedlist, options.srvMaxHosts);
  93. const serverDescriptions = new Map();
  94. for (const hostAddress of selectedHosts) {
  95. serverDescriptions.set(hostAddress.toString(), new server_description_1.ServerDescription(hostAddress));
  96. }
  97. this[kWaitQueue] = new Denque();
  98. this.s = {
  99. // the id of this topology
  100. id: topologyId,
  101. // passed in options
  102. options,
  103. // initial seedlist of servers to connect to
  104. seedlist,
  105. // initial state
  106. state: common_1.STATE_CLOSED,
  107. // the topology description
  108. description: new topology_description_1.TopologyDescription(topologyType, serverDescriptions, options.replicaSet, undefined, undefined, undefined, options),
  109. serverSelectionTimeoutMS: options.serverSelectionTimeoutMS,
  110. heartbeatFrequencyMS: options.heartbeatFrequencyMS,
  111. minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS,
  112. // a map of server instances to normalized addresses
  113. servers: new Map(),
  114. // Server Session Pool
  115. sessionPool: new sessions_1.ServerSessionPool(this),
  116. // Active client sessions
  117. sessions: new Set(),
  118. credentials: options === null || options === void 0 ? void 0 : options.credentials,
  119. clusterTime: undefined,
  120. // timer management
  121. connectionTimers: new Set(),
  122. detectShardedTopology: ev => this.detectShardedTopology(ev),
  123. detectSrvRecords: ev => this.detectSrvRecords(ev)
  124. };
  125. if (options.srvHost && !options.loadBalanced) {
  126. this.s.srvPoller =
  127. (_a = options.srvPoller) !== null && _a !== void 0 ? _a : new srv_polling_1.SrvPoller({
  128. heartbeatFrequencyMS: this.s.heartbeatFrequencyMS,
  129. srvHost: options.srvHost,
  130. srvMaxHosts: options.srvMaxHosts,
  131. srvServiceName: options.srvServiceName
  132. });
  133. this.on(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);
  134. }
  135. }
  136. detectShardedTopology(event) {
  137. var _a, _b, _c;
  138. const previousType = event.previousDescription.type;
  139. const newType = event.newDescription.type;
  140. const transitionToSharded = previousType !== common_1.TopologyType.Sharded && newType === common_1.TopologyType.Sharded;
  141. const srvListeners = (_a = this.s.srvPoller) === null || _a === void 0 ? void 0 : _a.listeners(srv_polling_1.SrvPoller.SRV_RECORD_DISCOVERY);
  142. const listeningToSrvPolling = !!(srvListeners === null || srvListeners === void 0 ? void 0 : srvListeners.includes(this.s.detectSrvRecords));
  143. if (transitionToSharded && !listeningToSrvPolling) {
  144. (_b = this.s.srvPoller) === null || _b === void 0 ? void 0 : _b.on(srv_polling_1.SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
  145. (_c = this.s.srvPoller) === null || _c === void 0 ? void 0 : _c.start();
  146. }
  147. }
  148. detectSrvRecords(ev) {
  149. const previousTopologyDescription = this.s.description;
  150. this.s.description = this.s.description.updateFromSrvPollingEvent(ev, this.s.options.srvMaxHosts);
  151. if (this.s.description === previousTopologyDescription) {
  152. // Nothing changed, so return
  153. return;
  154. }
  155. updateServers(this);
  156. this.emit(Topology.TOPOLOGY_DESCRIPTION_CHANGED, new events_1.TopologyDescriptionChangedEvent(this.s.id, previousTopologyDescription, this.s.description));
  157. }
  158. /**
  159. * @returns A `TopologyDescription` for this topology
  160. */
  161. get description() {
  162. return this.s.description;
  163. }
  164. get loadBalanced() {
  165. return this.s.options.loadBalanced;
  166. }
  167. get capabilities() {
  168. return new ServerCapabilities(this.lastHello());
  169. }
  170. /** Initiate server connect */
  171. connect(options, callback) {
  172. var _a;
  173. if (typeof options === 'function')
  174. (callback = options), (options = {});
  175. options = options !== null && options !== void 0 ? options : {};
  176. if (this.s.state === common_1.STATE_CONNECTED) {
  177. if (typeof callback === 'function') {
  178. callback();
  179. }
  180. return;
  181. }
  182. stateTransition(this, common_1.STATE_CONNECTING);
  183. // emit SDAM monitoring events
  184. this.emit(Topology.TOPOLOGY_OPENING, new events_1.TopologyOpeningEvent(this.s.id));
  185. // emit an event for the topology change
  186. this.emit(Topology.TOPOLOGY_DESCRIPTION_CHANGED, new events_1.TopologyDescriptionChangedEvent(this.s.id, new topology_description_1.TopologyDescription(common_1.TopologyType.Unknown), // initial is always Unknown
  187. this.s.description));
  188. // connect all known servers, then attempt server selection to connect
  189. const serverDescriptions = Array.from(this.s.description.servers.values());
  190. connectServers(this, serverDescriptions);
  191. // In load balancer mode we need to fake a server description getting
  192. // emitted from the monitor, since the monitor doesn't exist.
  193. if (this.s.options.loadBalanced) {
  194. for (const description of serverDescriptions) {
  195. const newDescription = new server_description_1.ServerDescription(description.hostAddress, undefined, {
  196. loadBalanced: this.s.options.loadBalanced
  197. });
  198. this.serverUpdateHandler(newDescription);
  199. }
  200. }
  201. const readPreference = (_a = options.readPreference) !== null && _a !== void 0 ? _a : read_preference_1.ReadPreference.primary;
  202. this.selectServer((0, server_selection_1.readPreferenceServerSelector)(readPreference), options, (err, server) => {
  203. if (err) {
  204. this.close();
  205. typeof callback === 'function' ? callback(err) : this.emit(Topology.ERROR, err);
  206. return;
  207. }
  208. // TODO: NODE-2471
  209. if (server && this.s.credentials) {
  210. server.command((0, utils_1.ns)('admin.$cmd'), { ping: 1 }, err => {
  211. if (err) {
  212. typeof callback === 'function' ? callback(err) : this.emit(Topology.ERROR, err);
  213. return;
  214. }
  215. stateTransition(this, common_1.STATE_CONNECTED);
  216. this.emit(Topology.OPEN, this);
  217. this.emit(Topology.CONNECT, this);
  218. if (typeof callback === 'function')
  219. callback(undefined, this);
  220. });
  221. return;
  222. }
  223. stateTransition(this, common_1.STATE_CONNECTED);
  224. this.emit(Topology.OPEN, this);
  225. this.emit(Topology.CONNECT, this);
  226. if (typeof callback === 'function')
  227. callback(undefined, this);
  228. });
  229. }
  230. /** Close this topology */
  231. close(options, callback) {
  232. if (typeof options === 'function') {
  233. callback = options;
  234. options = {};
  235. }
  236. if (typeof options === 'boolean') {
  237. options = { force: options };
  238. }
  239. options = options !== null && options !== void 0 ? options : {};
  240. if (this.s.state === common_1.STATE_CLOSED || this.s.state === common_1.STATE_CLOSING) {
  241. if (typeof callback === 'function') {
  242. callback();
  243. }
  244. return;
  245. }
  246. stateTransition(this, common_1.STATE_CLOSING);
  247. drainWaitQueue(this[kWaitQueue], new error_1.MongoTopologyClosedError());
  248. (0, common_1.drainTimerQueue)(this.s.connectionTimers);
  249. if (this.s.srvPoller) {
  250. this.s.srvPoller.stop();
  251. this.s.srvPoller.removeListener(srv_polling_1.SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
  252. }
  253. this.removeListener(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);
  254. (0, utils_1.eachAsync)(Array.from(this.s.sessions.values()), (session, cb) => session.endSession(cb), () => {
  255. this.s.sessionPool.endAllPooledSessions(() => {
  256. (0, utils_1.eachAsync)(Array.from(this.s.servers.values()), (server, cb) => destroyServer(server, this, options, cb), err => {
  257. this.s.servers.clear();
  258. // emit an event for close
  259. this.emit(Topology.TOPOLOGY_CLOSED, new events_1.TopologyClosedEvent(this.s.id));
  260. stateTransition(this, common_1.STATE_CLOSED);
  261. if (typeof callback === 'function') {
  262. callback(err);
  263. }
  264. });
  265. });
  266. });
  267. }
  268. selectServer(selector, _options, _callback) {
  269. let options = _options;
  270. const callback = (_callback !== null && _callback !== void 0 ? _callback : _options);
  271. if (typeof options === 'function') {
  272. options = {};
  273. }
  274. let serverSelector;
  275. if (typeof selector !== 'function') {
  276. if (typeof selector === 'string') {
  277. serverSelector = (0, server_selection_1.readPreferenceServerSelector)(read_preference_1.ReadPreference.fromString(selector));
  278. }
  279. else {
  280. let readPreference;
  281. if (selector instanceof read_preference_1.ReadPreference) {
  282. readPreference = selector;
  283. }
  284. else {
  285. read_preference_1.ReadPreference.translate(options);
  286. readPreference = options.readPreference || read_preference_1.ReadPreference.primary;
  287. }
  288. serverSelector = (0, server_selection_1.readPreferenceServerSelector)(readPreference);
  289. }
  290. }
  291. else {
  292. serverSelector = selector;
  293. }
  294. options = Object.assign({}, { serverSelectionTimeoutMS: this.s.serverSelectionTimeoutMS }, options);
  295. const isSharded = this.description.type === common_1.TopologyType.Sharded;
  296. const session = options.session;
  297. const transaction = session && session.transaction;
  298. if (isSharded && transaction && transaction.server) {
  299. callback(undefined, transaction.server);
  300. return;
  301. }
  302. const waitQueueMember = {
  303. serverSelector,
  304. transaction,
  305. callback
  306. };
  307. const serverSelectionTimeoutMS = options.serverSelectionTimeoutMS;
  308. if (serverSelectionTimeoutMS) {
  309. waitQueueMember.timer = setTimeout(() => {
  310. waitQueueMember[kCancelled] = true;
  311. waitQueueMember.timer = undefined;
  312. const timeoutError = new error_1.MongoServerSelectionError(`Server selection timed out after ${serverSelectionTimeoutMS} ms`, this.description);
  313. waitQueueMember.callback(timeoutError);
  314. }, serverSelectionTimeoutMS);
  315. }
  316. this[kWaitQueue].push(waitQueueMember);
  317. processWaitQueue(this);
  318. }
  319. // Sessions related methods
  320. /**
  321. * @returns Whether the topology should initiate selection to determine session support
  322. */
  323. shouldCheckForSessionSupport() {
  324. if (this.description.type === common_1.TopologyType.Single) {
  325. return !this.description.hasKnownServers;
  326. }
  327. return !this.description.hasDataBearingServers;
  328. }
  329. /**
  330. * @returns Whether sessions are supported on the current topology
  331. */
  332. hasSessionSupport() {
  333. return this.loadBalanced || this.description.logicalSessionTimeoutMinutes != null;
  334. }
  335. /** Start a logical session */
  336. startSession(options, clientOptions) {
  337. const session = new sessions_1.ClientSession(this, this.s.sessionPool, options, clientOptions);
  338. session.once('ended', () => {
  339. this.s.sessions.delete(session);
  340. });
  341. this.s.sessions.add(session);
  342. return session;
  343. }
  344. /** Send endSessions command(s) with the given session ids */
  345. endSessions(sessions, callback) {
  346. if (!Array.isArray(sessions)) {
  347. sessions = [sessions];
  348. }
  349. this.selectServer((0, server_selection_1.readPreferenceServerSelector)(read_preference_1.ReadPreference.primaryPreferred), (err, server) => {
  350. if (err || !server) {
  351. if (typeof callback === 'function')
  352. callback(err);
  353. return;
  354. }
  355. server.command((0, utils_1.ns)('admin.$cmd'), { endSessions: sessions }, { noResponse: true }, (err, result) => {
  356. if (typeof callback === 'function')
  357. callback(err, result);
  358. });
  359. });
  360. }
  361. /**
  362. * Update the internal TopologyDescription with a ServerDescription
  363. *
  364. * @param serverDescription - The server to update in the internal list of server descriptions
  365. */
  366. serverUpdateHandler(serverDescription) {
  367. if (!this.s.description.hasServer(serverDescription.address)) {
  368. return;
  369. }
  370. // ignore this server update if its from an outdated topologyVersion
  371. if (isStaleServerDescription(this.s.description, serverDescription)) {
  372. return;
  373. }
  374. // these will be used for monitoring events later
  375. const previousTopologyDescription = this.s.description;
  376. const previousServerDescription = this.s.description.servers.get(serverDescription.address);
  377. if (!previousServerDescription) {
  378. return;
  379. }
  380. // Driver Sessions Spec: "Whenever a driver receives a cluster time from
  381. // a server it MUST compare it to the current highest seen cluster time
  382. // for the deployment. If the new cluster time is higher than the
  383. // highest seen cluster time it MUST become the new highest seen cluster
  384. // time. Two cluster times are compared using only the BsonTimestamp
  385. // value of the clusterTime embedded field."
  386. const clusterTime = serverDescription.$clusterTime;
  387. if (clusterTime) {
  388. (0, common_1._advanceClusterTime)(this, clusterTime);
  389. }
  390. // If we already know all the information contained in this updated description, then
  391. // we don't need to emit SDAM events, but still need to update the description, in order
  392. // to keep client-tracked attributes like last update time and round trip time up to date
  393. const equalDescriptions = previousServerDescription && previousServerDescription.equals(serverDescription);
  394. // first update the TopologyDescription
  395. this.s.description = this.s.description.update(serverDescription);
  396. if (this.s.description.compatibilityError) {
  397. this.emit(Topology.ERROR, new error_1.MongoCompatibilityError(this.s.description.compatibilityError));
  398. return;
  399. }
  400. // emit monitoring events for this change
  401. if (!equalDescriptions) {
  402. const newDescription = this.s.description.servers.get(serverDescription.address);
  403. if (newDescription) {
  404. this.emit(Topology.SERVER_DESCRIPTION_CHANGED, new events_1.ServerDescriptionChangedEvent(this.s.id, serverDescription.address, previousServerDescription, newDescription));
  405. }
  406. }
  407. // update server list from updated descriptions
  408. updateServers(this, serverDescription);
  409. // attempt to resolve any outstanding server selection attempts
  410. if (this[kWaitQueue].length > 0) {
  411. processWaitQueue(this);
  412. }
  413. if (!equalDescriptions) {
  414. this.emit(Topology.TOPOLOGY_DESCRIPTION_CHANGED, new events_1.TopologyDescriptionChangedEvent(this.s.id, previousTopologyDescription, this.s.description));
  415. }
  416. }
  417. auth(credentials, callback) {
  418. if (typeof credentials === 'function')
  419. (callback = credentials), (credentials = undefined);
  420. if (typeof callback === 'function')
  421. callback(undefined, true);
  422. }
  423. get clientMetadata() {
  424. return this.s.options.metadata;
  425. }
  426. isConnected() {
  427. return this.s.state === common_1.STATE_CONNECTED;
  428. }
  429. isDestroyed() {
  430. return this.s.state === common_1.STATE_CLOSED;
  431. }
  432. /**
  433. * @deprecated This function is deprecated and will be removed in the next major version.
  434. */
  435. unref() {
  436. (0, utils_1.emitWarning)('`unref` is a noop and will be removed in the next major version');
  437. }
  438. // NOTE: There are many places in code where we explicitly check the last hello
  439. // to do feature support detection. This should be done any other way, but for
  440. // now we will just return the first hello seen, which should suffice.
  441. lastHello() {
  442. const serverDescriptions = Array.from(this.description.servers.values());
  443. if (serverDescriptions.length === 0)
  444. return {};
  445. const sd = serverDescriptions.filter((sd) => sd.type !== common_1.ServerType.Unknown)[0];
  446. const result = sd || { maxWireVersion: this.description.commonWireVersion };
  447. return result;
  448. }
  449. get commonWireVersion() {
  450. return this.description.commonWireVersion;
  451. }
  452. get logicalSessionTimeoutMinutes() {
  453. return this.description.logicalSessionTimeoutMinutes;
  454. }
  455. get clusterTime() {
  456. return this.s.clusterTime;
  457. }
  458. set clusterTime(clusterTime) {
  459. this.s.clusterTime = clusterTime;
  460. }
  461. }
  462. exports.Topology = Topology;
  463. /** @event */
  464. Topology.SERVER_OPENING = constants_1.SERVER_OPENING;
  465. /** @event */
  466. Topology.SERVER_CLOSED = constants_1.SERVER_CLOSED;
  467. /** @event */
  468. Topology.SERVER_DESCRIPTION_CHANGED = constants_1.SERVER_DESCRIPTION_CHANGED;
  469. /** @event */
  470. Topology.TOPOLOGY_OPENING = constants_1.TOPOLOGY_OPENING;
  471. /** @event */
  472. Topology.TOPOLOGY_CLOSED = constants_1.TOPOLOGY_CLOSED;
  473. /** @event */
  474. Topology.TOPOLOGY_DESCRIPTION_CHANGED = constants_1.TOPOLOGY_DESCRIPTION_CHANGED;
  475. /** @event */
  476. Topology.ERROR = constants_1.ERROR;
  477. /** @event */
  478. Topology.OPEN = constants_1.OPEN;
  479. /** @event */
  480. Topology.CONNECT = constants_1.CONNECT;
  481. /** @event */
  482. Topology.CLOSE = constants_1.CLOSE;
  483. /** @event */
  484. Topology.TIMEOUT = constants_1.TIMEOUT;
  485. /** Destroys a server, and removes all event listeners from the instance */
  486. function destroyServer(server, topology, options, callback) {
  487. options = options !== null && options !== void 0 ? options : {};
  488. for (const event of constants_1.LOCAL_SERVER_EVENTS) {
  489. server.removeAllListeners(event);
  490. }
  491. server.destroy(options, () => {
  492. topology.emit(Topology.SERVER_CLOSED, new events_1.ServerClosedEvent(topology.s.id, server.description.address));
  493. for (const event of constants_1.SERVER_RELAY_EVENTS) {
  494. server.removeAllListeners(event);
  495. }
  496. if (typeof callback === 'function') {
  497. callback();
  498. }
  499. });
  500. }
  501. /** Predicts the TopologyType from options */
  502. function topologyTypeFromOptions(options) {
  503. if (options === null || options === void 0 ? void 0 : options.directConnection) {
  504. return common_1.TopologyType.Single;
  505. }
  506. if (options === null || options === void 0 ? void 0 : options.replicaSet) {
  507. return common_1.TopologyType.ReplicaSetNoPrimary;
  508. }
  509. if (options === null || options === void 0 ? void 0 : options.loadBalanced) {
  510. return common_1.TopologyType.LoadBalanced;
  511. }
  512. return common_1.TopologyType.Unknown;
  513. }
  514. function randomSelection(array) {
  515. return array[Math.floor(Math.random() * array.length)];
  516. }
  517. /**
  518. * Creates new server instances and attempts to connect them
  519. *
  520. * @param topology - The topology that this server belongs to
  521. * @param serverDescription - The description for the server to initialize and connect to
  522. * @param connectDelay - Time to wait before attempting initial connection
  523. */
  524. function createAndConnectServer(topology, serverDescription, connectDelay) {
  525. topology.emit(Topology.SERVER_OPENING, new events_1.ServerOpeningEvent(topology.s.id, serverDescription.address));
  526. const server = new server_1.Server(topology, serverDescription, topology.s.options);
  527. for (const event of constants_1.SERVER_RELAY_EVENTS) {
  528. server.on(event, (e) => topology.emit(event, e));
  529. }
  530. server.on(server_1.Server.DESCRIPTION_RECEIVED, description => topology.serverUpdateHandler(description));
  531. if (connectDelay) {
  532. const connectTimer = setTimeout(() => {
  533. (0, common_1.clearAndRemoveTimerFrom)(connectTimer, topology.s.connectionTimers);
  534. server.connect();
  535. }, connectDelay);
  536. topology.s.connectionTimers.add(connectTimer);
  537. return server;
  538. }
  539. server.connect();
  540. return server;
  541. }
  542. /**
  543. * Create `Server` instances for all initially known servers, connect them, and assign
  544. * them to the passed in `Topology`.
  545. *
  546. * @param topology - The topology responsible for the servers
  547. * @param serverDescriptions - A list of server descriptions to connect
  548. */
  549. function connectServers(topology, serverDescriptions) {
  550. topology.s.servers = serverDescriptions.reduce((servers, serverDescription) => {
  551. const server = createAndConnectServer(topology, serverDescription);
  552. servers.set(serverDescription.address, server);
  553. return servers;
  554. }, new Map());
  555. }
  556. /**
  557. * @param topology - Topology to update.
  558. * @param incomingServerDescription - New server description.
  559. */
  560. function updateServers(topology, incomingServerDescription) {
  561. // update the internal server's description
  562. if (incomingServerDescription && topology.s.servers.has(incomingServerDescription.address)) {
  563. const server = topology.s.servers.get(incomingServerDescription.address);
  564. if (server) {
  565. server.s.description = incomingServerDescription;
  566. }
  567. }
  568. // add new servers for all descriptions we currently don't know about locally
  569. for (const serverDescription of topology.description.servers.values()) {
  570. if (!topology.s.servers.has(serverDescription.address)) {
  571. const server = createAndConnectServer(topology, serverDescription);
  572. topology.s.servers.set(serverDescription.address, server);
  573. }
  574. }
  575. // for all servers no longer known, remove their descriptions and destroy their instances
  576. for (const entry of topology.s.servers) {
  577. const serverAddress = entry[0];
  578. if (topology.description.hasServer(serverAddress)) {
  579. continue;
  580. }
  581. if (!topology.s.servers.has(serverAddress)) {
  582. continue;
  583. }
  584. const server = topology.s.servers.get(serverAddress);
  585. topology.s.servers.delete(serverAddress);
  586. // prepare server for garbage collection
  587. if (server) {
  588. destroyServer(server, topology);
  589. }
  590. }
  591. }
  592. function drainWaitQueue(queue, err) {
  593. while (queue.length) {
  594. const waitQueueMember = queue.shift();
  595. if (!waitQueueMember) {
  596. continue;
  597. }
  598. if (waitQueueMember.timer) {
  599. clearTimeout(waitQueueMember.timer);
  600. }
  601. if (!waitQueueMember[kCancelled]) {
  602. waitQueueMember.callback(err);
  603. }
  604. }
  605. }
  606. function processWaitQueue(topology) {
  607. if (topology.s.state === common_1.STATE_CLOSED) {
  608. drainWaitQueue(topology[kWaitQueue], new error_1.MongoTopologyClosedError());
  609. return;
  610. }
  611. const isSharded = topology.description.type === common_1.TopologyType.Sharded;
  612. const serverDescriptions = Array.from(topology.description.servers.values());
  613. const membersToProcess = topology[kWaitQueue].length;
  614. for (let i = 0; i < membersToProcess; ++i) {
  615. const waitQueueMember = topology[kWaitQueue].shift();
  616. if (!waitQueueMember) {
  617. continue;
  618. }
  619. if (waitQueueMember[kCancelled]) {
  620. continue;
  621. }
  622. let selectedDescriptions;
  623. try {
  624. const serverSelector = waitQueueMember.serverSelector;
  625. selectedDescriptions = serverSelector
  626. ? serverSelector(topology.description, serverDescriptions)
  627. : serverDescriptions;
  628. }
  629. catch (e) {
  630. if (waitQueueMember.timer) {
  631. clearTimeout(waitQueueMember.timer);
  632. }
  633. waitQueueMember.callback(e);
  634. continue;
  635. }
  636. if (selectedDescriptions.length === 0) {
  637. topology[kWaitQueue].push(waitQueueMember);
  638. continue;
  639. }
  640. const selectedServerDescription = randomSelection(selectedDescriptions);
  641. const selectedServer = topology.s.servers.get(selectedServerDescription.address);
  642. const transaction = waitQueueMember.transaction;
  643. if (isSharded && transaction && transaction.isActive && selectedServer) {
  644. transaction.pinServer(selectedServer);
  645. }
  646. if (waitQueueMember.timer) {
  647. clearTimeout(waitQueueMember.timer);
  648. }
  649. waitQueueMember.callback(undefined, selectedServer);
  650. }
  651. if (topology[kWaitQueue].length > 0) {
  652. // ensure all server monitors attempt monitoring soon
  653. for (const [, server] of topology.s.servers) {
  654. process.nextTick(function scheduleServerCheck() {
  655. return server.requestCheck();
  656. });
  657. }
  658. }
  659. }
  660. function isStaleServerDescription(topologyDescription, incomingServerDescription) {
  661. const currentServerDescription = topologyDescription.servers.get(incomingServerDescription.address);
  662. const currentTopologyVersion = currentServerDescription === null || currentServerDescription === void 0 ? void 0 : currentServerDescription.topologyVersion;
  663. return ((0, server_description_1.compareTopologyVersion)(currentTopologyVersion, incomingServerDescription.topologyVersion) > 0);
  664. }
  665. /** @public */
  666. class ServerCapabilities {
  667. constructor(hello) {
  668. this.minWireVersion = hello.minWireVersion || 0;
  669. this.maxWireVersion = hello.maxWireVersion || 0;
  670. }
  671. get hasAggregationCursor() {
  672. return this.maxWireVersion >= 1;
  673. }
  674. get hasWriteCommands() {
  675. return this.maxWireVersion >= 2;
  676. }
  677. get hasTextSearch() {
  678. return this.minWireVersion >= 0;
  679. }
  680. get hasAuthCommands() {
  681. return this.maxWireVersion >= 1;
  682. }
  683. get hasListCollectionsCommand() {
  684. return this.maxWireVersion >= 3;
  685. }
  686. get hasListIndexesCommand() {
  687. return this.maxWireVersion >= 3;
  688. }
  689. get supportsSnapshotReads() {
  690. return this.maxWireVersion >= 13;
  691. }
  692. get commandsTakeWriteConcern() {
  693. return this.maxWireVersion >= 5;
  694. }
  695. get commandsTakeCollation() {
  696. return this.maxWireVersion >= 5;
  697. }
  698. }
  699. exports.ServerCapabilities = ServerCapabilities;
  700. //# sourceMappingURL=topology.js.map