connection-manager.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. 'use strict';
  2. const { Pool, TimeoutError } = require('sequelize-pool');
  3. const _ = require('lodash');
  4. const semver = require('semver');
  5. const errors = require('../../errors');
  6. const { logger } = require('../../utils/logger');
  7. const deprecations = require('../../utils/deprecations');
  8. const debug = logger.debugContext('pool');
  9. /**
  10. * Abstract Connection Manager
  11. *
  12. * Connection manager which handles pooling & replication.
  13. * Uses sequelize-pool for pooling
  14. *
  15. * @private
  16. */
  17. class ConnectionManager {
  18. constructor(dialect, sequelize) {
  19. const config = _.cloneDeep(sequelize.config);
  20. this.sequelize = sequelize;
  21. this.config = config;
  22. this.dialect = dialect;
  23. this.versionPromise = null;
  24. this.dialectName = this.sequelize.options.dialect;
  25. if (config.pool === false) {
  26. throw new Error('Support for pool:false was removed in v4.0');
  27. }
  28. config.pool = _.defaults(config.pool || {}, {
  29. max: 5,
  30. min: 0,
  31. idle: 10000,
  32. acquire: 60000,
  33. evict: 1000,
  34. validate: this._validate.bind(this)
  35. });
  36. this.initPools();
  37. }
  38. refreshTypeParser(dataTypes) {
  39. _.each(dataTypes, dataType => {
  40. if (Object.prototype.hasOwnProperty.call(dataType, 'parse')) {
  41. if (dataType.types[this.dialectName]) {
  42. this._refreshTypeParser(dataType);
  43. } else {
  44. throw new Error(`Parse function not supported for type ${dataType.key} in dialect ${this.dialectName}`);
  45. }
  46. }
  47. });
  48. }
  49. /**
  50. * Try to load dialect module from various configured options.
  51. * Priority goes like dialectModulePath > dialectModule > require(default)
  52. *
  53. * @param {string} moduleName Name of dialect module to lookup
  54. *
  55. * @private
  56. * @returns {object}
  57. */
  58. _loadDialectModule(moduleName) {
  59. try {
  60. if (this.sequelize.config.dialectModulePath) {
  61. return require(this.sequelize.config.dialectModulePath);
  62. }
  63. if (this.sequelize.config.dialectModule) {
  64. return this.sequelize.config.dialectModule;
  65. }
  66. return require(moduleName);
  67. } catch (err) {
  68. if (err.code === 'MODULE_NOT_FOUND') {
  69. if (this.sequelize.config.dialectModulePath) {
  70. throw new Error(`Unable to find dialect at ${this.sequelize.config.dialectModulePath}`);
  71. }
  72. throw new Error(`Please install ${moduleName} package manually`);
  73. }
  74. throw err;
  75. }
  76. }
  77. /**
  78. * Handler which executes on process exit or connection manager shutdown
  79. *
  80. * @private
  81. * @returns {Promise}
  82. */
  83. async _onProcessExit() {
  84. if (!this.pool) {
  85. return;
  86. }
  87. await this.pool.drain();
  88. debug('connection drain due to process exit');
  89. return await this.pool.destroyAllNow();
  90. }
  91. /**
  92. * Drain the pool and close it permanently
  93. *
  94. * @returns {Promise}
  95. */
  96. async close() {
  97. // Mark close of pool
  98. this.getConnection = async function getConnection() {
  99. throw new Error('ConnectionManager.getConnection was called after the connection manager was closed!');
  100. };
  101. return await this._onProcessExit();
  102. }
  103. /**
  104. * Initialize connection pool. By default pool autostart is set to false, so no connection will be
  105. * be created unless `pool.acquire` is called.
  106. */
  107. initPools() {
  108. const config = this.config;
  109. if (!config.replication) {
  110. this.pool = new Pool({
  111. name: 'sequelize',
  112. create: () => this._connect(config),
  113. destroy: async connection => {
  114. const result = await this._disconnect(connection);
  115. debug('connection destroy');
  116. return result;
  117. },
  118. validate: config.pool.validate,
  119. max: config.pool.max,
  120. min: config.pool.min,
  121. acquireTimeoutMillis: config.pool.acquire,
  122. idleTimeoutMillis: config.pool.idle,
  123. reapIntervalMillis: config.pool.evict,
  124. maxUses: config.pool.maxUses
  125. });
  126. debug(`pool created with max/min: ${config.pool.max}/${config.pool.min}, no replication`);
  127. return;
  128. }
  129. if (!Array.isArray(config.replication.read)) {
  130. config.replication.read = [config.replication.read];
  131. }
  132. // Map main connection config
  133. config.replication.write = _.defaults(config.replication.write, _.omit(config, 'replication'));
  134. // Apply defaults to each read config
  135. config.replication.read = config.replication.read.map(readConfig =>
  136. _.defaults(readConfig, _.omit(this.config, 'replication'))
  137. );
  138. // custom pooling for replication (original author @janmeier)
  139. let reads = 0;
  140. this.pool = {
  141. release: client => {
  142. if (client.queryType === 'read') {
  143. this.pool.read.release(client);
  144. } else {
  145. this.pool.write.release(client);
  146. }
  147. },
  148. acquire: (queryType, useMaster) => {
  149. useMaster = useMaster === undefined ? false : useMaster;
  150. if (queryType === 'SELECT' && !useMaster) {
  151. return this.pool.read.acquire();
  152. }
  153. return this.pool.write.acquire();
  154. },
  155. destroy: connection => {
  156. this.pool[connection.queryType].destroy(connection);
  157. debug('connection destroy');
  158. },
  159. destroyAllNow: async () => {
  160. await Promise.all([
  161. this.pool.read.destroyAllNow(),
  162. this.pool.write.destroyAllNow()
  163. ]);
  164. debug('all connections destroyed');
  165. },
  166. drain: async () => Promise.all([
  167. this.pool.write.drain(),
  168. this.pool.read.drain()
  169. ]),
  170. read: new Pool({
  171. name: 'sequelize:read',
  172. create: async () => {
  173. // round robin config
  174. const nextRead = reads++ % config.replication.read.length;
  175. const connection = await this._connect(config.replication.read[nextRead]);
  176. connection.queryType = 'read';
  177. return connection;
  178. },
  179. destroy: connection => this._disconnect(connection),
  180. validate: config.pool.validate,
  181. max: config.pool.max,
  182. min: config.pool.min,
  183. acquireTimeoutMillis: config.pool.acquire,
  184. idleTimeoutMillis: config.pool.idle,
  185. reapIntervalMillis: config.pool.evict,
  186. maxUses: config.pool.maxUses
  187. }),
  188. write: new Pool({
  189. name: 'sequelize:write',
  190. create: async () => {
  191. const connection = await this._connect(config.replication.write);
  192. connection.queryType = 'write';
  193. return connection;
  194. },
  195. destroy: connection => this._disconnect(connection),
  196. validate: config.pool.validate,
  197. max: config.pool.max,
  198. min: config.pool.min,
  199. acquireTimeoutMillis: config.pool.acquire,
  200. idleTimeoutMillis: config.pool.idle,
  201. reapIntervalMillis: config.pool.evict,
  202. maxUses: config.pool.maxUses
  203. })
  204. };
  205. debug(`pool created with max/min: ${config.pool.max}/${config.pool.min}, with replication`);
  206. }
  207. /**
  208. * Get connection from pool. It sets database version if it's not already set.
  209. * Call pool.acquire to get a connection
  210. *
  211. * @param {object} [options] Pool options
  212. * @param {string} [options.type] Set which replica to use. Available options are `read` and `write`
  213. * @param {boolean} [options.useMaster=false] Force master or write replica to get connection from
  214. *
  215. * @returns {Promise<Connection>}
  216. */
  217. async getConnection(options) {
  218. options = options || {};
  219. if (this.sequelize.options.databaseVersion === 0) {
  220. if (!this.versionPromise) {
  221. this.versionPromise = (async () => {
  222. try {
  223. const connection = await this._connect(this.config.replication.write || this.config);
  224. const _options = {};
  225. _options.transaction = { connection }; // Cheat .query to use our private connection
  226. _options.logging = () => {};
  227. _options.logging.__testLoggingFn = true;
  228. //connection might have set databaseVersion value at initialization,
  229. //avoiding a useless round trip
  230. if (this.sequelize.options.databaseVersion === 0) {
  231. const version = await this.sequelize.databaseVersion(_options);
  232. const parsedVersion = _.get(semver.coerce(version), 'version') || version;
  233. this.sequelize.options.databaseVersion = semver.valid(parsedVersion)
  234. ? parsedVersion
  235. : this.dialect.defaultVersion;
  236. }
  237. if (semver.lt(this.sequelize.options.databaseVersion, this.dialect.defaultVersion)) {
  238. deprecations.unsupportedEngine();
  239. debug(`Unsupported database engine version ${this.sequelize.options.databaseVersion}`);
  240. }
  241. this.versionPromise = null;
  242. return await this._disconnect(connection);
  243. } catch (err) {
  244. this.versionPromise = null;
  245. throw err;
  246. }
  247. })();
  248. }
  249. await this.versionPromise;
  250. }
  251. let result;
  252. try {
  253. result = await this.pool.acquire(options.type, options.useMaster);
  254. } catch (error) {
  255. if (error instanceof TimeoutError) throw new errors.ConnectionAcquireTimeoutError(error);
  256. throw error;
  257. }
  258. debug('connection acquired');
  259. return result;
  260. }
  261. /**
  262. * Release a pooled connection so it can be utilized by other connection requests
  263. *
  264. * @param {Connection} connection
  265. *
  266. * @returns {Promise}
  267. */
  268. async releaseConnection(connection) {
  269. this.pool.release(connection);
  270. debug('connection released');
  271. }
  272. /**
  273. * Call dialect library to get connection
  274. *
  275. * @param {*} config Connection config
  276. * @private
  277. * @returns {Promise<Connection>}
  278. */
  279. async _connect(config) {
  280. await this.sequelize.runHooks('beforeConnect', config);
  281. const connection = await this.dialect.connectionManager.connect(config);
  282. await this.sequelize.runHooks('afterConnect', connection, config);
  283. return connection;
  284. }
  285. /**
  286. * Call dialect library to disconnect a connection
  287. *
  288. * @param {Connection} connection
  289. * @private
  290. * @returns {Promise}
  291. */
  292. async _disconnect(connection) {
  293. await this.sequelize.runHooks('beforeDisconnect', connection);
  294. await this.dialect.connectionManager.disconnect(connection);
  295. return this.sequelize.runHooks('afterDisconnect', connection);
  296. }
  297. /**
  298. * Determine if a connection is still valid or not
  299. *
  300. * @param {Connection} connection
  301. *
  302. * @returns {boolean}
  303. */
  304. _validate(connection) {
  305. if (!this.dialect.connectionManager.validate) {
  306. return true;
  307. }
  308. return this.dialect.connectionManager.validate(connection);
  309. }
  310. }
  311. module.exports = ConnectionManager;
  312. module.exports.ConnectionManager = ConnectionManager;
  313. module.exports.default = ConnectionManager;