connection-manager.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. 'use strict';
  2. const _ = require('lodash');
  3. const AbstractConnectionManager = require('../abstract/connection-manager');
  4. const { logger } = require('../../utils/logger');
  5. const debug = logger.debugContext('connection:pg');
  6. const sequelizeErrors = require('../../errors');
  7. const semver = require('semver');
  8. const dataTypes = require('../../data-types');
  9. const moment = require('moment-timezone');
  10. const { promisify } = require('util');
  /**
   * Connection manager for the PostgreSQL dialect.
   *
   * Creates `pg` (or `pg.native`) clients, applies per-session settings right
   * after connecting, and maintains a map from PostgreSQL type OIDs to value
   * parsers so that base, array, range and enum types are parsed by the
   * dialect's data types.
   */
  class ConnectionManager extends AbstractConnectionManager {
    /**
     * @param {object} dialect   Dialect instance, forwarded to the abstract manager.
     * @param {object} sequelize Owning Sequelize instance. Its `config.port` is
     *                           defaulted to 5432 when falsy (this mutates the config).
     */
    constructor(dialect, sequelize) {
      sequelize.config.port = sequelize.config.port || 5432;
      super(dialect, sequelize);

      const pgLib = this._loadDialectModule('pg');
      this.lib = this.sequelize.config.native ? pgLib.native : pgLib;

      // Start from empty OID/parser state, then register the parsers of the
      // built-in postgres data types.
      this._clearDynamicOIDs();
      this._clearTypeParser();
      this.refreshTypeParser(dataTypes.postgres);
    }

    /**
     * Registers the parsers of a single data type in `oidParserMap`.
     *
     * Exposed as a method so that the parsing may be updated when the user has
     * added additional, custom types.
     *
     * @param {object} dataType Data type exposing `key`, `parse(value, options)`
     *                          and, for normal types, `types.postgres` (a list of
     *                          PostgreSQL type names).
     */
    _refreshTypeParser(dataType) {
      const arrayParserBuilder = parser => {
        return value => this.lib.types.arrayParser.create(value, parser).parse();
      };
      const rangeParserBuilder = parser => {
        return value => dataType.parse(value, { parser });
      };

      const key = dataType.key.toLowerCase();

      // Range parsers: one per known base type that has a range type,
      // delegating sub-values to the base type's current parser.
      if (key === 'range') {
        for (const name in this.nameOidMap) {
          const entry = this.nameOidMap[name];
          if (!entry.rangeOid) continue;

          const rangeParser = rangeParserBuilder(this.getTypeParser(entry.oid));
          const arrayRangeParser = arrayParserBuilder(rangeParser);

          this.oidParserMap.set(entry.rangeOid, rangeParser);
          if (!entry.arrayRangeOid) continue;
          this.oidParserMap.set(entry.arrayRangeOid, arrayRangeParser);
        }
        return;
      }

      // Parsers shared by normal and enum data types.
      const parser = value => dataType.parse(value);
      const arrayParser = arrayParserBuilder(parser);

      // Enum parsers: every discovered enum OID shares the same parser.
      if (key === 'enum') {
        this.enumOids.oids.forEach(oid => {
          this.oidParserMap.set(oid, parser);
        });
        this.enumOids.arrayOids.forEach(arrayOid => {
          this.oidParserMap.set(arrayOid, arrayParser);
        });
        return;
      }

      // Normal data types: look the OIDs up by PostgreSQL type name and skip
      // names that have not been discovered.
      dataType.types.postgres.forEach(name => {
        const entry = this.nameOidMap[name];
        if (!entry) return;

        this.oidParserMap.set(entry.oid, parser);
        if (!entry.arrayOid) return;
        this.oidParserMap.set(entry.arrayOid, arrayParser);
      });
    }

    /**
     * Drops every registered OID parser.
     */
    _clearTypeParser() {
      this.oidParserMap = new Map();
    }

    /**
     * Returns the parser registered for `oid`, falling back to the parser of
     * the underlying `pg` library when none is registered.
     *
     * @param {number} oid  PostgreSQL type OID.
     * @param {...*}   args Extra arguments forwarded to `lib.types.getTypeParser`.
     * @returns {Function} Parser for values of that type.
     */
    getTypeParser(oid, ...args) {
      const registered = this.oidParserMap.get(oid);
      if (registered) return registered;

      return this.lib.types.getTypeParser(oid, ...args);
    }

    /**
     * Opens a client connection and prepares the session.
     *
     * After connecting it issues the `SET` statements selected by the Sequelize
     * options (standard_conforming_strings, client_min_messages, time zone),
     * loads the dynamic OIDs if none are known yet, and installs an `error`
     * listener that removes the connection from the pool.
     *
     * @param {object} config Connection config. `config.user` is overwritten
     *                        with `config.username` (the object is mutated).
     * @returns {Promise<object>} The connected client.
     * @throws {ConnectionError} Or one of the more specific errors from
     *         `sequelizeErrors` (refused, host not found, host not reachable,
     *         invalid connection, timed out) when connecting fails.
     */
    async connect(config) {
      config.user = config.username;
      const connectionConfig = _.pick(config, [
        'user', 'password', 'host', 'database', 'port'
      ]);

      connectionConfig.types = {
        getTypeParser: ConnectionManager.prototype.getTypeParser.bind(this)
      };

      if (config.dialectOptions) {
        _.merge(connectionConfig,
          _.pick(config.dialectOptions, [
            // see http://www.postgresql.org/docs/9.3/static/runtime-config-logging.html#GUC-APPLICATION-NAME
            'application_name',
            // choose the SSL mode with the PGSSLMODE environment variable
            // object format: https://github.com/brianc/node-postgres/blob/ee19e74ffa6309c9c5e8e01746261a8f651661f8/lib/connection.js#L79
            // see also http://www.postgresql.org/docs/9.3/static/libpq-ssl.html
            'ssl',
            // In addition to the values accepted by the corresponding server,
            // you can use "auto" to determine the right encoding from the
            // current locale in the client (LC_CTYPE environment variable on Unix systems)
            'client_encoding',
            // !! DO NOT SET THIS TO TRUE !!
            // (unless you know what you're doing)
            // see http://www.postgresql.org/message-id/flat/bc9549a50706040852u27633f41ib1e6b09f8339d845@mail.gmail.com#bc9549a50706040852u27633f41ib1e6b09f8339d845@mail.gmail.com
            'binary',
            // This should help with backends incorrectly considering idle clients to be dead and prematurely disconnecting them.
            // this feature has been added in pg module v6.0.0, check pg/CHANGELOG.md
            'keepAlive',
            // Times out queries after a set time in milliseconds. Added in pg v7.3
            'statement_timeout',
            // Terminate any session with an open transaction that has been idle for longer than the specified duration in milliseconds. Added in pg v7.17.0 only supported in postgres >= 10
            'idle_in_transaction_session_timeout'
          ]));
      }

      // Maps a raw connect error to the matching Sequelize error. Errors
      // without a (known) `code` become a generic ConnectionError.
      const toSequelizeError = err => {
        switch (err.code) {
          case 'ECONNREFUSED':
            return new sequelizeErrors.ConnectionRefusedError(err);
          case 'ENOTFOUND':
            return new sequelizeErrors.HostNotFoundError(err);
          case 'EHOSTUNREACH':
            return new sequelizeErrors.HostNotReachableError(err);
          case 'EINVAL':
            return new sequelizeErrors.InvalidConnectionError(err);
          default:
            return new sequelizeErrors.ConnectionError(err);
        }
      };

      const connection = await new Promise((resolve, reject) => {
        let responded = false;

        const connection = new this.lib.Client(connectionConfig);

        const parameterHandler = message => {
          switch (message.parameterName) {
            case 'server_version':
              if (this.sequelize.options.databaseVersion === 0) {
                // semver.coerce() returns null when the string contains no
                // version number; fall back to the dialect default in that
                // case instead of throwing inside the event handler.
                const coerced = semver.coerce(message.parameterValue);
                const version = coerced ? coerced.version : null;
                this.sequelize.options.databaseVersion = semver.valid(version)
                  ? version
                  : this.dialect.defaultVersion;
              }
              break;
            case 'standard_conforming_strings':
              connection['standard_conforming_strings'] = message.parameterValue;
              break;
          }
        };

        const endHandler = () => {
          debug('connection timeout');
          if (!responded) {
            reject(new sequelizeErrors.ConnectionTimedOutError(new Error('Connection timed out')));
          }
        };

        // If we never hear back from the client.connect() callback, the
        // connection timed out. node-postgres does not treat this as an error
        // since no active query was ever emitted.
        connection.once('end', endHandler);

        if (!this.sequelize.config.native) {
          // Receive various server parameters for further configuration
          connection.connection.on('parameterStatus', parameterHandler);
        }

        connection.connect(err => {
          responded = true;

          if (!this.sequelize.config.native) {
            // The parameters are only needed during the handshake
            connection.connection.removeListener('parameterStatus', parameterHandler);
          }

          if (err) {
            reject(toSequelizeError(err));
            return;
          }

          debug('connection acquired');
          connection.removeListener('end', endHandler);
          resolve(connection);
        });
      });

      let query = '';

      if (this.sequelize.options.standardConformingStrings !== false && connection['standard_conforming_strings'] !== 'on') {
        // Disable escape characters in strings
        // see https://github.com/sequelize/sequelize/issues/3545 (security issue)
        // see https://www.postgresql.org/docs/current/static/runtime-config-compatible.html#GUC-STANDARD-CONFORMING-STRINGS
        query += 'SET standard_conforming_strings=on;';
      }

      if (this.sequelize.options.clientMinMessages !== false) {
        query += `SET client_min_messages TO ${this.sequelize.options.clientMinMessages};`;
      }

      if (!this.sequelize.config.keepDefaultTimezone) {
        // A timezone known to moment-timezone is set by name; anything else is
        // sent as an hour/minute interval.
        const isZone = !!moment.tz.zone(this.sequelize.options.timezone);
        if (isZone) {
          query += `SET TIME ZONE '${this.sequelize.options.timezone}';`;
        } else {
          query += `SET TIME ZONE INTERVAL '${this.sequelize.options.timezone}' HOUR TO MINUTE;`;
        }
      }

      if (query) {
        await connection.query(query);
      }

      // Only discover the OIDs when nothing is known yet.
      if (Object.keys(this.nameOidMap).length === 0 &&
        this.enumOids.oids.length === 0 &&
        this.enumOids.arrayOids.length === 0) {
        await this._refreshDynamicOIDs(connection);
      }

      // Don't let a Postgres restart (or error) take down the whole app
      connection.on('error', error => {
        connection._invalid = true;
        debug(`connection error ${error.code || error.message}`);
        this.pool.destroy(connection);
      });

      return connection;
    }

    /**
     * Ends a client connection unless it is already ending.
     *
     * @param {object} connection Client returned by `connect`.
     * @returns {Promise<void>} Settles when `connection.end` calls back.
     */
    async disconnect(connection) {
      if (connection._ending) {
        debug('connection tried to disconnect but was already at ENDING state');
        return;
      }

      return await promisify(callback => connection.end(callback))();
    }

    /**
     * Tells whether a connection can still be used.
     *
     * @param {object} connection Client returned by `connect`.
     * @returns {boolean} `false` once the connection is flagged `_invalid` or `_ending`.
     */
    validate(connection) {
      return !connection._invalid && !connection._ending;
    }

    /**
     * Queries `pg_type`/`pg_range` for the OIDs of base types, enums, their
     * arrays and their ranges, replaces the OID maps and re-registers the
     * postgres type parsers.
     *
     * @param {object} [connection] Client to run the query on; falls back to
     *                              `this.sequelize` when omitted.
     * @returns {Promise<void>}
     */
    async _refreshDynamicOIDs(connection) {
      const databaseVersion = this.sequelize.options.databaseVersion;
      const supportedVersion = '8.3.0';

      // Check for supported version. Only an explicit `false` from the
      // comparison aborts; a falsy databaseVersion such as 0 does not.
      if ((databaseVersion && semver.gte(databaseVersion, supportedVersion)) === false) {
        return;
      }

      const results = await (connection || this.sequelize).query(
        'WITH ranges AS (' +
        '  SELECT pg_range.rngtypid, pg_type.typname AS rngtypname,' +
        '         pg_type.typarray AS rngtyparray, pg_range.rngsubtype' +
        '    FROM pg_range LEFT OUTER JOIN pg_type ON pg_type.oid = pg_range.rngtypid' +
        ')' +
        'SELECT pg_type.typname, pg_type.typtype, pg_type.oid, pg_type.typarray,' +
        '       ranges.rngtypname, ranges.rngtypid, ranges.rngtyparray' +
        '  FROM pg_type LEFT OUTER JOIN ranges ON pg_type.oid = ranges.rngsubtype' +
        ' WHERE (pg_type.typtype IN(\'b\', \'e\'));'
      );

      let result = Array.isArray(results) ? results.pop() : results;

      // When searchPath is prepended then two statements are executed and the result is
      // an array of those two statements. First one is the SET search_path and second is
      // the SELECT query result.
      if (Array.isArray(result)) {
        if (result[0].command === 'SET') {
          result = result.pop();
        }
      }

      const newNameOidMap = {};
      const newEnumOids = { oids: [], arrayOids: [] };

      for (const row of result.rows) {
        // Mapping enums, handled separately
        if (row.typtype === 'e') {
          newEnumOids.oids.push(row.oid);
          if (row.typarray) newEnumOids.arrayOids.push(row.typarray);
          continue;
        }

        // Mapping base types and their arrays
        newNameOidMap[row.typname] = { oid: row.oid };
        if (row.typarray) newNameOidMap[row.typname].arrayOid = row.typarray;

        // Mapping ranges (of base types) and their arrays
        if (row.rngtypid) {
          newNameOidMap[row.typname].rangeOid = row.rngtypid;
          if (row.rngtyparray) newNameOidMap[row.typname].arrayRangeOid = row.rngtyparray;
        }
      }

      // Replace all OID mappings. Avoids temporary empty OID mappings.
      this.nameOidMap = newNameOidMap;
      this.enumOids = newEnumOids;

      this.refreshTypeParser(dataTypes.postgres);
    }

    /**
     * Resets the discovered OID maps to their empty state.
     */
    _clearDynamicOIDs() {
      this.nameOidMap = {};
      this.enumOids = { oids: [], arrayOids: [] };
    }
  }
  269. module.exports = ConnectionManager;
  270. module.exports.ConnectionManager = ConnectionManager;
  271. module.exports.default = ConnectionManager;