123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497 |
- "use strict";
- Object.defineProperty(exports, "__esModule", { value: true });
- exports.ConnectionPool = void 0;
- const Denque = require("denque");
- const timers_1 = require("timers");
- const constants_1 = require("../constants");
- const error_1 = require("../error");
- const logger_1 = require("../logger");
- const mongo_types_1 = require("../mongo_types");
- const utils_1 = require("../utils");
- const connect_1 = require("./connect");
- const connection_1 = require("./connection");
- const connection_pool_events_1 = require("./connection_pool_events");
- const errors_1 = require("./errors");
- const metrics_1 = require("./metrics");
/*
 * Symbol keys under which a ConnectionPool instance stores its private state.
 * Symbols keep this state off the pool's string-keyed surface.
 */

/**
 * Logger instance created in the pool constructor.
 * @internal
 */
const kLogger = Symbol('logger');
/**
 * Queue of connections that are currently available for checkout.
 * @internal
 */
const kConnections = Symbol('connections');
/**
 * Count of connection attempts that are still in flight.
 * @internal
 */
const kPending = Symbol('pending');
/**
 * Count of connections currently checked out of the pool.
 * @internal
 */
const kCheckedOut = Symbol('checkedOut');
/**
 * Handle of the timer that re-runs `ensureMinPoolSize`.
 * @internal
 */
const kMinPoolSizeTimer = Symbol('minPoolSizeTimer');
/**
 * Pool-wide generation number, incremented by `clear()`.
 * @internal
 */
const kGeneration = Symbol('generation');
/**
 * Map from service id (hex string) to generation, used in load-balanced mode.
 * @internal
 */
const kServiceGenerations = Symbol('serviceGenerations');
/**
 * Counter that yields the id given to each new connection.
 * @internal
 */
const kConnectionCounter = Symbol('connectionCounter');
/**
 * Token handed to every connection attempt; `close()` emits 'cancel' on it.
 * @internal
 */
const kCancellationToken = Symbol('cancellationToken');
/**
 * Queue of checkout requests waiting for a connection.
 * @internal
 */
const kWaitQueue = Symbol('waitQueue');
/**
 * Flag set on a wait queue member once its checkout has timed out.
 * @internal
 */
const kCancelled = Symbol('cancelled');
/**
 * Metrics object updated by connection pin/unpin events in load-balanced mode.
 * @internal
 */
const kMetrics = Symbol('metrics');
/**
 * Re-entrancy guard set while `processWaitQueue` is running.
 * @internal
 */
const kProcessingWaitQueue = Symbol('processingWaitQueue');
/**
 * A pool of connections which dynamically resizes, and emits events related to pool activity.
 *
 * Available connections are kept in one queue and pending checkout requests in another;
 * `processWaitQueue` hands available (or newly created) connections to waiting requests.
 * @internal
 */
