// pool-callback.js
  1. 'use strict';
  2. const PoolBase = require('./pool-base');
  3. const ConnectionCallback = require('./connection-callback');
  4. const Errors = require('./misc/errors');
  5. const util = require('util');
  6. function PoolCallback(options) {
  7. const processTaskCallback = function (conn, sql, values, isBatch) {
  8. if (sql) {
  9. return new Promise((resolve, reject) => {
  10. const fct = isBatch ? conn.batch : conn.query;
  11. fct(sql, values, (err, rows, fields) => {
  12. conn.releaseWithoutError();
  13. if (err) {
  14. reject(err);
  15. return;
  16. }
  17. return resolve(rows);
  18. });
  19. });
  20. } else {
  21. return Promise.resolve(conn);
  22. }
  23. };
  24. const pingPromise = function (conn) {
  25. return new Promise((resolve, reject) => {
  26. conn.ping(options.pingTimeout, (err) => {
  27. if (err) {
  28. reject(err);
  29. } else resolve();
  30. });
  31. });
  32. };
  33. const createConnectionPoolCallback = function (pool) {
  34. const conn = new ConnectionCallback(options.connOptions);
  35. return new Promise(function (resolve, reject) {
  36. conn.connect((err) => {
  37. if (err) {
  38. reject(err);
  39. } else {
  40. if (pool.closed) {
  41. //discard connection
  42. conn.end((err) => {});
  43. reject(
  44. Errors.createError(
  45. 'Cannot create new connection to pool, pool closed',
  46. true,
  47. null,
  48. '08S01',
  49. Errors.ER_ADD_CONNECTION_CLOSED_POOL,
  50. null
  51. )
  52. );
  53. } else {
  54. const initialEnd = conn.end;
  55. conn.forceEnd = () => {
  56. return new Promise(function (res, rej) {
  57. initialEnd((err) => {
  58. if (err) {
  59. rej(err);
  60. } else {
  61. res();
  62. }
  63. });
  64. });
  65. };
  66. conn.release = function (cb) {
  67. if (pool.closed) {
  68. pool._discardConnection(conn);
  69. if (cb) cb();
  70. return;
  71. }
  72. if (options.noControlAfterUse) {
  73. pool._releaseConnection(conn);
  74. if (cb) cb();
  75. return;
  76. }
  77. //if server permit it, reset the connection, or rollback only if not
  78. // COM_RESET_CONNECTION exist since mysql 5.7.3 and mariadb 10.2.4
  79. // but not possible to use it with mysql waiting for https://bugs.mysql.com/bug.php?id=97633 correction.
  80. // and mariadb only since https://jira.mariadb.org/browse/MDEV-18281
  81. let revertFunction = conn.rollback;
  82. if (
  83. options.resetAfterUse &&
  84. conn.info.isMariaDB() &&
  85. ((conn.info.serverVersion.minor === 2 && conn.info.hasMinVersion(10, 2, 22)) ||
  86. conn.info.hasMinVersion(10, 3, 13))
  87. ) {
  88. revertFunction = conn.reset;
  89. }
  90. revertFunction((errCall) => {
  91. if (errCall) {
  92. //uncertain connection state.
  93. pool._discardConnection(conn);
  94. if (cb) cb();
  95. return;
  96. } else {
  97. pool._releaseConnection(conn);
  98. }
  99. if (cb) cb();
  100. });
  101. };
  102. conn.end = conn.release;
  103. conn.releaseWithoutError = () => {
  104. conn.end((err) => {});
  105. };
  106. resolve(conn);
  107. }
  108. }
  109. });
  110. });
  111. };
  112. PoolBase.call(this, options, processTaskCallback, createConnectionPoolCallback, pingPromise);
  113. const getConnectionPromise = this.getConnection.bind(this);
  114. const endPromise = this.end.bind(this);
  115. const queryPromise = this.query.bind(this);
  116. const batchPromise = this.batch.bind(this);
  117. const emptyError = (err) => {};
  118. //*****************************************************************
  119. // internal equivalent with callback of promised functions
  120. //*****************************************************************
  121. const _getConnectionCallback = (callback) => {
  122. getConnectionPromise()
  123. .then((conn) => {
  124. if (callback) callback(null, conn);
  125. })
  126. .catch(callback || emptyError);
  127. };
  128. const _endCallback = (callback) => {
  129. endPromise()
  130. .then(() => {
  131. if (callback) callback(null);
  132. })
  133. .catch(callback || emptyError);
  134. };
  135. /**
  136. * Execute query using text protocol with callback emit columns/data/end/error
  137. * events to permit streaming big result-set
  138. *
  139. * @param sql sql parameter Object can be used to supersede default option.
  140. * Object must then have sql property.
  141. * @param values object / array of placeholder values (not mandatory)
  142. * @param cb callback
  143. * @returns {Query} query
  144. */
  145. const _queryCallback = function (sql, values, cb) {
  146. let _cb = cb,
  147. _values = values;
  148. if (typeof values === 'function') {
  149. _cb = values;
  150. _values = undefined;
  151. }
  152. queryPromise(sql, _values)
  153. .then((rows) => {
  154. if (_cb) _cb(null, rows, rows.meta);
  155. })
  156. .catch(_cb || emptyError);
  157. };
  158. const _batchCallback = function (sql, values, cb) {
  159. let _values = values,
  160. _cb = cb;
  161. if (typeof values === 'function') {
  162. _cb = values;
  163. _values = undefined;
  164. }
  165. batchPromise(sql, _values)
  166. .then((rows) => {
  167. if (_cb) _cb(null, rows, rows.meta);
  168. })
  169. .catch(_cb || emptyError);
  170. };
  171. //*****************************************************************
  172. // replacing public promise function with callback equivalent
  173. //*****************************************************************
  174. this.end = _endCallback;
  175. this.query = _queryCallback;
  176. this.batch = _batchCallback;
  177. this.getConnection = _getConnectionCallback;
  178. }
  179. util.inherits(PoolCallback, PoolBase);
  180. module.exports = PoolCallback;