pool-base.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590
  1. 'use strict';
  2. const EventEmitter = require('events');
  3. const util = require('util');
  4. const Queue = require('denque');
  5. const Errors = require('./misc/errors');
  6. const Utils = require('./misc/utils');
  7. function PoolBase(options, processTask, createConnectionPool, pingPromise) {
  8. //*****************************************************************
  9. // public methods
  10. //*****************************************************************
  /**
   * Borrow a connection from the pool.
   * The request is handed to addRequest without any SQL; when no idle
   * connection is available, addRequest asks the pool to grow and queues
   * the request until acquireTimeout.
   *
   * @return {Promise}
   */
  this.getConnection = function () {
    const pool = this;
    return addRequest(pool);
  };
  /**
   * Run a single (non-batch) SQL command on one connection of the pool.
   *
   * @param sql sql command
   * @param value parameter value(s) of the sql command (optional)
   * @return {Promise}
   */
  this.query = function (sql, value) {
    const isBatch = false;
    return addRequest(this, sql, value, isBatch);
  };
  /**
   * Run a batch SQL command on one connection of the pool.
   *
   * @param sql sql command
   * @param value parameter value(s) of the sql command (optional)
   * @return {Promise}
   */
  this.batch = function (sql, value) {
    const isBatch = true;
    return addRequest(this, sql, value, isBatch);
  };
  /**
   * Close the pool.
   *
   * Marks the pool closed, stops the idle maintenance timer, force-ends every
   * idle connection and rejects every queued request with ER_CLOSING_POOL.
   * Borrowed connections are not touched here: _releaseConnection force-ends
   * them when they are given back to a closed pool.
   *
   * @return {Promise} Promise.all over the idle connection endings; rejected with
   *                   ER_POOL_ALREADY_CLOSED when the pool was already closed
   */
  this.end = function () {
    if (closed) {
      const alreadyClosed = Errors.createError(
        'pool is already closed',
        false,
        null,
        'HY000',
        Errors.ER_POOL_ALREADY_CLOSED,
        undefined,
        false
      );
      return Promise.reject(alreadyClosed);
    }

    closed = true;
    clearInterval(idleMaintainingTask);

    // close unused connections
    const endings = [];
    for (let idle = idleConnections.shift(); idle; idle = idleConnections.shift()) {
      endings.push(idle.forceEnd());
    }

    // clearTimeout returns undefined, which also resets the handle
    firstTaskTimeout = clearTimeout(firstTaskTimeout);

    // reject all waiting tasks; they all receive the same error instance
    if (taskQueue.size() > 0) {
      const closingError = Errors.createError(
        'pool is ending, connection request aborted',
        false,
        null,
        'HY000',
        Errors.ER_CLOSING_POOL,
        undefined,
        false
      );
      for (let pending = taskQueue.shift(); pending; pending = taskQueue.shift()) {
        process.nextTick(pending.reject, closingError);
      }
    }

    return Promise.all(endings);
  };
  /**
   * Number of connections currently borrowed from the pool
   * (entries of the threadId-keyed activeConnections map).
   * @return {number}
   */
  this.activeConnections = function () {
    const threadIds = Object.keys(activeConnections);
    return threadIds.length;
  };
  /**
   * Total number of connections held by the pool: borrowed plus idle.
   * @return {number}
   */
  this.totalConnections = function () {
    const borrowed = this.activeConnections();
    const idle = this.idleConnections();
    return borrowed + idle;
  };
  /**
   * Number of connections currently sitting idle in the pool.
   * @return {number}
   */
  this.idleConnections = function () {
    const idleCount = idleConnections.size();
    return idleCount;
  };
  /**
   * Number of connection requests currently waiting in the queue.
   * @return {number}
   */
  this.taskQueueSize = function () {
    const waiting = taskQueue.size();
    return waiting;
  };
  /**
   * Create the first connection of the pool.
   *
   * Differs from regular pool growth: creation is retried by
   * connectionCreationLoop until opts.initializationTimeout has elapsed.
   * On success the connection joins the pool and, when opts.idleTimeout > 0,
   * the idle maintenance timer is started (every 500ms).
   * On failure the error goes to the oldest queued request, or to
   * console.error when nobody is waiting and the pool is not closed.
   * In both cases ensurePoolSize is called afterwards.
   */
  this.initialize = function () {
    const pool = this;
    connectionInCreation = true;
    const deadline = Date.now() + opts.initializationTimeout;

    const onConnected = (conn) => {
      if (closed) {
        // pool was closed while connecting: drop the connection, ignore close errors
        conn.forceEnd().catch((err) => {});
        return;
      }
      //add to pool
      addPoolConnection(pool, conn);
      if (opts.idleTimeout > 0) {
        idleMaintainingTask = setInterval(idleMaintainer, 500, pool);
      }
    };

    const onFailure = (err) => {
      connectionInCreation = false;
      const waiting = taskQueue.shift();
      if (waiting) {
        firstTaskTimeout = clearTimeout(firstTaskTimeout);
        process.nextTick(waiting.reject, err);
        resetTimeoutToNextTask();
      } else if (!closed) {
        console.error(err);
      }
    };

    connectionCreationLoop(pool, 0, deadline)
      .then(onConnected)
      .catch(onFailure)
      .finally(() => {
        ensurePoolSize(pool);
      });
  };
  /**
   * Escape a value by delegating to Utils.escape, with the pool connection
   * options and the info object of a pooled connection (null if none is found).
   *
   * @param value value to escape
   * @return result of Utils.escape
   */
  this.escape = (value) => Utils.escape(options.connOptions, searchInfo(), value);
  /**
   * Escape an identifier by delegating to Utils.escapeId, with the pool connection
   * options and the info object of a pooled connection (null if none is found).
   *
   * @param value identifier to escape
   * @return result of Utils.escapeId
   */
  this.escapeId = (value) => Utils.escapeId(options.connOptions, searchInfo(), value);
  158. //*****************************************************************
  159. // internal methods
  160. //*****************************************************************
  /**
   * Search the info object of an existing connection, to know server type and version.
   * The first idle connection is used when there is one, otherwise one of the
   * connections currently borrowed.
   *
   * @returns {Object|null} `info` of a pooled connection, or null when the pool
   *                        currently holds no connection at all.
   */
  const searchInfo = () => {
    let conn = idleConnections.get(0);
    if (conn == null) {
      // activeConnections is keyed by threadId: take a VALUE (the connection).
      // Taking a key would give the threadId string, which carries no `info`.
      conn = Object.values(activeConnections)[0];
    }
    return conn == null ? null : conn.info;
  };
  /**
   * Get a connection from pool / execute query.
   *
   * When a valid idle connection exists, 'acquire' is emitted and the request
   * is handed to processTask right away. Otherwise 'enqueue' is emitted on the
   * next tick, the pool is asked to grow, and the request is queued; the
   * returned promise then settles through the queued task's resolve / reject.
   *
   * @param pool current pool
   * @param sql sql value (not mandatory)
   * @param values sql parameter (not mandatory)
   * @param isBatch is batch request
   * @return {Promise} rejected with ER_POOL_ALREADY_CLOSED when the pool is closed
   */
  const addRequest = function (pool, sql, values, isBatch) {
    if (closed) {
      const closedError = Errors.createError(
        'pool is closed',
        false,
        null,
        'HY000',
        Errors.ER_POOL_ALREADY_CLOSED,
        undefined,
        false
      );
      return Promise.reject(closedError);
    }

    // a valid idle connection was found: use it immediately
    const runOnConnection = (conn) => {
      pool.emit('acquire', conn);
      return processTask(conn, sql, values, isBatch);
    };

    // no idle connection available: grow the pool if allowed and stack the demand
    const queueRequest = () => {
      process.nextTick(() => pool.emit('enqueue'));
      ensurePoolSize(pool);
      return new Promise((resolve, reject) => {
        const task = {
          timeout: Date.now() + opts.acquireTimeout,
          reject,
          resolve,
          sql,
          values,
          isBatch
        };
        // only one timer runs at a time; arm it if none is pending
        if (!firstTaskTimeout) {
          firstTaskTimeout = setTimeout(rejectAndResetTimeout, opts.acquireTimeout, task);
        }
        taskQueue.push(task);
      });
    };

    return getIdleValidConnection(pool).then(runOnConnection, queueRequest);
  };
  /**
   * Take an idle connection out of the pool and mark it active.
   *
   * If opts.minDelayValidation <= 0, or the connection has not been used for
   * longer than minDelayValidation, it is validated with pingPromise; otherwise
   * only conn.isValid() is checked. A connection failing validation is removed
   * from the active map, '_remove-conn' is emitted and the next idle
   * connection is tried.
   *
   * @param pool pool
   * @returns {Promise<Connection>} the validated connection; the promise is
   *          rejected with null when no idle connection is left.
   */
  const getIdleValidConnection = function (pool) {
    if (idleConnections.isEmpty()) {
      return Promise.reject(null);
    }

    const candidate = idleConnections.shift();
    activeConnections[candidate.threadId] = candidate;

    // hand the connection over to the caller
    const lend = () => {
      initLeakProcess(candidate);
      return Promise.resolve(candidate);
    };

    // forget this connection and try the next idle one
    const evictAndRetry = () => {
      delete activeConnections[candidate.threadId];
      pool.emit('_remove-conn');
      return getIdleValidConnection(pool);
    };

    const needsPing =
      opts.minDelayValidation <= 0 ||
      Date.now() - candidate.lastUse > opts.minDelayValidation;

    if (needsPing) {
      return pingPromise(candidate).then(lend).catch(evictAndRetry);
    }

    //just check connection state
    return candidate.isValid() ? lend() : evictAndRetry();
  };
  /**
   * Task request timeout handler.
   * Removes the task from the front of the queue and rejects it (on next tick)
   * with ER_GET_CONNECTION_TIMEOUT. The pending-timer handle is reset first.
   *
   * @param task task that timed out; must be the task at the front of the queue
   * @throws {Error} when the task is not the one at the front of the queue
   */
  const timeoutTask = (task) => {
    firstTaskTimeout = null;

    if (task !== taskQueue.peekFront()) {
      throw new Error('Rejection by timeout without task !!!');
    }

    taskQueue.shift();
    // task.timeout is "enqueue time + acquireTimeout": recover the enqueue time
    const enqueuedAt = task.timeout - opts.acquireTimeout;
    const waited = Math.abs(Date.now() - enqueuedAt);
    const timeoutError = Errors.createError(
      'retrieve connection from pool timeout after ' + waited + 'ms',
      false,
      null,
      'HY000',
      Errors.ER_GET_CONNECTION_TIMEOUT,
      undefined,
      false
    );
    process.nextTick(task.reject, timeoutError);
  };
  /**
   * Timer callback: reject the given task for timeout, then arm the timer
   * for the next waiting task, if any.
   * @param task task whose acquire timeout elapsed
   */
  const rejectAndResetTimeout = function (task) {
    timeoutTask(task);
    resetTimeoutToNextTask();
  };
  /**
   * Loop for connection creation.
   * A failed attempt is retried 500ms later, unless the pool is closed, the
   * error number is one of 1524 / 1045 / 1698 (handled as authentication
   * failures), or timeoutEnd has passed; in those cases the promise is rejected
   * with the last error.
   *
   * @param pool current pool
   * @param iteration current iteration
   * @param timeoutEnd timestamp (ms) after which no new attempt is scheduled
   * @returns {Promise<any>} Connection if created, error if not
   */
  const connectionCreationLoop = function (pool, iteration, timeoutEnd) {
    const nonRetryableErrnos = [1524, 1045, 1698];

    return new Promise((resolve, reject) => {
      const attempt = () => {
        if (closed) {
          reject(
            Errors.createError(
              'Cannot create new connection to pool, pool closed',
              true,
              null,
              '08S01',
              Errors.ER_ADD_CONNECTION_CLOSED_POOL,
              null
            )
          );
          return;
        }

        // attempt counter; not read anywhere else in this function
        iteration++;

        createConnectionPool(pool)
          .then((conn) => {
            resolve(conn);
          })
          .catch((err) => {
            //if timeout is reached or authentication fail return error
            const giveUp =
              closed || nonRetryableErrnos.includes(err.errno) || timeoutEnd < Date.now();
            if (giveUp) {
              reject(err);
              return;
            }
            setTimeout(attempt, 500);
          });
      };

      //initial without timeout
      attempt();
    });
  };
  /**
   * Register a freshly created connection in the pool.
   *
   * Wraps conn.destroy so that destroying the connection also removes it from
   * the active map and emits '_remove-conn', installs an 'error' listener that
   * evicts the connection from the pool, then stores the connection as idle
   * and emits '_idle-conn' (immediately) and 'connection' (next tick).
   *
   * @param pool current pool
   * @param conn new connection
   */
  const addPoolConnection = function (pool, conn) {
    conn.lastUse = Date.now();

    const originalDestroy = conn.destroy;
    conn.destroy = () => {
      removeLeakProcess(conn);
      delete activeConnections[conn.threadId];
      originalDestroy();
      pool.emit('_remove-conn');
    };

    //Connection error
    // -> evict connection from pool
    conn.on('error', (err) => {
      removeLeakProcess(conn);
      delete activeConnections[conn.threadId];

      let position = 0;
      let other = idleConnections.peekAt(position);
      while (other) {
        if (other === conn) {
          idleConnections.removeOne(position);
          break;
        }
        //since connection did have an error, other waiting connection might too
        //forcing validation when borrowed next time, even if "minDelayValidation" is not reached.
        other.lastUse = Math.min(Date.now() - opts.minDelayValidation, other.lastUse);
        position += 1;
        other = idleConnections.peekAt(position);
      }

      pool.emit('_remove-conn');
    });

    connectionInCreation = false;
    idleConnections.push(conn);
    pool.emit('_idle-conn');
    process.nextTick(() => pool.emit('connection', conn));
  };
  /**
   * Give a borrowed connection back to the pool.
   *
   * - pool closed: the connection is force-ended (errors ignored);
   * - connection still valid: 'release' is emitted, the connection becomes idle
   *   and '_idle-conn' is emitted on the next tick;
   * - connection invalid: it is simply dropped and the pool is asked to grow.
   *
   * @param conn connection to release
   * @return {Promise|undefined} a promise only when the pool is closed
   */
  this._releaseConnection = function (conn) {
    const pool = this;

    removeLeakProcess(conn);
    conn.lastUse = Date.now();
    delete activeConnections[conn.threadId];

    if (closed) {
      return conn.forceEnd().catch(() => Promise.resolve());
    }

    if (!conn.isValid()) {
      ensurePoolSize(pool);
      return;
    }

    pool.emit('release', conn);
    idleConnections.push(conn);
    process.nextTick(() => pool.emit('_idle-conn'));
  };
  /**
   * Grow the pool by one connection when allowed.
   *
   * A creation is started only if none is already in progress, there are fewer
   * idle connections than opts.minimumIdle, the total is below
   * opts.connectionLimit and the pool is not closed.
   * When creation fails: if the pool holds no connection at all, the oldest
   * queued request is rejected with the error, otherwise the failure is logged
   * (unless the pool is closed). The in-progress flag is released 500ms later,
   * and another attempt is made if requests are still queued.
   *
   * @param pool current pool
   */
  const ensurePoolSize = function (pool) {
    const mayGrow =
      !connectionInCreation &&
      pool.idleConnections() < opts.minimumIdle &&
      pool.totalConnections() < opts.connectionLimit &&
      !closed;
    if (!mayGrow) {
      return;
    }

    connectionInCreation = true;

    const onCreated = (conn) => {
      if (closed) {
        return conn.forceEnd().catch((err) => {});
      }
      addPoolConnection(pool, conn);
    };

    const onCreationFailed = (err) => {
      if (pool.totalConnections() === 0) {
        const waiting = taskQueue.shift();
        if (waiting) {
          firstTaskTimeout = clearTimeout(firstTaskTimeout);
          process.nextTick(waiting.reject, err);
          resetTimeoutToNextTask();
        }
      } else if (!closed) {
        console.error(`pool fail to create connection (${err.message})`);
      }

      //delay next try
      setTimeout(() => {
        connectionInCreation = false;
        if (taskQueue.size() > 0) {
          ensurePoolSize(pool);
        }
      }, 500);
    };

    process.nextTick(() => {
      const timeoutEnd = Date.now() + opts.initializationTimeout;
      if (closed) {
        return;
      }
      connectionCreationLoop(pool, 0, timeoutEnd).then(onCreated).catch(onCreationFailed);
    });
  };
  /**
   * Arm the acquire timer for the task at the front of the queue.
   * Tasks whose deadline is already in the past are rejected on the way;
   * nothing is armed when the queue ends up empty.
   */
  const resetTimeoutToNextTask = () => {
    const now = Date.now();

    for (let head = taskQueue.peekFront(); head; head = taskQueue.peekFront()) {
      if (head.timeout < now) {
        // already expired: reject it and look at the following task
        timeoutTask(head);
        continue;
      }
      firstTaskTimeout = setTimeout(rejectAndResetTimeout, head.timeout - now, head);
      return;
    }
  };
  /**
   * Remove idle connections that have been unused for more than
   * opts.idleTimeout seconds, then let the pool grow back if needed.
   *
   * At most max(1, idle - minimumIdle) connections are removed per run, always
   * starting from the front of the idle queue; the run stops at the first
   * connection that is not expired.
   *
   * @param pool current pool
   */
  const idleMaintainer = function (pool) {
    const surplus = pool.idleConnections() - opts.minimumIdle;

    for (let budget = Math.max(1, surplus); budget > 0; budget--) {
      const oldest = idleConnections.peek();
      const expired = oldest && oldest.lastUse + opts.idleTimeout * 1000 < Date.now();
      if (!expired) {
        break;
      }
      idleConnections.shift();
      // best effort close: errors while ending an expired connection are ignored
      oldest.forceEnd().catch((err) => {});
    }

    ensurePoolSize(pool);
  };
  /**
   * Drop a borrowed connection for good: stop its leak timer, remove it from
   * the active map, force-end it (errors ignored) and emit '_remove-conn'.
   *
   * @param conn connection to discard
   */
  this._discardConnection = (conn) => {
    removeLeakProcess(conn);
    delete activeConnections[conn.threadId];
    const ending = conn.forceEnd();
    ending.catch((err) => {});
    this.emit('_remove-conn');
  };
  /**
   * Leak timer callback: report a connection that has not been given back to
   * the pool since its leak timer was started, and flag it as leaked so that
   * its return can be reported later.
   *
   * @param conn connection still borrowed when the leak timer fired
   */
  const logLeak = (conn) => {
    const heldFor = Date.now() - conn.lastUse;
    // message fixed: closing parenthesis was missing and the hint was ungrammatical
    console.log(
      `Possible connection leak on thread ${conn.info.threadId} ` +
        `(connection not returned to pool since ${heldFor} ms). ` +
        'Has connection.release() been called ?'
    );
    conn.leaked = true;
  };
  /**
   * Start leak detection for a connection being lent out: refresh its
   * last-use time, clear the leaked flag and schedule logLeak after
   * opts.leakDetectionTimeout.
   *
   * @param conn connection being borrowed
   */
  const _initLeakProcess = (conn) => {
    const delay = opts.leakDetectionTimeout;
    conn.leaked = false;
    conn.lastUse = Date.now();
    conn.leakProcess = setTimeout(logLeak, delay, conn);
  };
  /**
   * Stop leak detection for a connection, and report its return when it had
   * previously been flagged as a possible leak.
   *
   * @param conn connection leaving the borrowed state
   */
  const _removeLeakProcess = (conn) => {
    // clearTimeout returns undefined, which also resets the handle
    conn.leakProcess = clearTimeout(conn.leakProcess);
    if (!conn.leaked) {
      return;
    }
    console.log(
      `Previous possible leak connection with thread ${conn.info.threadId} was returned to pool`
    );
  };
  /**
   * Launch the next waiting task request if a connection is available.
   * Listener of the '_idle-conn' event; `this` is the pool.
   *
   * The acquire timer of the front task is only cleared when that task is
   * really dispatched. If no idle connection is left, the queue and its timer
   * are left untouched: otherwise queued requests would stop timing out, and
   * the next request would arm a timer for a task that is not at the front of
   * the queue, making timeoutTask throw from a timer callback.
   */
  const handleTaskQueue = function () {
    if (taskQueue.isEmpty()) {
      // nobody is waiting: make sure no stale timer stays armed
      firstTaskTimeout = clearTimeout(firstTaskTimeout);
      return;
    }
    if (idleConnections.isEmpty()) {
      // the connection was taken in the meantime: keep waiting, keep the timer
      return;
    }

    firstTaskTimeout = clearTimeout(firstTaskTimeout);
    const task = taskQueue.shift();
    const conn = idleConnections.shift();

    initLeakProcess(conn);
    this.emit('acquire', conn);
    activeConnections[conn.threadId] = conn;
    // the dispatched task no longer needs a timer: arm it for the next one
    resetTimeoutToNextTask();
    processTask(conn, task.sql, task.values, task.isBatch)
      .then(task.resolve)
      .catch(task.reject);
  };
  //*****************************************************************
  // pool state (shared by all the closures declared above)
  //*****************************************************************
  const opts = options;
  let closed = false;
  let connectionInCreation = false;

  // leak detection is a no-op unless opts.leakDetectionTimeout > 0
  const leakDetectionEnabled = opts.leakDetectionTimeout > 0;
  const initLeakProcess = leakDetectionEnabled ? _initLeakProcess : () => {};
  const removeLeakProcess = leakDetectionEnabled ? _removeLeakProcess : () => {};

  // idle connections (queue), borrowed connections (keyed by threadId), waiting requests (queue)
  const idleConnections = new Queue();
  const activeConnections = {};
  const taskQueue = new Queue();

  // timer handles: idle maintenance interval, acquire timeout of the front task
  let idleMaintainingTask;
  let firstTaskTimeout;

  // read-only view of the closed flag
  Object.defineProperty(this, 'closed', {
    get: () => closed
  });

  EventEmitter.call(this);

  // internal events driving the pool
  this.on('_idle-conn', () => handleTaskQueue.call(this));
  this.on('_remove-conn', () => ensurePoolSize(this));
  this.on('connection', () => ensurePoolSize(this));
  541. }
  542. util.inherits(PoolBase, EventEmitter);
  543. module.exports = PoolBase;