class ConnectionPool extends mongo_types_1.TypedEventEmitter {
    /**
     * Build the pool and, on the next tick, emit the "pool created" event and start
     * maintaining the minimum pool size.
     *
     * Defaults applied when an option is null or undefined: `maxPoolSize` 100, `minPoolSize` 0,
     * `maxConnecting` 2, `maxIdleTimeMS` 0 (no idle limit), `waitQueueTimeoutMS` 0 (no timeout).
     *
     * @internal
     * @param options - Pool options; all of them are also forwarded to each connection attempt
     * @throws MongoInvalidArgumentError when `minPoolSize` is greater than `maxPoolSize`
     */
    constructor(options) {
        super();
        // Returns `value` unless it is null or undefined, in which case `fallback` is used
        const orDefault = (value, fallback) => (value != null ? value : fallback);
        this.closed = false;
        this.options = Object.freeze({
            ...options,
            connectionType: connection_1.Connection,
            maxPoolSize: orDefault(options.maxPoolSize, 100),
            minPoolSize: orDefault(options.minPoolSize, 0),
            maxConnecting: orDefault(options.maxConnecting, 2),
            maxIdleTimeMS: orDefault(options.maxIdleTimeMS, 0),
            waitQueueTimeoutMS: orDefault(options.waitQueueTimeoutMS, 0),
            autoEncrypter: options.autoEncrypter,
            metadata: options.metadata
        });
        if (this.options.minPoolSize > this.options.maxPoolSize) {
            throw new error_1.MongoInvalidArgumentError('Connection pool minimum size must not be greater than maximum pool size');
        }
        this[kLogger] = new logger_1.Logger('ConnectionPool');
        this[kConnections] = new Denque();
        this[kPending] = 0;
        this[kCheckedOut] = 0;
        this[kMinPoolSizeTimer] = undefined;
        this[kGeneration] = 0;
        this[kServiceGenerations] = new Map();
        this[kConnectionCounter] = utils_1.makeCounter(1);
        this[kCancellationToken] = new mongo_types_1.CancellationToken();
        // every in-flight connection attempt may listen on the token
        this[kCancellationToken].setMaxListeners(Infinity);
        this[kWaitQueue] = new Denque();
        this[kMetrics] = new metrics_1.ConnectionPoolMetrics();
        this[kProcessingWaitQueue] = false;
        // Deferred so that listeners attached right after construction see the created event
        process.nextTick(() => {
            this.emit(ConnectionPool.CONNECTION_POOL_CREATED, new connection_pool_events_1.ConnectionPoolCreatedEvent(this));
            ensureMinPoolSize(this);
        });
    }
    /** The address of the endpoint the pool is connected to */
    get address() {
        return this.options.hostAddress.toString();
    }
    /** An integer representing the SDAM generation of the pool */
    get generation() {
        return this[kGeneration];
    }
    /** An integer expressing how many total connections (available + pending + in use) the pool currently has */
    get totalConnectionCount() {
        return (this.availableConnectionCount + this.pendingConnectionCount + this.currentCheckedOutCount);
    }
    /** An integer expressing how many connections are currently available in the pool. */
    get availableConnectionCount() {
        return this[kConnections].length;
    }
    /** An integer expressing how many connection attempts are currently in flight. */
    get pendingConnectionCount() {
        return this[kPending];
    }
    /** An integer expressing how many connections are currently checked out. */
    get currentCheckedOutCount() {
        return this[kCheckedOut];
    }
    /** An integer expressing how many checkout requests are currently queued. */
    get waitQueueSize() {
        return this[kWaitQueue].length;
    }
    /** The `loadBalanced` option the pool was created with. */
    get loadBalanced() {
        return this.options.loadBalanced;
    }
    /** Map from service id (hex string) to that service's generation. */
    get serviceGenerations() {
        return this[kServiceGenerations];
    }
    /**
     * Get the metrics information for the pool when a wait queue timeout occurs.
     */
    waitQueueErrorMetrics() {
        return this[kMetrics].info(this.options.maxPoolSize);
    }
    /**
     * Check a connection out of this pool. The connection will continue to be tracked, but no reference to it
     * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
     * explicitly destroyed by the new owner.
     *
     * @param callback - Called with `(error)` or `(undefined, connection)`. On a closed pool it is
     * called synchronously with a PoolClosedError; otherwise the request is queued and served later.
     */
    checkOut(callback) {
        this.emit(ConnectionPool.CONNECTION_CHECK_OUT_STARTED, new connection_pool_events_1.ConnectionCheckOutStartedEvent(this));
        if (this.closed) {
            this.emit(ConnectionPool.CONNECTION_CHECK_OUT_FAILED, new connection_pool_events_1.ConnectionCheckOutFailedEvent(this, 'poolClosed'));
            callback(new errors_1.PoolClosedError(this));
            return;
        }
        const waitQueueMember = { callback };
        const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
        // A timeout of 0 means the request waits indefinitely
        if (waitQueueTimeoutMS) {
            waitQueueMember.timer = timers_1.setTimeout(() => {
                // The member stays in the queue; processWaitQueue discards cancelled members lazily
                waitQueueMember[kCancelled] = true;
                waitQueueMember.timer = undefined;
                this.emit(ConnectionPool.CONNECTION_CHECK_OUT_FAILED, new connection_pool_events_1.ConnectionCheckOutFailedEvent(this, 'timeout'));
                waitQueueMember.callback(new errors_1.WaitQueueTimeoutError(this.loadBalanced
                    ? this.waitQueueErrorMetrics()
                    : 'Timed out while checking out a connection from connection pool', this.address));
            }, waitQueueTimeoutMS);
        }
        this[kWaitQueue].push(waitQueueMember);
        process.nextTick(processWaitQueue, this);
    }
    /**
     * Check a connection into the pool.
     *
     * A connection that is closed or stale, or any connection returned to a closed pool, is
     * destroyed instead of being made available again.
     *
     * @param connection - The connection to check in
     */
    checkIn(connection) {
        const poolClosed = this.closed;
        const stale = connectionIsStale(this, connection);
        const willDestroy = !!(poolClosed || stale || connection.closed);
        if (!willDestroy) {
            connection.markAvailable();
            // returned connections go to the front, the end processWaitQueue takes from
            this[kConnections].unshift(connection);
        }
        this[kCheckedOut]--;
        this.emit(ConnectionPool.CONNECTION_CHECKED_IN, new connection_pool_events_1.ConnectionCheckedInEvent(this, connection));
        if (willDestroy) {
            const reason = connection.closed ? 'error' : poolClosed ? 'poolClosed' : 'stale';
            destroyConnection(this, connection, reason);
        }
        process.nextTick(processWaitQueue, this);
    }
    /**
     * Clear the pool
     *
     * Pool reset is handled by incrementing the pool's generation count. Any existing connection of a
     * previous generation will eventually be pruned during subsequent checkouts.
     *
     * @param serviceId - In load balancer mode, the service whose generation should be incremented;
     * when absent (or when not load balanced) the pool-wide generation is incremented instead
     * @throws MongoRuntimeError in load balancer mode when no generation is recorded for `serviceId`
     */
    clear(serviceId) {
        if (this.loadBalanced && serviceId) {
            const sid = serviceId.toHexString();
            const generation = this.serviceGenerations.get(sid);
            // Only need to worry if the generation exists, since it should
            // always be there but typescript needs the check.
            if (generation == null) {
                // TODO(NODE-3483)
                throw new error_1.MongoRuntimeError('Service generations are required in load balancer mode.');
            }
            else {
                // Increment the generation for the service id.
                this.serviceGenerations.set(sid, generation + 1);
            }
        }
        else {
            this[kGeneration] += 1;
        }
        // Use the shared event-name constant like every other emit in this class
        this.emit(ConnectionPool.CONNECTION_POOL_CLEARED, new connection_pool_events_1.ConnectionPoolClearedEvent(this, serviceId));
    }
    /**
     * Close the pool: cancel in-flight connection attempts, fail all queued checkout requests,
     * stop the min pool size timer and destroy every available connection.
     *
     * May be called as `close(callback)` or `close(options, callback)`.
     *
     * @param _options - Options passed to each connection's `destroy` (`force` defaults to false), or the callback
     * @param _cb - Called once all available connections have been destroyed; called immediately
     * when the pool is already closed
     */
    close(_options, _cb) {
        let options = _options;
        const callback = _cb != null ? _cb : _options;
        if (typeof options === 'function') {
            options = {};
        }
        options = Object.assign({ force: false }, options);
        if (this.closed) {
            return callback();
        }
        // mark the pool as closed immediately, before any callback below can run: a checkout
        // requested from one of those callbacks must fail fast instead of being queued again
        this.closed = true;
        // immediately cancel any in-flight connections
        this[kCancellationToken].emit('cancel');
        // drain the wait queue
        while (this.waitQueueSize) {
            const waitQueueMember = this[kWaitQueue].pop();
            if (waitQueueMember) {
                if (waitQueueMember.timer) {
                    clearTimeout(waitQueueMember.timer);
                }
                // cancelled members have already been called back with a timeout error
                if (!waitQueueMember[kCancelled]) {
                    // TODO(NODE-3483): Replace with MongoConnectionPoolClosedError
                    waitQueueMember.callback(new error_1.MongoRuntimeError('Connection pool closed'));
                }
            }
        }
        // clear the min pool size timer
        const minPoolSizeTimer = this[kMinPoolSizeTimer];
        if (minPoolSizeTimer) {
            clearTimeout(minPoolSizeTimer);
        }
        // end the connection counter
        if (typeof this[kConnectionCounter].return === 'function') {
            this[kConnectionCounter].return(undefined);
        }
        utils_1.eachAsync(this[kConnections].toArray(), (conn, cb) => {
            this.emit(ConnectionPool.CONNECTION_CLOSED, new connection_pool_events_1.ConnectionClosedEvent(this, conn, 'poolClosed'));
            conn.destroy(options, cb);
        }, err => {
            this[kConnections].clear();
            this.emit(ConnectionPool.CONNECTION_POOL_CLOSED, new connection_pool_events_1.ConnectionPoolClosedEvent(this));
            callback(err);
        });
    }
    /**
     * Runs a lambda with an implicitly checked out connection, checking that connection back in when the lambda
     * has completed by calling back.
     *
     * NOTE: please note the required signature of `fn`: `fn(err, connection, done)`, where
     * `done(fnErr, result)` must be called once the work has finished.
     *
     * @remarks When in load balancer mode, connections can be pinned to cursors or transactions.
     * In these cases we pass the connection in to this method to ensure it is used and a new
     * connection is not checked out.
     *
     * @param conn - A pinned connection for use in load balancing mode.
     * @param fn - A function which operates on a managed connection
     * @param callback - The original callback
     */
    withConnection(conn, fn, callback) {
        // Forward the outcome of `fn` to the (optional) original callback
        const relay = (fnErr, result) => {
            if (typeof callback !== 'function') {
                return;
            }
            if (fnErr) {
                callback(fnErr);
            }
            else {
                callback(undefined, result);
            }
        };
        if (conn) {
            // use the provided connection, and do _not_ check it in after execution
            fn(undefined, conn, relay);
            return;
        }
        this.checkOut((err, checkedOut) => {
            // don't callback with `err` here, we might want to act upon it inside `fn`
            fn(err, checkedOut, (fnErr, result) => {
                relay(fnErr, result);
                if (checkedOut) {
                    this.checkIn(checkedOut);
                }
            });
        });
    }
}
exports.ConnectionPool = ConnectionPool;

