connection.js 39 KB


  1. 'use strict';
  2. const EventEmitter = require('events');
  3. const util = require('util');
  4. const Queue = require('denque');
  5. const Net = require('net');
  6. const PacketInputStream = require('./io/packet-input-stream');
  7. const PacketOutputStream = require('./io/packet-output-stream');
  8. const CompressionInputStream = require('./io/compression-input-stream');
  9. const CompressionOutputStream = require('./io/compression-output-stream');
  10. const ServerStatus = require('./const/server-status');
  11. const ConnectionInformation = require('./misc/connection-information');
  12. const tls = require('tls');
  13. const Errors = require('./misc/errors');
  14. const Utils = require('./misc/utils');
  15. const Capabilities = require('./const/capabilities');
  16. const moment = require('moment-timezone');
  17. /*commands*/
  18. const Handshake = require('./cmd/handshake/handshake');
  19. const Quit = require('./cmd/quit');
  20. const Ping = require('./cmd/ping');
  21. const Reset = require('./cmd/reset');
  22. const Query = require('./cmd/query');
  23. const BatchRewrite = require('./cmd/batch-rewrite');
  24. const BatchBulk = require('./cmd/batch-bulk');
  25. const Stream = require('./cmd/stream');
  26. const ChangeUser = require('./cmd/change-user');
  27. const { Status } = require('./const/connection_status');
  28. /**
  29. * New Connection instance.
  30. *
  31. * @param options connection options
  32. * @returns Connection instance
  33. * @constructor
  34. * @fires Connection#connect
  35. * @fires Connection#end
  36. * @fires Connection#error
  37. *
  38. */
  39. function Connection(options) {
  40. //*****************************************************************
  41. // public API functions
  42. //*****************************************************************
  /**
   * Open the connection.
   *
   * The handshake is only launched when the connection has never been started; every other
   * status is answered immediately without touching the socket.
   *
   * @returns {Promise} resolves with this connection; rejects when the connection is
   *   closing/closed or when a connection attempt is already in progress
   */
  this.connect = () => {
    if (_status === Status.NOT_CONNECTED) {
      _status = Status.CONNECTING;
      return new Promise((resolve, reject) => {
        _registerHandshakeCmd(resolve, reject);
      });
    }

    if (_status === Status.CLOSING || _status === Status.CLOSED) {
      const closedErr = Errors.createError(
        'Connection closed',
        true,
        info,
        '08S01',
        Errors.ER_CONNECTION_ALREADY_CLOSED
      );
      return Promise.reject(closedErr);
    }

    if (_status === Status.CONNECTING || _status === Status.AUTHENTICATING) {
      const busyErr = Errors.createError(
        'Connection is already connecting',
        true,
        info,
        '08S01',
        Errors.ER_ALREADY_CONNECTING
      );
      return Promise.reject(busyErr);
    }

    // any remaining status: nothing to start, hand back the connection itself
    return Promise.resolve(this);
  };
  /**
   * Change the user of the current connection.
   * All user variables will be reset, Prepare commands will be released.
   * Rejected up front for non-MariaDB servers (MySQL Bug #83472, triggered when the
   * CONNECT_ATTRS capability is set, which is the default).
   *
   * @param options connection options; when it carries a collation, that collation is
   *   stored on the connection options once the server accepted the change
   * @returns {Promise} promise settled with the ChangeUser command result
   */
  this.changeUser = (options) => {
    if (info.isMariaDB()) {
      return new Promise(function (resolve, reject) {
        const onChanged = (res) => {
          if (options && options.collation) opts.collation = options.collation;
          resolve(res);
        };
        const changeUserCmd = new ChangeUser(
          options,
          onChanged,
          _authFailHandler.bind(this, reject),
          _addCommand.bind(this)
        );
        _addCommand(changeUserCmd);
      });
    }

    return Promise.reject(
      Errors.createError(
        'method changeUser not available for MySQL server due to Bug #83472',
        false,
        info,
        '0A000',
        Errors.ER_MYSQL_CHANGE_USER_BUG
      )
    );
  };
  /**
   * Open a transaction by sending 'START TRANSACTION' through this.query.
   *
   * @returns {Promise} promise returned by this.query
   */
  this.beginTransaction = () => this.query('START TRANSACTION');
  /**
   * Commit the current transaction.
   * Delegates to _changeTransaction, which only sends the statement when it may be needed.
   *
   * @returns {Promise} promise returned by _changeTransaction('COMMIT')
   */
  this.commit = () => _changeTransaction('COMMIT');
  /**
   * Roll back the current transaction.
   * Delegates to _changeTransaction, which only sends the statement when it may be needed.
   *
   * @returns {Promise} promise returned by _changeTransaction('ROLLBACK')
   */
  this.rollback = () => _changeTransaction('ROLLBACK');
  /**
   * Execute a query using the text protocol.
   *
   * @param sql     sql string, or an object superseding the default options; the object
   *                must then carry the statement in its `sql` property and may carry
   *                `values`, which take precedence over the `values` argument
   * @param values  object / array of placeholder values (not mandatory)
   * @returns {Promise} promise settled by the Query command
   */
  this._queryPromise = (sql, values) => {
    const optionsGiven = typeof sql === 'object';
    const cmdOptions = optionsGiven ? sql : undefined;
    const statement = optionsGiven ? cmdOptions.sql : sql;
    const params = optionsGiven && cmdOptions.values ? cmdOptions.values : values;

    return new Promise((resolve, reject) => {
      const queryCmd = new Query(resolve, reject, cmdOptions, opts, statement, params);
      // with the trace option, record where the query was issued from
      if (opts.trace) Error.captureStackTrace(queryCmd);
      _addCommand(queryCmd);
    });
  };
  /**
   * Execute a batch using the text protocol.
   *
   * @param sql            sql string, or an object superseding the default options; the
   *                       object must then carry the statement in its `sql` property
   * @param initialValues  object / array of placeholder values; a non-array value is
   *                       treated as a single-entry batch
   * @returns {Promise} promise settled by the batch command; rejected immediately when no
   *   values are supplied
   */
  this.batch = (sql, initialValues) => {
    const optionsGiven = typeof sql === 'object';
    const cmdOptions = optionsGiven ? sql : undefined;
    const statement = optionsGiven ? cmdOptions.sql : sql;
    const rawValues = optionsGiven && cmdOptions.values ? cmdOptions.values : initialValues;

    if (!rawValues) {
      const missingValuesErr = Errors.createError(
        'Batch must have values set\nsql: ' + statement + ' - parameters:[]',
        false,
        info,
        'HY000',
        Errors.ER_BATCH_WITH_NO_VALUES
      );
      return Promise.reject(missingValuesErr);
    }

    const rows = Array.isArray(rawValues) ? rawValues : [rawValues];
    return new Promise((resolve, reject) => {
      // bulk when canUseBulk allows it, statement rewriting otherwise
      const BatchCommand = canUseBulk(rows) ? BatchBulk : BatchRewrite;
      const batchCmd = new BatchCommand(resolve, reject, cmdOptions, opts, statement, rows);
      if (opts.trace) Error.captureStackTrace(batchCmd);
      _addCommand(batchCmd);
    });
  };
  /**
   * Execute a query and return a Readable object that will emit columns/data/end/error
   * events, to permit streaming big result-sets.
   *
   * @param sql     sql string, or an object superseding the default options; the object
   *                must then carry the statement in its `sql` property
   * @param values  object / array of placeholder values (not mandatory)
   * @returns {Readable} the `inStream` of the queued Stream command
   */
  this.queryStream = (sql, values) => {
    const optionsGiven = typeof sql === 'object';
    const cmdOptions = optionsGiven ? sql : undefined;
    const statement = optionsGiven ? cmdOptions.sql : sql;
    const params = optionsGiven && sql.values ? sql.values : values;

    const streamCmd = new Stream(cmdOptions, opts, statement, params, _socket);
    if (opts.trace) Error.captureStackTrace(streamCmd);
    _addCommand(streamCmd);
    return streamCmd.inStream;
  };
  /**
   * Send an empty MySQL packet to ensure connection is active, and reset @@wait_timeout.
   *
   * @param timeout (optional) timeout value in ms. When it elapses the promise is rejected,
   *   the command entry point is switched to _addCommandDisabled and, unless the connection
   *   is already closing/closed, the send queue is cleared and the socket destroyed.
   *   A negative value rejects immediately.
   * @returns {Promise} promise
   */
  this.ping = (timeout) => {
    return new Promise(function (resolve, reject) {
      // no (or zero) timeout: plain ping
      if (!timeout) {
        return _addCommand(new Ping(resolve, reject));
      }

      if (timeout < 0) {
        reject(
          Errors.createError(
            'Ping cannot have negative timeout value',
            false,
            info,
            '0A000',
            Errors.ER_BAD_PARAMETER_VALUE
          )
        );
        return;
      }

      const pingTimer = setTimeout(() => {
        reject(Errors.createError('Ping timeout', true, info, '0A000', Errors.ER_PING_TIMEOUT));
        // close connection
        _addCommand = _addCommandDisabled;
        clearTimeout(_timeout);
        const stillOpen = _status !== Status.CLOSING && _status !== Status.CLOSED;
        if (stillOpen) {
          _sendQueue.clear();
          _status = Status.CLOSED;
          _socket.destroy();
        }
        _clear();
      }, timeout);

      const onPong = () => {
        clearTimeout(pingTimer);
        resolve();
      };
      const onPingError = (err) => {
        clearTimeout(pingTimer);
        reject(err);
      };
      return _addCommand(new Ping(onPong, onPingError));
    });
  };
  /**
   * Send a reset command that will
   * - rollback any open transaction
   * - reset transaction isolation level
   * - reset session variables
   * - delete user variables
   * - remove temporary tables
   * - remove all PREPARE statement
   *
   * Only sent when the server is MariaDB 10.2.4+ or MySQL 5.7.3+.
   *
   * @returns {Promise} promise settled by the Reset command, or rejected with an Error
   *   naming the server version when the server is too old
   */
  this.reset = () => {
    const resetSupported = info.isMariaDB()
      ? info.hasMinVersion(10, 2, 4)
      : info.hasMinVersion(5, 7, 3);

    if (resetSupported) {
      return new Promise(function (resolve, reject) {
        return _addCommand(new Reset(resolve, reject));
      });
    }

    // serverVersion is an object: report its raw version string, not "[object Object]"
    const rawVersion = info.serverVersion ? info.serverVersion.raw : 'unknown';
    return Promise.reject(
      new Error(
        'Reset command not permitted for server ' +
          rawVersion +
          ' (requires server MariaDB version 10.2.4+ or MySQL 5.7.3+)'
      )
    );
  };
  /**
   * Indicates the state of the connection as the driver knows it.
   *
   * @returns {boolean} true only while the internal status is Status.CONNECTED
   */
  this.isValid = () => _status === Status.CONNECTED;
  /**
   * Terminate connection gracefully.
   *
   * New commands are refused from now on. When the connection is live, a Quit command is
   * queued behind the pending commands and the socket is destroyed once Quit completes
   * (successfully or not).
   *
   * @returns {Promise} resolves once the connection is closed; resolves immediately when
   *   the connection was never started or is already closing/closed
   */
  this.end = () => {
    _addCommand = _addCommandDisabled;
    clearTimeout(_timeout);

    const nothingToClose =
      _status === Status.CLOSING ||
      _status === Status.CLOSED ||
      _status === Status.NOT_CONNECTED;
    if (nothingToClose) return Promise.resolve();

    _status = Status.CLOSING;
    return new Promise(function (resolve, reject) {
      const onQuitDone = () => {
        _status = Status.CLOSED;
        _socket.destroy();
        _socket.unref();
        _clear();
        _receiveQueue.clear();
        resolve();
      };
      // same handler for success and failure: the connection ends either way
      const quitCmd = new Quit(onQuitDone, onQuitDone);
      _sendQueue.push(quitCmd);
      _receiveQueue.push(quitCmd);
      // Quit is the only queued command: trigger the sender on the next tick
      if (_sendQueue.length === 1) {
        process.nextTick(_nextSendCmd.bind(this));
      }
    });
  };
  /**
   * Alias for destroy: forwards to this.destroy() and returns nothing.
   */
  this.close = function () {
    this.destroy();
  };
  /**
   * Force connection termination by closing the underlying socket and killing server
   * process if any.
   *
   * New commands are refused and the send queue is dropped. When commands are still
   * awaiting a response, a second connection is opened to issue `KILL <threadId>`; if that
   * connection cannot be established, a Quit command is queued instead and the socket is
   * destroyed when it completes.
   */
  this.destroy = () => {
    _addCommand = _addCommandDisabled;
    clearTimeout(_timeout);
    if (_status !== Status.CLOSING && _status !== Status.CLOSED) {
      _status = Status.CLOSING;
      _sendQueue.clear();
      if (_receiveQueue.length > 0) {
        //socket is closed, but server may still be processing a huge select
        //only possibility is to kill process by another thread
        //TODO reuse a pool connection to avoid connection creation
        const self = this;
        const killCon = new Connection(opts);
        killCon
          .connect()
          .then(() => {
            //*************************************************
            //kill connection
            //*************************************************
            const killResHandler = () => {
              const destroyError = Errors.createError(
                'Connection destroyed, command was killed',
                true,
                info,
                '08S01',
                Errors.ER_CMD_NOT_EXECUTED_DESTROYED
              );
              socketErrorDispatchToQueries(destroyError);
              process.nextTick(() => {
                if (_socket) _socket.destroy();
              });
              _status = Status.CLOSED;
              killCon.end().catch(() => {});
            };
            // the pending commands are failed whether or not the KILL itself succeeded
            killCon
              .query('KILL ' + info.threadId)
              .then(killResHandler)
              .catch(killResHandler);
          })
          .catch(() => {
            //*************************************************
            //failing to create a kill connection, end normally
            //*************************************************
            const ended = () => {
              // keep a reference: _clear() is called before the socket is destroyed
              const sock = _socket;
              _clear();
              _status = Status.CLOSED;
              // destroy() has no promise to settle, so there is nothing to resolve here;
              // the socket may already be gone, hence the guard
              if (sock) sock.destroy();
              _receiveQueue.clear();
            };
            const quitCmd = new Quit(ended, ended);
            _sendQueue.push(quitCmd);
            _receiveQueue.push(quitCmd);
            if (_sendQueue.length === 1) {
              process.nextTick(_nextSendCmd.bind(self));
            }
          });
      } else {
        _status = Status.CLOSED;
        _socket.destroy();
      }
    }
    _clear();
  };
  /**
   * Pause reading from the underlying socket (forwards to the socket's pause()).
   */
  this.pause = () => {
    _socket.pause();
  };
  /**
   * Resume reading from the underlying socket (forwards to the socket's resume()).
   */
  this.resume = () => {
    _socket.resume();
  };
  /**
   * Not implemented on purpose: always throws.
   * Use Connection.query(sql, values) instead.
   *
   * @param sql     ignored
   * @param values  ignored
   * @throws error with code Errors.ER_NOT_IMPLEMENTED_FORMAT
   */
  this.format = (sql, values) => {
    // message no longer starts with a stray, unbalanced double quote
    throw Errors.createError(
      'Connection.format intentionally not implemented. please use Connection.query(sql, values), it will be more secure and faster',
      false,
      info,
      '0A000',
      Errors.ER_NOT_IMPLEMENTED_FORMAT
    );
  };
  433. //*****************************************************************
  434. // additional public methods
  435. //*****************************************************************
  /**
   * Return the raw version string of the currently connected server.
   *
   * @returns {*} `info.serverVersion.raw`
   * @throws {Error} when no server version is known yet
   */
  this.serverVersion = () => {
    const version = info.serverVersion;
    if (version) return version.raw;
    throw new Error('cannot know if server information until connection is established');
  };
  /**
   * Change option "debug" during connection.
   * Stores the new value on the options, then emits a 'debug' event on them carrying
   * the logPackets and debug settings.
   *
   * @param val debug value
   */
  this.debug = (val) => {
    opts.debug = val;
    const { logPackets, debug } = opts;
    opts.emit('debug', logPackets, debug);
  };
  /**
   * Change option "debugCompress" during connection.
   *
   * @param val debugCompress value
   */
  this.debugCompress = (val) => {
    opts.debugCompress = val;
  };
  457. //*****************************************************************
  458. // internal public testing methods
  459. //*****************************************************************
  /**
   * Accessors exposing internal state to the test-suite through `this.__tests`.
   */
  function TestMethods() {}
  Object.assign(TestMethods.prototype, {
    // collation currently stored on the connection options
    getCollation: () => opts.collation,
    // socket currently used by the connection
    getSocket: () => _socket
  });
  this.__tests = new TestMethods();
  468. //*****************************************************************
  469. // internal methods
  470. //*****************************************************************
  /**
   * Expose the internal connection status.
   *
   * @returns {*} current value of the internal status variable
   */
  this._status = () => _status;
  /**
   * Execute query using text protocol with callback emit columns/data/end/error
   * events to permit streaming big result-set.
   *
   * @param sql     sql string, or an object superseding the default options; the object
   *                must then carry the statement in its `sql` property
   * @param values  object / array of placeholder values (not mandatory); may be the
   *                callback itself when no values are given
   * @param cb      callback, called as cb(err) on failure or cb(null, rows, meta) on success
   * @returns {Query} the queued query command
   */
  this._queryCallback = (sql, values, cb) => {
    // (sql, cb) form: the callback sits in the `values` slot
    const callbackInValuesSlot = typeof values === 'function';
    const callback = callbackInValuesSlot ? values : cb;
    let params = callbackInValuesSlot ? undefined : values;

    let cmdOptions;
    let statement = sql;
    if (typeof sql === 'object') {
      cmdOptions = sql;
      statement = sql.sql;
      if (sql.values) params = sql.values;
    }

    let onRows;
    let onError;
    if (callback) {
      onRows = (rows) => {
        // metadata travels on the result object; hand it over as a separate argument
        const meta = rows.meta;
        delete rows.meta;
        callback(null, rows, meta);
      };
      onError = callback;
    } else {
      onRows = () => {};
      onError = () => {};
    }

    const queryCmd = new Query(onRows, onError, cmdOptions, opts, statement, params);
    // store each row and also publish it as a 'data' event
    queryCmd.handleNewRows = (row) => {
      queryCmd._rows[queryCmd._responseIndex].push(row);
      queryCmd.emit('data', row);
    };
    if (opts.trace) Error.captureStackTrace(queryCmd);
    _addCommand(queryCmd);
    return queryCmd;
  };
  526. /**
  527. * Execute a batch using text protocol with callback emit columns/data/end/error
  528. * events to permit streaming big result-set
  529. *
  530. * @param sql sql parameter Object can be used to supersede default option.
  531. * Object must then have sql property.
  532. * @param values object / array of placeholder values (not mandatory)
  533. * @param cb callback
  534. * @returns {Query} query
  535. */
  536. this._batchCallback = (sql, values, cb) => {
  537. let _cmdOpts,
  538. _sql,
  539. _values = values,
  540. _cb = cb;
  541. if (typeof values === 'function') {
  542. _cb = values;
  543. _values = undefined;
  544. }
  545. if (typeof sql === 'object') {
  546. _cmdOpts = sql;
  547. _sql = _cmdOpts.sql;
  548. if (sql.values) _values = sql.values;
  549. } else {
  550. _sql = sql;
  551. }
  552. if (_values !== undefined) {
  553. _values = Array.isArray(_values) ? _values : [_values];
  554. }
  555. let cmd;
  556. if (!_values) {
  557. if (_cb) {
  558. _cb(
  559. Errors.createError(
  560. 'Batch must have values set\nsql: ' + _sql + ' - parameters:[]',
  561. false,
  562. info,
  563. 'HY000',
  564. Errors.ER_BATCH_WITH_NO_VALUES
  565. )
  566. );
  567. }
  568. return null;
  569. }
  570. let useBulk = canUseBulk(_values);
  571. const fct = useBulk ? BatchBulk : BatchRewrite;
  572. if (_cb) {
  573. const resolve = (rows) => {
  574. const meta = rows.meta;
  575. delete rows.meta;
  576. _cb(null, rows, meta);
  577. };
  578. cmd = new fct(resolve, _cb, _cmdOpts, opts, _sql, _values);
  579. } else {
  580. cmd = new fct(
  581. () => {},
  582. () => {},
  583. _cmdOpts,
  584. opts,
  585. _sql,
  586. _values
  587. );
  588. }
  589. cmd.handleNewRows = (row) => {
  590. cmd._rows[cmd._responseIndex].push(row);
  591. cmd.emit('data', row);
  592. };
  593. if (opts.trace) Error.captureStackTrace(cmd);
  594. _addCommand(cmd);
  595. return cmd;
  596. };
  /**
   * Use Batch rewrite or MariaDB bulk protocol.
   *
   * Bulk requires a MariaDB server >= 10.2.7 advertising the bulk capability and the
   * `bulk` option. Even then it is refused when a value looks like a stream or, without
   * named placeholders, when rows do not all have the same number of parameters.
   *
   * @param values current batch values
   * @return {boolean} indicating if must use rewrite or bulk (a falsy non-boolean can be
   *   returned when the server version is not known yet)
   */
  const canUseBulk = (values) => {
    // a value exposing both pipe() and read() is treated as a stream
    const looksLikeStream = (val) =>
      val !== null &&
      typeof val === 'object' &&
      typeof val.pipe === 'function' &&
      typeof val.read === 'function';

    // not using info.isMariaDB() directly in case of callback use,
    // without connection being completely finished.
    const bulkSupported =
      info.serverVersion &&
      info.serverVersion.mariaDb &&
      info.hasMinVersion(10, 2, 7) &&
      opts.bulk &&
      (info.serverCapabilities & Capabilities.MARIADB_CLIENT_STMT_BULK_OPERATIONS) > BigInt(0);

    if (!bulkSupported || values === undefined) return bulkSupported;

    if (opts.namedPlaceholders) {
      // named placeholders: only make sure no value is a stream
      for (const row of values) {
        for (const key of Object.keys(row)) {
          if (looksLikeStream(row[key])) return false;
        }
      }
      return bulkSupported;
    }

    //ensure that all parameters have same length
    //single array is considered as an array of single element.
    const first = values[0];
    const paramLen = Array.isArray(first) ? first.length : first ? 1 : 0;
    if (paramLen === 0) return false;

    for (const entry of values) {
      const row = Array.isArray(entry) ? entry : [entry];
      if (row.length !== paramLen) return false;
      for (let idx = 0; idx < paramLen; idx++) {
        if (looksLikeStream(row[idx])) return false;
      }
    }
    return bulkSupported;
  };
  /**
   * Add handshake command to queue, then open the socket.
   *
   * @param resolve   called (through _authSucceedHandler) once authentication succeeded
   * @param rejected  called (through _authFailHandler) when connecting fails
   * @private
   */
  const _registerHandshakeCmd = (resolve, rejected) => {
    const onAuthFailure = _authFailHandler.bind(this, rejected);
    const onAuthSuccess = _authSucceedHandler.bind(this, resolve, onAuthFailure);
    const secureContextFactory = _createSecureContext.bind(this, onAuthFailure);
    const enableCommands = _addCommandEnable.bind(this);

    const handshakeCmd = new Handshake(
      onAuthSuccess,
      onAuthFailure,
      secureContextFactory,
      enableCommands,
      _getSocket
    );
    Error.captureStackTrace(handshakeCmd);
    // once the handshake is over, let the next queued command be sent
    handshakeCmd.once('end', () => {
      process.nextTick(_nextSendCmd);
    });

    _receiveQueue.push(handshakeCmd);
    _initSocket(onAuthFailure);
  };
  /**
   * Apply the `sessionVariables` option with a single `set @@name=?,...` query.
   * Variable names are stripped of every character outside [a-z0-9_]; values are sent
   * as placeholders.
   *
   * @returns {Promise} resolves immediately when there is nothing to set, otherwise
   *   settles with the query; a failure is wrapped in an ER_SETTING_SESSION_ERROR error
   * @private
   */
  const _executeSessionVariableQuery = () => {
    const names = opts.sessionVariables ? Object.keys(opts.sessionVariables) : [];
    if (names.length === 0) return Promise.resolve();

    return new Promise((resolve, reject) => {
      const assignments = names.map((name) => '@@' + name.replace(/[^a-z0-9_]/gi, '') + '=?');
      const sessionQuery = 'set ' + assignments.join(',');
      const values = names.map((name) => opts.sessionVariables[name]);

      const onFailure = (initialErr) => {
        reject(
          Errors.createError(
            'Error setting session variable (value ' +
              JSON.stringify(opts.sessionVariables) +
              '). Error: ' +
              initialErr.message,
            true,
            info,
            '08S01',
            Errors.ER_SETTING_SESSION_ERROR,
            null
          )
        );
      };

      const setCmd = new Query(resolve, onFailure, null, opts, sessionQuery, values);
      if (opts.trace) Error.captureStackTrace(setCmd);
      _addCommand(setCmd);
    });
  };
  /**
   * Align client and server timezones.
   *
   * - timezone option 'auto': asks the server for its timezone and stores it in opts.tz
   *   (null when it equals the local timezone guessed by moment, so no conversion is
   *   done). Rejects when the server timezone is unknown to moment.
   * - otherwise, when opts.tz is set and skipSetTimezone is not: issues
   *   `SET time_zone=?`. A failure of that statement only logs a warning.
   *
   * @returns {Promise<void>}
   * @private
   */
  const _checkServerTimezone = () => {
    if (opts.timezone === 'auto') {
      return this._queryPromise('SELECT @@system_time_zone stz, @@time_zone tz').then((res) => {
        const serverTimezone = res[0].tz === 'SYSTEM' ? res[0].stz : res[0].tz;
        if (!moment.tz.zone(serverTimezone)) {
          return Promise.reject(
            Errors.createError(
              "Automatic timezone setting fails. Server timezone '" +
                serverTimezone +
                "' doesn't have a corresponding IANA timezone. Option timezone must be set according to server timezone",
              true,
              info,
              '08S01',
              Errors.ER_WRONG_AUTO_TIMEZONE
            )
          );
        }

        const localTz = moment.tz.guess();
        if (serverTimezone === localTz) {
          //db server and client use same timezone, avoid any conversion
          opts.tz = null;
        } else {
          opts.localTz = localTz;
          opts.tz = serverTimezone;
        }
        return Promise.resolve();
      });
    }

    if (opts.tz && !opts.skipSetTimezone) {
      let tz = opts.tz;
      if (opts.tz === 'Etc/UTC') {
        tz = '+00:00';
      } else if (opts.tz.startsWith('Etc/GMT')) {
        // moment.tz.zone returns null for a name it does not know
        const zone = moment.tz.zone(opts.tz);
        const abbr = zone ? zone.abbrs[0] : null;
        if (abbr && /^[+-]\d{2}$/.test(abbr)) {
          // fixed-offset zone abbreviated as a signed hour, e.g. '-05' => '-05:00'
          tz = abbr + ':00';
        } else if (abbr === 'GMT') {
          // NOTE(review): 'Etc/GMT' and its zero-offset aliases are expected to
          // abbreviate to 'GMT'; 'GMT:00' is not a valid offset — confirm against
          // the bundled moment-timezone data
          tz = '+00:00';
        }
        // any other case: send the zone name unchanged and let the server decide
      }
      return this._queryPromise('SET time_zone=?', tz)
        .then(() => {
          return Promise.resolve();
        })
        .catch(() => {
          // best effort: connection stays usable even if the server refuses the timezone
          console.log(
            `warning: setting timezone '${opts.tz}' fails on server.\n look at https://mariadb.com/kb/en/mysql_tzinfo_to_sql/ to load IANA timezone.\nSetting timezone can be disabled with option \`skipSetTimezone\``
          );
          return Promise.resolve();
        });
    }
    return Promise.resolve();
  };
  /**
   * With option `forceVersionCheck`, ask the server for @@VERSION and refresh the stored
   * version information (raw string, MariaDB flag, parsed version) from the answer.
   *
   * @returns {Promise} resolves once the version information is up to date
   * @private
   */
  const _checkServerVersion = () => {
    if (opts.forceVersionCheck) {
      return this._queryPromise('SELECT @@VERSION AS v').then((rows) => {
        const version = info.serverVersion;
        version.raw = rows[0].v;
        version.mariaDb = version.raw.includes('MariaDB');
        ConnectionInformation.parseVersionString(info);
        return Promise.resolve();
      });
    }
    return Promise.resolve();
  };
  /**
   * Run the statement(s) of the `initSql` option (a single statement or an array).
   *
   * @returns {Promise} resolves when every statement succeeded (or none is configured);
   *   the first failure is wrapped in an ER_INITIAL_SQL_ERROR error
   * @private
   */
  const _executeInitQuery = () => {
    if (!opts.initSql) return Promise.resolve();

    const statements = Array.isArray(opts.initSql) ? opts.initSql : [opts.initSql];
    const pending = statements.map((sql) => this._queryPromise(sql));

    return Promise.all(pending).catch((initialErr) => {
      return Promise.reject(
        Errors.createError(
          'Error executing initial sql command: ' + initialErr.message,
          true,
          info,
          '08S01',
          Errors.ER_INITIAL_SQL_ERROR,
          null
        )
      );
    });
  };
  /**
   * Apply the `queryTimeout` option (ms) as the session's max_statement_time (seconds).
   * Only possible on a MariaDB server >= 10.1.2.
   *
   * @returns {Promise} resolves when no timeout is configured or once the session
   *   variable is set; rejects with ER_TIMEOUT_NOT_SUPPORTED on an unsupported server,
   *   or with ER_INITIAL_TIMEOUT_ERROR when the SET statement fails
   * @private
   */
  const _executeSessionTimeout = () => {
    if (!opts.queryTimeout) return Promise.resolve();

    if (!(info.isMariaDB() && info.hasMinVersion(10, 1, 2))) {
      return Promise.reject(
        Errors.createError(
          'Can only use queryTimeout for MariaDB server after 10.1.1. queryTimeout value: ' +
            opts.queryTimeout,
          false,
          info,
          'HY000',
          Errors.ER_TIMEOUT_NOT_SUPPORTED
        )
      );
    }

    // the promise must be returned: otherwise the caller neither waits for the SET
    // statement nor sees its failure, which would surface as an unhandled rejection
    return this._queryPromise('SET max_statement_time=' + opts.queryTimeout / 1000).catch(
      (initialErr) => {
        return Promise.reject(
          Errors.createError(
            'Error setting session queryTimeout: ' + initialErr.message,
            true,
            info,
            '08S01',
            Errors.ER_INITIAL_TIMEOUT_ERROR,
            null
          )
        );
      }
    );
  };
  /**
   * Give access to the socket currently in use (it is replaced when TLS is set up).
   *
   * @returns {*} current socket
   * @private
   */
  const _getSocket = () => _socket;
  /**
   * Initialize socket and associate events.
   *
   * Connects through `socketPath` when set, host/port otherwise, arms the connect
   * timeout when configured, and plugs the socket into the packet input/output streams.
   *
   * @param authFailHandler handler called when connecting fails
   * @private
   */
  const _initSocket = (authFailHandler) => {
    _socket = opts.socketPath
      ? Net.connect(opts.socketPath)
      : Net.connect(opts.port, opts.host);

    if (opts.connectTimeout) {
      // the start time is passed along so the timeout error can report the elapsed time
      _timeout = setTimeout(
        _connectTimeoutReached,
        opts.connectTimeout,
        authFailHandler,
        Date.now()
      );
    }

    const onSocketFailure = _socketErrorHandler.bind(this, authFailHandler);
    const onConnected = () => {
      clearTimeout(_timeout);
      if (_status !== Status.CONNECTING) return;

      _status = Status.AUTHENTICATING;
      _socketConnected = true;
      _socket.setTimeout(opts.socketTimeout, _socketTimeoutReached.bind(this, authFailHandler));
      _socket.setNoDelay(true);
      // keep alive for socket. This won't reset server wait_timeout use pool option idleTimeout for that
      if (opts.keepAliveDelay) {
        _socket.setKeepAlive(true, opts.keepAliveDelay);
      }
    };

    _socket.on('data', _in.onData.bind(_in));
    _socket.on('error', onSocketFailure);
    _socket.on('end', onSocketFailure);
    _socket.on('connect', onConnected);

    // small adapter expected by the output stream
    _socket.writeBuf = (buf) => _socket.write(buf);
    _socket.flush = () => {};
    _out.setStream(_socket);
  };
  /**
   * Authentication success result handler.
   *
   * Enables packet logging/compression according to the options, re-queues the commands
   * that were waiting for authentication, then runs the initial queries (session
   * variables, timezone, version check, initSql, query timeout) before marking the
   * connection as connected.
   *
   * @param resolve   called with the connection once the initial queries succeeded
   * @param rejected  called with the error when an initial query fails
   * @private
   */
  const _authSucceedHandler = (resolve, rejected) => {
    //enable packet compression according to option
    if (opts.logPackets) info.enableLogPacket();
    if (opts.compress) {
      const serverCanCompress = info.serverCapabilities & Capabilities.COMPRESS;
      if (serverCanCompress) {
        _out.setStream(new CompressionOutputStream(_socket, opts, info));
        _in = new CompressionInputStream(_in, _receiveQueue, opts, info);
        // route incoming data through the compression layer from now on
        _socket.removeAllListeners('data');
        _socket.on('data', _in.onData.bind(_in));
      } else {
        console.error(
          "connection is configured to use packet compression, but the server doesn't have this capability"
        );
      }
    }

    _addCommand = opts.pipelining ? _addCommandEnablePipeline : _addCommandEnable;

    for (const queuedCmd of _waitingAuthenticationQueue.toArray()) {
      _addCommand(queuedCmd);
    }

    const onInitFailure = (err) => {
      // a non-fatal error leaves the connection open: close it before reporting
      if (!err.fatal) this.end().catch((err) => {});
      process.nextTick(rejected, err);
    };

    _status = Status.INIT_CMD;
    _executeSessionVariableQuery()
      .then(() => _checkServerTimezone())
      .then(() => _checkServerVersion())
      .then(() => _executeInitQuery())
      .then(() => _executeSessionTimeout())
      .then(() => {
        _status = Status.CONNECTED;
        process.nextTick(resolve, this);
      })
      .catch(onInitFailure);
  };
  /**
   * Authentication failed result handler.
   *
   * @param reject  rejection function, called with the error on the next tick
   * @param err     error describing the failure
   * @private
   */
  const _authFailHandler = (reject, err) => {
    process.nextTick(reject, err);
    // drop the command at the head of the receive queue (the handshake)
    _receiveQueue.shift();
    _fatalError(err, true);
  };
  /**
   * Create TLS socket and associate events.
   *
   * The TLS socket wraps the current socket and replaces it, both for reading and for
   * the output stream. Any exception raised while setting it up is sent to the socket
   * error handler.
   *
   * @param rejected rejected function when error
   * @param callback callback function when done
   * @private
   */
  const _createSecureContext = (rejected, callback) => {
    const onSocketFailure = _socketErrorHandler.bind(this, rejected);
    // user ssl options, with server name and underlying socket always forced
    const tlsOptions = Object.assign({}, opts.ssl, {
      servername: opts.host,
      socket: _socket
    });

    try {
      const tlsSocket = tls.connect(tlsOptions, callback);
      tlsSocket.on('data', _in.onData.bind(_in));
      tlsSocket.on('error', onSocketFailure);
      tlsSocket.on('end', onSocketFailure);
      tlsSocket.writeBuf = (buf) => tlsSocket.write(buf);
      tlsSocket.flush = () => {};

      // the plain socket must not feed the input stream anymore
      _socket.removeAllListeners('data');
      _socket = tlsSocket;
      _out.setStream(tlsSocket);
    } catch (err) {
      onSocketFailure(err);
    }
  };
  /**
   * Handle packet when no packet is expected.
   * (there can be an ERROR packet send by server/proxy to inform that connection is ending).
   *
   * Must be called with the connection as `this`: errors are emitted on it.
   *
   * @param packet packet
   * @private
   */
  const _unexpectedPacket = function (packet) {
    const isShuttingDown = () => _status === Status.CLOSING || _status === Status.CLOSED;

    if (packet && packet.peek() === 0xff) {
      //can receive unexpected error packet from server/proxy
      //to inform that connection is closed (usually by timeout)
      const serverErr = packet.readError(info);
      if (serverErr.fatal && !isShuttingDown()) {
        this.emit('error', serverErr);
        this.end();
      }
      return;
    }

    if (isShuttingDown()) return;

    const message =
      'receiving packet from server without active commands\n' +
      'conn:' +
      (info.threadId ? info.threadId : -1) +
      '(' +
      packet.pos +
      ',' +
      packet.end +
      ')\n' +
      Utils.log(opts, packet.buf, packet.pos, packet.end);
    this.emit(
      'error',
      Errors.createError(message, true, info, '08S01', Errors.ER_UNEXPECTED_PACKET)
    );
    this.destroy();
  };
  /**
   * Change transaction state.
   *
   * The statement is only sent when it may be needed: a command is in progress (the
   * known server status cannot be trusted then) or the server status reports an open
   * transaction.
   *
   * @param sql sql
   * @returns {Promise} promise settled by the query, resolved immediately when nothing
   *   has to be sent, rejected when the connection is closing/closed
   * @private
   */
  const _changeTransaction = (sql) => {
    const connectionGone = _status === Status.CLOSING || _status === Status.CLOSED;
    if (connectionGone) {
      return Promise.reject(
        Errors.createError(
          'Cannot execute new commands: connection closed\nsql: ' + sql,
          true,
          info,
          '08S01',
          Errors.ER_CMD_CONNECTION_CLOSED
        )
      );
    }

    //Command in progress => must execute query
    //or if no command in progress, can rely on status to know if query is needed
    const mustSend = _receiveQueue.peekFront() || info.status & ServerStatus.STATUS_IN_TRANS;
    if (!mustSend) return Promise.resolve();

    return new Promise((resolve, reject) => {
      const txCmd = new Query(resolve, reject, null, opts, sql, null);
      if (opts.trace) Error.captureStackTrace(txCmd);
      _addCommand(txCmd);
    });
  };
  1038. /**
  1039. * Handle connection timeout.
  1040. *
  1041. * @private
  1042. */
  const _connectTimeoutReached = function (authFailHandler, initialConnectionTime) {
    // the timer has fired: drop the reference
    _timeout = null;

    const pendingHandshake = _receiveQueue.peekFront();
    const elapsedMs = Date.now() - initialConnectionTime;
    // reuse the stack captured on the pending command, when there is one
    const callerStack = pendingHandshake ? pendingHandshake.stack : null;

    const timeoutError = Errors.createError(
      'Connection timeout: failed to create socket after ' + elapsedMs + 'ms',
      true,
      info,
      '08S01',
      Errors.ER_CONNECTION_TIMEOUT,
      callerStack
    );
    authFailHandler(timeoutError);
  };
  1059. /**
  1060. * Handle socket timeout.
  1061. *
  1062. * @private
  1063. */
  const _socketTimeoutReached = function () {
    const timeoutError = Errors.createError(
      'socket timeout',
      true,
      info,
      '08S01',
      Errors.ER_SOCKET_TIMEOUT
    );

    // append the last exchanged packets, when some were recorded
    const lastPackets = info.getLastPackets();
    if (lastPackets !== '') {
      timeoutError.message += '\nlast received packets:\n' + lastPackets;
    }

    // second argument: do not throw even without an 'error' listener
    _fatalError(timeoutError, true);
  };
  1072. /**
  1073. * Add command to waiting queue until authentication.
  1074. *
  1075. * @param cmd command
  1076. * @returns {*} current command
  1077. * @private
  1078. */
  const _addCommandQueue = (cmd) => {
    // nothing is sent here: the command is only parked in the waiting queue
    _waitingAuthenticationQueue.push(cmd);

    return cmd;
  };
  1083. /**
  1084. * Add command to command sending and receiving queue.
  1085. *
  1086. * @param cmd command
  1087. * @returns {*} current command
  1088. * @private
  1089. */
  const _addCommandEnable = (cmd) => {
    // once this command is done, give the next queued one a chance to be sent
    cmd.once('end', () => {
      setImmediate(_nextSendCmd);
    });

    // Send immediately only when nothing is waiting to be sent, the connection accepts
    // commands and no command is currently awaiting its response.
    const acceptsCommands = _status === Status.INIT_CMD || _status === Status.CONNECTED;
    const startNow = _sendQueue.isEmpty() && acceptsCommands && !_receiveQueue.peekFront();

    // the command always awaits a response, whether it is sent now or later
    _receiveQueue.push(cmd);
    if (startNow) {
      cmd.start(_out, opts, info);
    } else {
      _sendQueue.push(cmd);
    }
    return cmd;
  };
  1109. /**
  1110. * Add command to command sending and receiving queue using pipelining
  1111. *
  1112. * @param cmd command
  1113. * @returns {*} current command
  1114. * @private
  1115. */
  const _addCommandEnablePipeline = (cmd) => {
    // pipelining: the next command may be sent as soon as this one is fully written,
    // without waiting for its response
    cmd.once('send_end', () => {
      setImmediate(_nextSendCmd);
    });
    _receiveQueue.push(cmd);

    // another command is still waiting to be sent: queue behind it
    if (!_sendQueue.isEmpty()) {
      _sendQueue.push(cmd);
      return cmd;
    }

    cmd.start(_out, opts, info);
    if (cmd.sending) {
      // still being written: keep it at the head of the send queue until 'send_end',
      // removing it before the listener registered above runs
      _sendQueue.push(cmd);
      cmd.prependOnceListener('send_end', () => {
        _sendQueue.shift();
      });
    }
    return cmd;
  };
  1134. /**
  1135. * Replacing command when connection is closing or closed to send a proper error message.
  1136. *
  1137. * @param cmd command
  1138. * @private
  1139. */
  const _addCommandDisabled = (cmd) => {
    // the command is never queued: it fails immediately with a "connection closed" error
    const message = 'Cannot execute new commands: connection closed\n' + cmd.displaySql();
    cmd.throwNewError(message, true, info, '08S01', Errors.ER_CMD_CONNECTION_CLOSED);
  };
  1149. /**
  1150. * Handle socket error.
  1151. *
  1152. * @param authFailHandler authentication handler
  1153. * @param err socket error
  1154. * @private
  1155. */
  const _socketErrorHandler = function (authFailHandler, err) {
    // errors raised while the connection is already closing/closed are ignored
    if (_status === Status.CLOSING || _status === Status.CLOSED) return;

    if (_socket) {
      // the socket is unusable: turn its write hooks into no-ops
      _socket.writeBuf = () => {};
      _socket.flush = () => {};
    }

    let socketError = err;
    if (!socketError) {
      // socket has been ended without error
      socketError = Errors.createError(
        'socket has unexpectedly been closed',
        true,
        info,
        '08S01',
        Errors.ER_SOCKET_UNEXPECTED_CLOSE
      );
    } else {
      socketError.fatal = true;
      // The SQL state belongs on the reported error. It used to be written on `this`,
      // so the error reached listeners without any sqlState.
      socketError.sqlState = 'HY000';
    }

    const packetMsgs = info.getLastPackets();
    if (packetMsgs !== '') {
      socketError.message += '\nlast received packets:\n' + packetMsgs;
    }

    switch (_status) {
      case Status.CONNECTING:
      case Status.AUTHENTICATING: {
        // still connecting: report through the authentication failure handler, adding
        // the stack captured on the pending command (minus its first line) when available
        const currentCmd = _receiveQueue.peekFront();
        if (currentCmd && currentCmd.stack) {
          socketError.stack +=
            '\n From event:\n' + currentCmd.stack.substring(currentCmd.stack.indexOf('\n') + 1);
        }
        authFailHandler(socketError);
        break;
      }
      default:
        _fatalError(socketError, false);
    }
  };
  1193. /**
  1194. * Fatal unexpected error: close the connection and relay the error (emitted, or thrown when there is no listener).
  1195. *
  1196. * @param self current connection
  1197. * @private
  1198. */
  const _fatalErrorHandler = function (self) {
    return function (err, avoidThrowError) {
      // already closing/closed: only relay the error to the commands still awaiting a result
      if (_status === Status.CLOSING || _status === Status.CLOSED) {
        socketErrorDispatchToQueries(err);
        return;
      }

      // must be read before the status is overwritten below
      const mustThrowError = _status !== Status.CONNECTING;
      _status = Status.CLOSING;

      //prevent executing new commands
      _addCommand = _addCommandDisabled;

      if (_socket) {
        for (const eventName of ['error', 'timeout', 'close', 'data']) {
          _socket.removeAllListeners(eventName);
        }
        if (!_socket.destroyed) _socket.destroy();
        _socket = undefined;
      }
      _status = Status.CLOSED;

      const errorThrownByCmd = socketErrorDispatchToQueries(err);

      if (!mustThrowError) {
        _clear();
        return;
      }

      const hasErrorListener = self.listenerCount('error') > 0;
      if (hasErrorListener) self.emit('error', err);
      self.emit('end');
      _clear();

      //error will be thrown if no error listener and no command did throw the exception
      if (!hasErrorListener && !avoidThrowError && !errorThrownByCmd) throw err;
    };
  };
  1235. /**
  1236. * Dispatch fatal error to current running queries.
  1237. *
  1238. * @param err the fatal error
  1239. * @returns {boolean} true if the error has been relayed to at least one command
  1240. */
  const socketErrorDispatchToQueries = (err) => {
    let relayed = false;

    // drain the receive queue; entries without a packet handler are simply dropped
    for (let pending = _receiveQueue.shift(); pending; pending = _receiveQueue.shift()) {
      if (!pending.onPacketReceive) continue;
      relayed = true;
      // deferred: the command's throwError runs on a later tick
      setImmediate(pending.throwError.bind(pending), err, info);
    }

    return relayed;
  };
  1252. /**
  1253. * Will send next command in queue if any.
  1254. *
  1255. * @private
  1256. */
  const _nextSendCmd = () => {
    const sendCmd = _sendQueue.shift();
    if (!sendCmd) return;

    // head of the queue is still being written: put it back and wait
    if (sendCmd.sending) {
      _sendQueue.unshift(sendCmd);
      return;
    }

    sendCmd.start(_out, opts, info);
    if (!sendCmd.sending) return;

    // not fully written yet: keep it at the head of the queue until 'send_end'
    sendCmd.prependOnceListener('send_end', () => {
      _sendQueue.shift();
    });
    _sendQueue.unshift(sendCmd);
  };
  1273. /**
  1274. * Clearing connection variables when ending.
  1275. *
  1276. * @private
  1277. */
  const _clear = () => {
    // drop commands that were never sent and every listener registered on the options emitter
    _sendQueue.clear();
    opts.removeAllListeners();

    // release the stream references
    _socket = undefined;
    _out = undefined;
  };
  1284. //*****************************************************************
  1285. // internal variables
  1286. //*****************************************************************
  EventEmitter.call(this);

  // options are copied onto an EventEmitter, so `opts` is both the settings object and an emitter
  const opts = Object.assign(new EventEmitter(), options);
  const info = new ConnectionInformation();

  // command queues
  const _sendQueue = new Queue();
  const _receiveQueue = new Queue();
  const _waitingAuthenticationQueue = new Queue();

  // connection state
  let _status = Status.NOT_CONNECTED;
  let _socketConnected = false;
  let _socket = null;
  let _timeout = null;

  // commands start out parked in the waiting queue; the strategy is swapped later
  let _addCommand = _addCommandQueue;
  const _fatalError = _fatalErrorHandler(this);

  // packet streams
  let _out = new PacketOutputStream(opts, info);
  let _in = new PacketInputStream(_unexpectedPacket.bind(this), _receiveQueue, _out, opts, info);

  // public helpers
  this.query = this._queryPromise;
  this.escape = Utils.escape.bind(this, opts, info);
  this.escapeId = Utils.escapeId.bind(this, opts, info);

  // read-only accessors; threadId is an alias kept for mysql/mysql2 compatibility
  Object.defineProperties(this, {
    threadId: {
      get() {
        return info ? info.threadId : undefined;
      }
    },
    info: {
      get() {
        return info;
      }
    }
  });
  1315. }
  // Connection instances inherit the EventEmitter prototype methods (emit, on, once, ...)
  util.inherits(Connection, EventEmitter);

  // the constructor is the module's single export
  module.exports = Connection;