connection_pool.js 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.ConnectionPool = void 0;
  4. const Denque = require("denque");
  5. const timers_1 = require("timers");
  6. const constants_1 = require("../constants");
  7. const error_1 = require("../error");
  8. const logger_1 = require("../logger");
  9. const mongo_types_1 = require("../mongo_types");
  10. const utils_1 = require("../utils");
  11. const connect_1 = require("./connect");
  12. const connection_1 = require("./connection");
  13. const connection_pool_events_1 = require("./connection_pool_events");
  14. const errors_1 = require("./errors");
  15. const metrics_1 = require("./metrics");
  16. /** @internal */
  17. const kLogger = Symbol('logger');
  18. /** @internal */
  19. const kConnections = Symbol('connections');
  20. /** @internal */
  21. const kPending = Symbol('pending');
  22. /** @internal */
  23. const kCheckedOut = Symbol('checkedOut');
  24. /** @internal */
  25. const kMinPoolSizeTimer = Symbol('minPoolSizeTimer');
  26. /** @internal */
  27. const kGeneration = Symbol('generation');
  28. /** @internal */
  29. const kServiceGenerations = Symbol('serviceGenerations');
  30. /** @internal */
  31. const kConnectionCounter = Symbol('connectionCounter');
  32. /** @internal */
  33. const kCancellationToken = Symbol('cancellationToken');
  34. /** @internal */
  35. const kWaitQueue = Symbol('waitQueue');
  36. /** @internal */
  37. const kCancelled = Symbol('cancelled');
  38. /** @internal */
  39. const kMetrics = Symbol('metrics');
  40. /** @internal */
  41. const kProcessingWaitQueue = Symbol('processingWaitQueue');
  42. /**
  43. * A pool of connections which dynamically resizes, and emit events related to pool activity
  44. * @internal
  45. */
  46. class ConnectionPool extends mongo_types_1.TypedEventEmitter {
  47. /** @internal */
  48. constructor(options) {
  49. var _a, _b, _c, _d, _e;
  50. super();
  51. this.closed = false;
  52. this.options = Object.freeze({
  53. ...options,
  54. connectionType: connection_1.Connection,
  55. maxPoolSize: (_a = options.maxPoolSize) !== null && _a !== void 0 ? _a : 100,
  56. minPoolSize: (_b = options.minPoolSize) !== null && _b !== void 0 ? _b : 0,
  57. maxConnecting: (_c = options.maxConnecting) !== null && _c !== void 0 ? _c : 2,
  58. maxIdleTimeMS: (_d = options.maxIdleTimeMS) !== null && _d !== void 0 ? _d : 0,
  59. waitQueueTimeoutMS: (_e = options.waitQueueTimeoutMS) !== null && _e !== void 0 ? _e : 0,
  60. autoEncrypter: options.autoEncrypter,
  61. metadata: options.metadata
  62. });
  63. if (this.options.minPoolSize > this.options.maxPoolSize) {
  64. throw new error_1.MongoInvalidArgumentError('Connection pool minimum size must not be greater than maximum pool size');
  65. }
  66. this[kLogger] = new logger_1.Logger('ConnectionPool');
  67. this[kConnections] = new Denque();
  68. this[kPending] = 0;
  69. this[kCheckedOut] = 0;
  70. this[kMinPoolSizeTimer] = undefined;
  71. this[kGeneration] = 0;
  72. this[kServiceGenerations] = new Map();
  73. this[kConnectionCounter] = (0, utils_1.makeCounter)(1);
  74. this[kCancellationToken] = new mongo_types_1.CancellationToken();
  75. this[kCancellationToken].setMaxListeners(Infinity);
  76. this[kWaitQueue] = new Denque();
  77. this[kMetrics] = new metrics_1.ConnectionPoolMetrics();
  78. this[kProcessingWaitQueue] = false;
  79. process.nextTick(() => {
  80. this.emit(ConnectionPool.CONNECTION_POOL_CREATED, new connection_pool_events_1.ConnectionPoolCreatedEvent(this));
  81. ensureMinPoolSize(this);
  82. });
  83. }
  84. /** The address of the endpoint the pool is connected to */
  85. get address() {
  86. return this.options.hostAddress.toString();
  87. }
  88. /** An integer representing the SDAM generation of the pool */
  89. get generation() {
  90. return this[kGeneration];
  91. }
  92. /** An integer expressing how many total connections (available + pending + in use) the pool currently has */
  93. get totalConnectionCount() {
  94. return (this.availableConnectionCount + this.pendingConnectionCount + this.currentCheckedOutCount);
  95. }
  96. /** An integer expressing how many connections are currently available in the pool. */
  97. get availableConnectionCount() {
  98. return this[kConnections].length;
  99. }
  100. get pendingConnectionCount() {
  101. return this[kPending];
  102. }
  103. get currentCheckedOutCount() {
  104. return this[kCheckedOut];
  105. }
  106. get waitQueueSize() {
  107. return this[kWaitQueue].length;
  108. }
  109. get loadBalanced() {
  110. return this.options.loadBalanced;
  111. }
  112. get serviceGenerations() {
  113. return this[kServiceGenerations];
  114. }
  115. /**
  116. * Get the metrics information for the pool when a wait queue timeout occurs.
  117. */
  118. waitQueueErrorMetrics() {
  119. return this[kMetrics].info(this.options.maxPoolSize);
  120. }
  121. /**
  122. * Check a connection out of this pool. The connection will continue to be tracked, but no reference to it
  123. * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
  124. * explicitly destroyed by the new owner.
  125. */
  126. checkOut(callback) {
  127. this.emit(ConnectionPool.CONNECTION_CHECK_OUT_STARTED, new connection_pool_events_1.ConnectionCheckOutStartedEvent(this));
  128. if (this.closed) {
  129. this.emit(ConnectionPool.CONNECTION_CHECK_OUT_FAILED, new connection_pool_events_1.ConnectionCheckOutFailedEvent(this, 'poolClosed'));
  130. callback(new errors_1.PoolClosedError(this));
  131. return;
  132. }
  133. const waitQueueMember = { callback };
  134. const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
  135. if (waitQueueTimeoutMS) {
  136. waitQueueMember.timer = (0, timers_1.setTimeout)(() => {
  137. waitQueueMember[kCancelled] = true;
  138. waitQueueMember.timer = undefined;
  139. this.emit(ConnectionPool.CONNECTION_CHECK_OUT_FAILED, new connection_pool_events_1.ConnectionCheckOutFailedEvent(this, 'timeout'));
  140. waitQueueMember.callback(new errors_1.WaitQueueTimeoutError(this.loadBalanced
  141. ? this.waitQueueErrorMetrics()
  142. : 'Timed out while checking out a connection from connection pool', this.address));
  143. }, waitQueueTimeoutMS);
  144. }
  145. this[kWaitQueue].push(waitQueueMember);
  146. process.nextTick(processWaitQueue, this);
  147. }
  148. /**
  149. * Check a connection into the pool.
  150. *
  151. * @param connection - The connection to check in
  152. */
  153. checkIn(connection) {
  154. const poolClosed = this.closed;
  155. const stale = connectionIsStale(this, connection);
  156. const willDestroy = !!(poolClosed || stale || connection.closed);
  157. if (!willDestroy) {
  158. connection.markAvailable();
  159. this[kConnections].unshift(connection);
  160. }
  161. this[kCheckedOut]--;
  162. this.emit(ConnectionPool.CONNECTION_CHECKED_IN, new connection_pool_events_1.ConnectionCheckedInEvent(this, connection));
  163. if (willDestroy) {
  164. const reason = connection.closed ? 'error' : poolClosed ? 'poolClosed' : 'stale';
  165. destroyConnection(this, connection, reason);
  166. }
  167. process.nextTick(processWaitQueue, this);
  168. }
  169. /**
  170. * Clear the pool
  171. *
  172. * Pool reset is handled by incrementing the pool's generation count. Any existing connection of a
  173. * previous generation will eventually be pruned during subsequent checkouts.
  174. */
  175. clear(serviceId) {
  176. if (this.loadBalanced && serviceId) {
  177. const sid = serviceId.toHexString();
  178. const generation = this.serviceGenerations.get(sid);
  179. // Only need to worry if the generation exists, since it should
  180. // always be there but typescript needs the check.
  181. if (generation == null) {
  182. // TODO(NODE-3483)
  183. throw new error_1.MongoRuntimeError('Service generations are required in load balancer mode.');
  184. }
  185. else {
  186. // Increment the generation for the service id.
  187. this.serviceGenerations.set(sid, generation + 1);
  188. }
  189. }
  190. else {
  191. this[kGeneration] += 1;
  192. }
  193. this.emit('connectionPoolCleared', new connection_pool_events_1.ConnectionPoolClearedEvent(this, serviceId));
  194. }
  195. close(_options, _cb) {
  196. let options = _options;
  197. const callback = (_cb !== null && _cb !== void 0 ? _cb : _options);
  198. if (typeof options === 'function') {
  199. options = {};
  200. }
  201. options = Object.assign({ force: false }, options);
  202. if (this.closed) {
  203. return callback();
  204. }
  205. // immediately cancel any in-flight connections
  206. this[kCancellationToken].emit('cancel');
  207. // drain the wait queue
  208. while (this.waitQueueSize) {
  209. const waitQueueMember = this[kWaitQueue].pop();
  210. if (waitQueueMember) {
  211. if (waitQueueMember.timer) {
  212. clearTimeout(waitQueueMember.timer);
  213. }
  214. if (!waitQueueMember[kCancelled]) {
  215. // TODO(NODE-3483): Replace with MongoConnectionPoolClosedError
  216. waitQueueMember.callback(new error_1.MongoRuntimeError('Connection pool closed'));
  217. }
  218. }
  219. }
  220. // clear the min pool size timer
  221. const minPoolSizeTimer = this[kMinPoolSizeTimer];
  222. if (minPoolSizeTimer) {
  223. clearTimeout(minPoolSizeTimer);
  224. }
  225. // end the connection counter
  226. if (typeof this[kConnectionCounter].return === 'function') {
  227. this[kConnectionCounter].return(undefined);
  228. }
  229. // mark the pool as closed immediately
  230. this.closed = true;
  231. (0, utils_1.eachAsync)(this[kConnections].toArray(), (conn, cb) => {
  232. this.emit(ConnectionPool.CONNECTION_CLOSED, new connection_pool_events_1.ConnectionClosedEvent(this, conn, 'poolClosed'));
  233. conn.destroy(options, cb);
  234. }, err => {
  235. this[kConnections].clear();
  236. this.emit(ConnectionPool.CONNECTION_POOL_CLOSED, new connection_pool_events_1.ConnectionPoolClosedEvent(this));
  237. callback(err);
  238. });
  239. }
  240. /**
  241. * Runs a lambda with an implicitly checked out connection, checking that connection back in when the lambda
  242. * has completed by calling back.
  243. *
  244. * NOTE: please note the required signature of `fn`
  245. *
  246. * @remarks When in load balancer mode, connections can be pinned to cursors or transactions.
  247. * In these cases we pass the connection in to this method to ensure it is used and a new
  248. * connection is not checked out.
  249. *
  250. * @param conn - A pinned connection for use in load balancing mode.
  251. * @param fn - A function which operates on a managed connection
  252. * @param callback - The original callback
  253. */
  254. withConnection(conn, fn, callback) {
  255. if (conn) {
  256. // use the provided connection, and do _not_ check it in after execution
  257. fn(undefined, conn, (fnErr, result) => {
  258. if (typeof callback === 'function') {
  259. if (fnErr) {
  260. callback(fnErr);
  261. }
  262. else {
  263. callback(undefined, result);
  264. }
  265. }
  266. });
  267. return;
  268. }
  269. this.checkOut((err, conn) => {
  270. // don't callback with `err` here, we might want to act upon it inside `fn`
  271. fn(err, conn, (fnErr, result) => {
  272. if (typeof callback === 'function') {
  273. if (fnErr) {
  274. callback(fnErr);
  275. }
  276. else {
  277. callback(undefined, result);
  278. }
  279. }
  280. if (conn) {
  281. this.checkIn(conn);
  282. }
  283. });
  284. });
  285. }
  286. }
  287. exports.ConnectionPool = ConnectionPool;
  288. /**
  289. * Emitted when the connection pool is created.
  290. * @event
  291. */
  292. ConnectionPool.CONNECTION_POOL_CREATED = constants_1.CONNECTION_POOL_CREATED;
  293. /**
  294. * Emitted once when the connection pool is closed
  295. * @event
  296. */
  297. ConnectionPool.CONNECTION_POOL_CLOSED = constants_1.CONNECTION_POOL_CLOSED;
  298. /**
  299. * Emitted each time the connection pool is cleared and it's generation incremented
  300. * @event
  301. */
  302. ConnectionPool.CONNECTION_POOL_CLEARED = constants_1.CONNECTION_POOL_CLEARED;
  303. /**
  304. * Emitted when a connection is created.
  305. * @event
  306. */
  307. ConnectionPool.CONNECTION_CREATED = constants_1.CONNECTION_CREATED;
  308. /**
  309. * Emitted when a connection becomes established, and is ready to use
  310. * @event
  311. */
  312. ConnectionPool.CONNECTION_READY = constants_1.CONNECTION_READY;
  313. /**
  314. * Emitted when a connection is closed
  315. * @event
  316. */
  317. ConnectionPool.CONNECTION_CLOSED = constants_1.CONNECTION_CLOSED;
  318. /**
  319. * Emitted when an attempt to check out a connection begins
  320. * @event
  321. */
  322. ConnectionPool.CONNECTION_CHECK_OUT_STARTED = constants_1.CONNECTION_CHECK_OUT_STARTED;
  323. /**
  324. * Emitted when an attempt to check out a connection fails
  325. * @event
  326. */
  327. ConnectionPool.CONNECTION_CHECK_OUT_FAILED = constants_1.CONNECTION_CHECK_OUT_FAILED;
  328. /**
  329. * Emitted each time a connection is successfully checked out of the connection pool
  330. * @event
  331. */
  332. ConnectionPool.CONNECTION_CHECKED_OUT = constants_1.CONNECTION_CHECKED_OUT;
  333. /**
  334. * Emitted each time a connection is successfully checked into the connection pool
  335. * @event
  336. */
  337. ConnectionPool.CONNECTION_CHECKED_IN = constants_1.CONNECTION_CHECKED_IN;
  338. function ensureMinPoolSize(pool) {
  339. const minPoolSize = pool.options.minPoolSize;
  340. if (pool.closed || minPoolSize === 0) {
  341. return;
  342. }
  343. if (pool.totalConnectionCount < minPoolSize &&
  344. pool.pendingConnectionCount < pool.options.maxConnecting) {
  345. // NOTE: ensureMinPoolSize should not try to get all the pending
  346. // connection permits because that potentially delays the availability of
  347. // the connection to a checkout request
  348. createConnection(pool, (err, connection) => {
  349. pool[kPending]--;
  350. if (!err && connection) {
  351. pool[kConnections].push(connection);
  352. process.nextTick(processWaitQueue, pool);
  353. }
  354. pool[kMinPoolSizeTimer] = (0, timers_1.setTimeout)(() => ensureMinPoolSize(pool), 10);
  355. });
  356. }
  357. else {
  358. pool[kMinPoolSizeTimer] = (0, timers_1.setTimeout)(() => ensureMinPoolSize(pool), 100);
  359. }
  360. }
  361. function connectionIsStale(pool, connection) {
  362. const serviceId = connection.serviceId;
  363. if (pool.loadBalanced && serviceId) {
  364. const sid = serviceId.toHexString();
  365. const generation = pool.serviceGenerations.get(sid);
  366. return connection.generation !== generation;
  367. }
  368. return connection.generation !== pool[kGeneration];
  369. }
  370. function connectionIsIdle(pool, connection) {
  371. return !!(pool.options.maxIdleTimeMS && connection.idleTime > pool.options.maxIdleTimeMS);
  372. }
  373. function createConnection(pool, callback) {
  374. const connectOptions = {
  375. ...pool.options,
  376. id: pool[kConnectionCounter].next().value,
  377. generation: pool[kGeneration],
  378. cancellationToken: pool[kCancellationToken]
  379. };
  380. pool[kPending]++;
  381. // This is our version of a "virtual" no-I/O connection as the spec requires
  382. pool.emit(ConnectionPool.CONNECTION_CREATED, new connection_pool_events_1.ConnectionCreatedEvent(pool, { id: connectOptions.id }));
  383. (0, connect_1.connect)(connectOptions, (err, connection) => {
  384. if (err || !connection) {
  385. pool[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
  386. callback(err);
  387. return;
  388. }
  389. // The pool might have closed since we started trying to create a connection
  390. if (pool.closed) {
  391. pool[kPending]--;
  392. connection.destroy({ force: true });
  393. return;
  394. }
  395. // forward all events from the connection to the pool
  396. for (const event of [...constants_1.APM_EVENTS, connection_1.Connection.CLUSTER_TIME_RECEIVED]) {
  397. connection.on(event, (e) => pool.emit(event, e));
  398. }
  399. if (pool.loadBalanced) {
  400. connection.on(connection_1.Connection.PINNED, pinType => pool[kMetrics].markPinned(pinType));
  401. connection.on(connection_1.Connection.UNPINNED, pinType => pool[kMetrics].markUnpinned(pinType));
  402. const serviceId = connection.serviceId;
  403. if (serviceId) {
  404. let generation;
  405. const sid = serviceId.toHexString();
  406. if ((generation = pool.serviceGenerations.get(sid))) {
  407. connection.generation = generation;
  408. }
  409. else {
  410. pool.serviceGenerations.set(sid, 0);
  411. connection.generation = 0;
  412. }
  413. }
  414. }
  415. connection.markAvailable();
  416. pool.emit(ConnectionPool.CONNECTION_READY, new connection_pool_events_1.ConnectionReadyEvent(pool, connection));
  417. callback(undefined, connection);
  418. return;
  419. });
  420. }
  421. function destroyConnection(pool, connection, reason) {
  422. pool.emit(ConnectionPool.CONNECTION_CLOSED, new connection_pool_events_1.ConnectionClosedEvent(pool, connection, reason));
  423. // destroy the connection
  424. process.nextTick(() => connection.destroy());
  425. }
  426. function processWaitQueue(pool) {
  427. if (pool.closed || pool[kProcessingWaitQueue]) {
  428. return;
  429. }
  430. pool[kProcessingWaitQueue] = true;
  431. while (pool.waitQueueSize) {
  432. const waitQueueMember = pool[kWaitQueue].peekFront();
  433. if (!waitQueueMember) {
  434. pool[kWaitQueue].shift();
  435. continue;
  436. }
  437. if (waitQueueMember[kCancelled]) {
  438. pool[kWaitQueue].shift();
  439. continue;
  440. }
  441. if (!pool.availableConnectionCount) {
  442. break;
  443. }
  444. const connection = pool[kConnections].shift();
  445. if (!connection) {
  446. break;
  447. }
  448. const isStale = connectionIsStale(pool, connection);
  449. const isIdle = connectionIsIdle(pool, connection);
  450. if (!isStale && !isIdle && !connection.closed) {
  451. pool[kCheckedOut]++;
  452. pool.emit(ConnectionPool.CONNECTION_CHECKED_OUT, new connection_pool_events_1.ConnectionCheckedOutEvent(pool, connection));
  453. if (waitQueueMember.timer) {
  454. clearTimeout(waitQueueMember.timer);
  455. }
  456. pool[kWaitQueue].shift();
  457. waitQueueMember.callback(undefined, connection);
  458. }
  459. else {
  460. const reason = connection.closed ? 'error' : isStale ? 'stale' : 'idle';
  461. destroyConnection(pool, connection, reason);
  462. }
  463. }
  464. const { maxPoolSize, maxConnecting } = pool.options;
  465. while (pool.waitQueueSize > 0 &&
  466. pool.pendingConnectionCount < maxConnecting &&
  467. (maxPoolSize === 0 || pool.totalConnectionCount < maxPoolSize)) {
  468. const waitQueueMember = pool[kWaitQueue].shift();
  469. if (!waitQueueMember || waitQueueMember[kCancelled]) {
  470. continue;
  471. }
  472. createConnection(pool, (err, connection) => {
  473. pool[kPending]--;
  474. if (waitQueueMember[kCancelled]) {
  475. if (!err && connection) {
  476. pool[kConnections].push(connection);
  477. }
  478. }
  479. else {
  480. if (err) {
  481. pool.emit(ConnectionPool.CONNECTION_CHECK_OUT_FAILED, new connection_pool_events_1.ConnectionCheckOutFailedEvent(pool, err));
  482. }
  483. else if (connection) {
  484. pool[kCheckedOut]++;
  485. pool.emit(ConnectionPool.CONNECTION_CHECKED_OUT, new connection_pool_events_1.ConnectionCheckedOutEvent(pool, connection));
  486. }
  487. if (waitQueueMember.timer) {
  488. clearTimeout(waitQueueMember.timer);
  489. }
  490. waitQueueMember.callback(err, connection);
  491. }
  492. process.nextTick(processWaitQueue, pool);
  493. });
  494. }
  495. pool[kProcessingWaitQueue] = false;
  496. }
  497. //# sourceMappingURL=connection_pool.js.map