// Event names are exposed as static properties so that emitters and listeners
// share a single definition taken from the constants module.

/**
 * Event emitted after the pool has been constructed.
 * @event
 */
ConnectionPool.CONNECTION_POOL_CREATED = constants_1.CONNECTION_POOL_CREATED;
/**
 * Event emitted a single time, once the pool has finished closing.
 * @event
 */
ConnectionPool.CONNECTION_POOL_CLOSED = constants_1.CONNECTION_POOL_CLOSED;
/**
 * Event emitted every time the pool is cleared and its generation is incremented.
 * @event
 */
ConnectionPool.CONNECTION_POOL_CLEARED = constants_1.CONNECTION_POOL_CLEARED;
/**
 * Event emitted when the pool starts creating a connection.
 * @event
 */
ConnectionPool.CONNECTION_CREATED = constants_1.CONNECTION_CREATED;
/**
 * Event emitted when a connection has been established and is ready for use.
 * @event
 */
ConnectionPool.CONNECTION_READY = constants_1.CONNECTION_READY;
/**
 * Event emitted when a connection is closed.
 * @event
 */
ConnectionPool.CONNECTION_CLOSED = constants_1.CONNECTION_CLOSED;
/**
 * Event emitted when a checkout attempt begins.
 * @event
 */
ConnectionPool.CONNECTION_CHECK_OUT_STARTED = constants_1.CONNECTION_CHECK_OUT_STARTED;
/**
 * Event emitted when a checkout attempt fails.
 * @event
 */
