123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- 'use strict';
- const PoolBase = require('./pool-base');
- const ConnectionCallback = require('./connection-callback');
- const Errors = require('./misc/errors');
- const util = require('util');
/**
 * Callback-style connection pool.
 *
 * Delegates the pooling itself to PoolBase (invoked as a parent constructor)
 * and then replaces the promise-returning `end`, `query`, `batch` and
 * `getConnection` members installed on `this` with callback-taking versions.
 *
 * @param {Object} options pool options. This function reads `connOptions`,
 *                         `pingTimeout`, `noControlAfterUse` and
 *                         `resetAfterUse`; the whole object is also handed
 *                         to PoolBase.
 * @constructor
 */
function PoolCallback(options) {
  /**
   * Run a statement on a pooled connection, then give the connection back.
   *
   * @param conn    pooled connection (as prepared by createConnectionPoolCallback)
   * @param sql     statement to run; when falsy nothing is executed
   * @param values  placeholder values forwarded untouched
   * @param isBatch truthy to use `conn.batch`, otherwise `conn.query`
   * @returns {Promise} resolves with the rows, or with `conn` itself when
   *                    `sql` is falsy; rejects with the execution error
   */
  const processTaskCallback = function (conn, sql, values, isBatch) {
    if (!sql) {
      // no statement to run: the caller only wants the connection
      return Promise.resolve(conn);
    }

    return new Promise((resolve, reject) => {
      const fct = isBatch ? conn.batch : conn.query;
      // explicit receiver: a detached method reference would otherwise run
      // with `this === undefined` in strict mode
      fct.call(conn, sql, values, (err, rows) => {
        // the connection goes back to the pool whatever the outcome
        conn.releaseWithoutError();
        if (err) {
          reject(err);
          return;
        }
        resolve(rows);
      });
    });
  };

  /**
   * Promise wrapper around `conn.ping`, using `options.pingTimeout`.
   *
   * @param conn connection to ping
   * @returns {Promise} resolves without value on success, rejects with the ping error
   */
  const pingPromise = function (conn) {
    return new Promise((resolve, reject) => {
      conn.ping(options.pingTimeout, (err) => {
        if (err) {
          reject(err);
        } else {
          resolve();
        }
      });
    });
  };

  /**
   * Open a new connection and decorate it for pool usage.
   *
   * On success the connection gains `forceEnd` (the original `end`, as a
   * promise), `release` (give back to / discard from the pool) and
   * `releaseWithoutError`; `end` is redirected to `release`.
   *
   * @param pool owning pool; `closed`, `_releaseConnection` and
   *             `_discardConnection` are used
   * @returns {Promise} resolves with the decorated connection; rejects with
   *                    the connect error, or with an
   *                    ER_ADD_CONNECTION_CLOSED_POOL error if the pool was
   *                    closed while connecting
   */
  const createConnectionPoolCallback = function (pool) {
    const conn = new ConnectionCallback(options.connOptions);
    return new Promise(function (resolve, reject) {
      conn.connect((err) => {
        if (err) {
          reject(err);
          return;
        }

        if (pool.closed) {
          //discard connection
          conn.end((err) => {});
          reject(
            Errors.createError(
              'Cannot create new connection to pool, pool closed',
              true,
              null,
              '08S01',
              Errors.ER_ADD_CONNECTION_CLOSED_POOL,
              null
            )
          );
          return;
        }

        // keep the real `end` before it is redirected to `release` below
        const initialEnd = conn.end;
        conn.forceEnd = () => {
          return new Promise(function (res, rej) {
            initialEnd.call(conn, (err) => {
              if (err) {
                rej(err);
              } else {
                res();
              }
            });
          });
        };

        conn.release = function (cb) {
          if (pool.closed) {
            pool._discardConnection(conn);
            if (cb) cb();
            return;
          }
          if (options.noControlAfterUse) {
            pool._releaseConnection(conn);
            if (cb) cb();
            return;
          }

          //if server permit it, reset the connection, or rollback only if not
          // COM_RESET_CONNECTION exist since mysql 5.7.3 and mariadb 10.2.4
          // but not possible to use it with mysql waiting for https://bugs.mysql.com/bug.php?id=97633 correction.
          // and mariadb only since https://jira.mariadb.org/browse/MDEV-18281
          let revertFunction = conn.rollback;
          if (
            options.resetAfterUse &&
            conn.info.isMariaDB() &&
            ((conn.info.serverVersion.minor === 2 && conn.info.hasMinVersion(10, 2, 22)) ||
              conn.info.hasMinVersion(10, 3, 13))
          ) {
            revertFunction = conn.reset;
          }

          revertFunction.call(conn, (errCall) => {
            if (errCall) {
              //uncertain connection state.
              pool._discardConnection(conn);
            } else {
              pool._releaseConnection(conn);
            }
            if (cb) cb();
          });
        };

        // from now on "ending" a pooled connection only releases it
        conn.end = conn.release;
        conn.releaseWithoutError = () => {
          conn.end((err) => {});
        };

        resolve(conn);
      });
    });
  };

  PoolBase.call(this, options, processTaskCallback, createConnectionPoolCallback, pingPromise);

  // keep the promise implementations installed by PoolBase before they are
  // replaced on `this` at the end of this constructor
  const getConnectionPromise = this.getConnection.bind(this);
  const endPromise = this.end.bind(this);
  const queryPromise = this.query.bind(this);
  const batchPromise = this.batch.bind(this);

  // rejection handler used when the caller supplied no callback
  const emptyError = (err) => {};

  //*****************************************************************
  // internal equivalent with callback of promised functions
  //
  // Each adapter uses the two-argument form of then(): with
  // `.then(ok).catch(cb)` an exception thrown by the user callback on
  // success would make the same callback run a second time with its own
  // error.
  //*****************************************************************

  /**
   * Get a connection from the pool.
   *
   * @param callback optional; called with (err) or (null, conn)
   */
  const _getConnectionCallback = (callback) => {
    getConnectionPromise().then((conn) => {
      if (callback) callback(null, conn);
    }, callback || emptyError);
  };

  /**
   * Close the pool.
   *
   * @param callback optional; called with (err) or (null)
   */
  const _endCallback = (callback) => {
    endPromise().then(() => {
      if (callback) callback(null);
    }, callback || emptyError);
  };

  /**
   * Execute a query through the pool.
   *
   * @param sql    forwarded as-is to the promise implementation of query
   * @param values placeholder values (not mandatory); when a function is
   *               given here it is used as the callback
   * @param cb     optional; called with (err) or (null, rows, rows.meta)
   */
  const _queryCallback = function (sql, values, cb) {
    let _cb = cb;
    let _values = values;
    if (typeof values === 'function') {
      _cb = values;
      _values = undefined;
    }

    queryPromise(sql, _values).then((rows) => {
      if (_cb) _cb(null, rows, rows.meta);
    }, _cb || emptyError);
  };

  /**
   * Execute a batch through the pool.
   *
   * @param sql    forwarded as-is to the promise implementation of batch
   * @param values placeholder values (not mandatory); when a function is
   *               given here it is used as the callback
   * @param cb     optional; called with (err) or (null, rows, rows.meta)
   */
  const _batchCallback = function (sql, values, cb) {
    let _cb = cb;
    let _values = values;
    if (typeof values === 'function') {
      _cb = values;
      _values = undefined;
    }

    batchPromise(sql, _values).then((rows) => {
      if (_cb) _cb(null, rows, rows.meta);
    }, _cb || emptyError);
  };

  //*****************************************************************
  // replacing public promise function with callback equivalent
  //*****************************************************************

  this.end = _endCallback;
  this.query = _queryCallback;
  this.batch = _batchCallback;
  this.getConnection = _getConnectionCallback;
}
// Set up prototype inheritance: PoolCallback.prototype is linked to PoolBase.prototype.
- util.inherits(PoolCallback, PoolBase);
// The callback-style pool constructor is this module's only export.
- module.exports = PoolCallback;
|