ConnectionPool.CONNECTION_CHECK_OUT_FAILED = constants_1.CONNECTION_CHECK_OUT_FAILED;
/**
 * Event emitted every time a connection is successfully checked out of the pool.
 * @event
 */
ConnectionPool.CONNECTION_CHECKED_OUT = constants_1.CONNECTION_CHECKED_OUT;
/**
 * Event emitted every time a connection is successfully checked back into the pool.
 * @event
 */
ConnectionPool.CONNECTION_CHECKED_IN = constants_1.CONNECTION_CHECKED_IN;
/**
 * Keep the pool populated up to `minPoolSize`.
 *
 * Each run creates at most one connection and then re-schedules itself: 10ms after a
 * connection attempt finishes, or 100ms later when no attempt was needed or allowed.
 * Nothing is done (and nothing is scheduled) when the pool is closed or `minPoolSize` is 0.
 *
 * @param pool - The pool to maintain
 */
function ensureMinPoolSize(pool) {
    const { minPoolSize, maxConnecting } = pool.options;
    if (pool.closed || minPoolSize === 0) {
        return;
    }
    const shouldConnect = pool.totalConnectionCount < minPoolSize &&
        pool.pendingConnectionCount < maxConnecting;
    if (!shouldConnect) {
        pool[kMinPoolSizeTimer] = timers_1.setTimeout(() => ensureMinPoolSize(pool), 100);
        return;
    }
    // NOTE: ensureMinPoolSize should not try to get all the pending
    // connection permits because that potentially delays the availability of
    // the connection to a checkout request
    createConnection(pool, (err, connection) => {
        // createConnection incremented the pending count; the attempt is over now
        pool[kPending]--;
        if (!err && connection) {
            pool[kConnections].push(connection);
            process.nextTick(processWaitQueue, pool);
        }
        // The pool may have been closed while the attempt was in flight. close() has already
        // cleared the timer by then, so arming a new one would leave a timer nobody clears.
        if (pool.closed) {
            return;
        }
        pool[kMinPoolSizeTimer] = timers_1.setTimeout(() => ensureMinPoolSize(pool), 10);
    });
}
/**
 * Tell whether a connection belongs to an older generation than the one currently in force.
 *
 * For a load-balanced pool and a connection that carries a service id, the reference
 * generation is the one recorded for that service; otherwise it is the pool-wide generation.
 *
 * @param pool - The pool the connection belongs to
 * @param connection - The connection to inspect
 * @returns true when the connection's generation differs from the reference generation
 */
function connectionIsStale(pool, connection) {
    const { serviceId } = connection;
    const currentGeneration = pool.loadBalanced && serviceId
        ? pool.serviceGenerations.get(serviceId.toHexString())
        : pool[kGeneration];
    return connection.generation !== currentGeneration;
}
/**
 * Tell whether a connection has been idle for longer than the pool allows.
 *
 * @param pool - The pool whose `maxIdleTimeMS` option applies; a falsy value disables the check
 * @param connection - The connection whose `idleTime` is compared against the limit
 * @returns true only when a limit is set and the connection's idle time exceeds it
 */
function connectionIsIdle(pool, connection) {
    const { maxIdleTimeMS } = pool.options;
    if (!maxIdleTimeMS) {
        return false;
    }
    return connection.idleTime > maxIdleTimeMS;
}
/**
 * Start a new connection attempt for the pool.
 *
 * The pending count is incremented here; `callback` is responsible for decrementing it
 * again, and is called exactly once for every attempt:
 *  - `callback(err)` when connecting failed,
 *  - `callback(PoolClosedError)` when the pool was closed while connecting (the freshly
 *    established connection is force-destroyed),
 *  - `callback(undefined, connection)` with a connection marked available otherwise.
 *
 * @param pool - The pool to create the connection for
 * @param callback - Receives the outcome of the attempt
 */
function createConnection(pool, callback) {
    const connectOptions = {
        ...pool.options,
        id: pool[kConnectionCounter].next().value,
        generation: pool[kGeneration],
        cancellationToken: pool[kCancellationToken]
    };
    pool[kPending]++;
    // This is our version of a "virtual" no-I/O connection as the spec requires
    pool.emit(ConnectionPool.CONNECTION_CREATED, new connection_pool_events_1.ConnectionCreatedEvent(pool, { id: connectOptions.id }));
    connect_1.connect(connectOptions, (err, connection) => {
        if (err || !connection) {
            pool[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
            callback(err);
            return;
        }
        // The pool might have closed since we started trying to create a connection
        if (pool.closed) {
            connection.destroy({ force: true });
            // Still report back: the caller may hold a checkout request that was already taken
            // off the wait queue and would otherwise never complete. The callback also
            // decrements the pending count, so it must not be decremented here.
            callback(new errors_1.PoolClosedError(pool));
            return;
        }
        // forward all events from the connection to the pool
        for (const event of [...constants_1.APM_EVENTS, connection_1.Connection.CLUSTER_TIME_RECEIVED]) {
            connection.on(event, (e) => pool.emit(event, e));
        }
        if (pool.loadBalanced) {
            connection.on(connection_1.Connection.PINNED, pinType => pool[kMetrics].markPinned(pinType));
            connection.on(connection_1.Connection.UNPINNED, pinType => pool[kMetrics].markUnpinned(pinType));
            const serviceId = connection.serviceId;
            if (serviceId) {
                const sid = serviceId.toHexString();
                const generation = pool.serviceGenerations.get(sid);
                if (generation) {
                    connection.generation = generation;
                }
                else {
                    // first connection for this service (or its generation is still 0)
                    pool.serviceGenerations.set(sid, 0);
                    connection.generation = 0;
                }
            }
        }
        connection.markAvailable();
        pool.emit(ConnectionPool.CONNECTION_READY, new connection_pool_events_1.ConnectionReadyEvent(pool, connection));
        callback(undefined, connection);
    });
}
/**
 * Announce that a connection is being closed, then destroy it.
 *
 * The "connection closed" event is emitted synchronously; the actual `destroy()` call is
 * deferred to the next tick.
 *
 * @param pool - The pool emitting the event
 * @param connection - The connection to destroy
 * @param reason - Why the connection is closed; passed on to the event
 */
function destroyConnection(pool, connection, reason) {
    const closedEvent = new connection_pool_events_1.ConnectionClosedEvent(pool, connection, reason);
    pool.emit(ConnectionPool.CONNECTION_CLOSED, closedEvent);
    process.nextTick(() => {
        connection.destroy();
    });
}
/**
 * Serve queued checkout requests.
 *
 * First, available connections are handed to the requests at the front of the wait queue;
 * connections found to be closed, stale or idle are destroyed instead of being handed out,
 * and requests that already timed out are dropped. Then, while requests remain and both the
 * `maxConnecting` and `maxPoolSize` limits allow it (a `maxPoolSize` of 0 means no limit),
 * a new connection is created for each remaining request.
 *
 * The function does nothing when the pool is closed or when a run is already in progress.
 *
 * @param pool - The pool whose wait queue should be processed
 */
function processWaitQueue(pool) {
    if (pool.closed || pool[kProcessingWaitQueue]) {
        return;
    }
    pool[kProcessingWaitQueue] = true;
    try {
        while (pool.waitQueueSize) {
            const waitQueueMember = pool[kWaitQueue].peekFront();
            // drop empty slots and requests that have already timed out
            if (!waitQueueMember || waitQueueMember[kCancelled]) {
                pool[kWaitQueue].shift();
                continue;
            }
            if (!pool.availableConnectionCount) {
                break;
            }
            const connection = pool[kConnections].shift();
            if (!connection) {
                break;
            }
            const isStale = connectionIsStale(pool, connection);
            const isIdle = connectionIsIdle(pool, connection);
            if (!isStale && !isIdle && !connection.closed) {
                pool[kCheckedOut]++;
                pool.emit(ConnectionPool.CONNECTION_CHECKED_OUT, new connection_pool_events_1.ConnectionCheckedOutEvent(pool, connection));
                if (waitQueueMember.timer) {
                    clearTimeout(waitQueueMember.timer);
                }
                pool[kWaitQueue].shift();
                waitQueueMember.callback(undefined, connection);
            }
            else {
                // unusable connection: discard it and retry the same request with the next one
                const reason = connection.closed ? 'error' : isStale ? 'stale' : 'idle';
                destroyConnection(pool, connection, reason);
            }
        }
        const { maxPoolSize, maxConnecting } = pool.options;
        while (pool.waitQueueSize > 0 &&
            pool.pendingConnectionCount < maxConnecting &&
            (maxPoolSize === 0 || pool.totalConnectionCount < maxPoolSize)) {
            const waitQueueMember = pool[kWaitQueue].shift();
            if (!waitQueueMember || waitQueueMember[kCancelled]) {
                continue;
            }
            createConnection(pool, (err, connection) => {
                // createConnection incremented the pending count; the attempt is over now
                pool[kPending]--;
                if (waitQueueMember[kCancelled]) {
                    // the request timed out meanwhile: keep the connection for someone else
                    if (!err && connection) {
                        pool[kConnections].push(connection);
                    }
                }
                else {
                    if (err) {
                        pool.emit(ConnectionPool.CONNECTION_CHECK_OUT_FAILED, new connection_pool_events_1.ConnectionCheckOutFailedEvent(pool, err));
                    }
                    else if (connection) {
                        pool[kCheckedOut]++;
                        pool.emit(ConnectionPool.CONNECTION_CHECKED_OUT, new connection_pool_events_1.ConnectionCheckedOutEvent(pool, connection));
                    }
                    if (waitQueueMember.timer) {
                        clearTimeout(waitQueueMember.timer);
                    }
                    waitQueueMember.callback(err, connection);
                }
                process.nextTick(processWaitQueue, pool);
            });
        }
    }
    finally {
        // Always release the guard: if a checkout callback or event listener throws, a guard
        // left set would make every later call return immediately and stall the pool.
        pool[kProcessingWaitQueue] = false;
    }
}
- //# sourceMappingURL=connection_pool.js.map
|