promise.js 10 KB


  1. 'use strict';
  2. const core = require('./index.js');
  3. const EventEmitter = require('events').EventEmitter;
  4. function makeDoneCb(resolve, reject, localErr) {
  5. return function(err, rows, fields) {
  6. if (err) {
  7. localErr.message = err.message;
  8. localErr.code = err.code;
  9. localErr.errno = err.errno;
  10. localErr.sqlState = err.sqlState;
  11. localErr.sqlMessage = err.sqlMessage;
  12. reject(localErr);
  13. } else {
  14. resolve([rows, fields]);
  15. }
  16. };
  17. }
  18. function inheritEvents(source, target, events) {
  19. const listeners = {};
  20. target
  21. .on('newListener', eventName => {
  22. if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) {
  23. source.on(
  24. eventName,
  25. (listeners[eventName] = function() {
  26. const args = [].slice.call(arguments);
  27. args.unshift(eventName);
  28. target.emit.apply(target, args);
  29. })
  30. );
  31. }
  32. })
  33. .on('removeListener', eventName => {
  34. if (events.indexOf(eventName) >= 0 && !target.listenerCount(eventName)) {
  35. source.removeListener(eventName, listeners[eventName]);
  36. delete listeners[eventName];
  37. }
  38. });
  39. }
  40. class PromisePreparedStatementInfo {
  41. constructor(statement, promiseImpl) {
  42. this.statement = statement;
  43. this.Promise = promiseImpl;
  44. }
  45. execute(parameters) {
  46. const s = this.statement;
  47. const localErr = new Error();
  48. return new this.Promise((resolve, reject) => {
  49. const done = makeDoneCb(resolve, reject, localErr);
  50. if (parameters) {
  51. s.execute(parameters, done);
  52. } else {
  53. s.execute(done);
  54. }
  55. });
  56. }
  57. close() {
  58. return new this.Promise(resolve => {
  59. this.statement.close();
  60. resolve();
  61. });
  62. }
  63. }
  64. class PromiseConnection extends EventEmitter {
  65. constructor(connection, promiseImpl) {
  66. super();
  67. this.connection = connection;
  68. this.Promise = promiseImpl || global.Promise;
  69. inheritEvents(connection, this, [
  70. 'error',
  71. 'drain',
  72. 'connect',
  73. 'end',
  74. 'enqueue'
  75. ]);
  76. }
  77. release() {
  78. this.connection.release();
  79. }
  80. query(query, params) {
  81. const c = this.connection;
  82. const localErr = new Error();
  83. return new this.Promise((resolve, reject) => {
  84. const done = makeDoneCb(resolve, reject, localErr);
  85. if (params) {
  86. c.query(query, params, done);
  87. } else {
  88. c.query(query, done);
  89. }
  90. });
  91. }
  92. execute(query, params) {
  93. const c = this.connection;
  94. const localErr = new Error();
  95. return new this.Promise((resolve, reject) => {
  96. const done = makeDoneCb(resolve, reject, localErr);
  97. if (params) {
  98. c.execute(query, params, done);
  99. } else {
  100. c.execute(query, done);
  101. }
  102. });
  103. }
  104. end() {
  105. return new this.Promise(resolve => {
  106. this.connection.end(resolve);
  107. });
  108. }
  109. beginTransaction() {
  110. const c = this.connection;
  111. const localErr = new Error();
  112. return new this.Promise((resolve, reject) => {
  113. const done = makeDoneCb(resolve, reject, localErr);
  114. c.beginTransaction(done);
  115. });
  116. }
  117. commit() {
  118. const c = this.connection;
  119. const localErr = new Error();
  120. return new this.Promise((resolve, reject) => {
  121. const done = makeDoneCb(resolve, reject, localErr);
  122. c.commit(done);
  123. });
  124. }
  125. rollback() {
  126. const c = this.connection;
  127. const localErr = new Error();
  128. return new this.Promise((resolve, reject) => {
  129. const done = makeDoneCb(resolve, reject, localErr);
  130. c.rollback(done);
  131. });
  132. }
  133. ping() {
  134. const c = this.connection;
  135. const localErr = new Error();
  136. return new this.Promise((resolve, reject) => {
  137. const done = makeDoneCb(resolve, reject, localErr);
  138. c.ping(done);
  139. });
  140. }
  141. connect() {
  142. const c = this.connection;
  143. const localErr = new Error();
  144. return new this.Promise((resolve, reject) => {
  145. c.connect((err, param) => {
  146. if (err) {
  147. localErr.message = err.message;
  148. localErr.code = err.code;
  149. localErr.errno = err.errno;
  150. localErr.sqlState = err.sqlState;
  151. localErr.sqlMessage = err.sqlMessage;
  152. reject(localErr);
  153. } else {
  154. resolve(param);
  155. }
  156. });
  157. });
  158. }
  159. prepare(options) {
  160. const c = this.connection;
  161. const promiseImpl = this.Promise;
  162. const localErr = new Error();
  163. return new this.Promise((resolve, reject) => {
  164. c.prepare(options, (err, statement) => {
  165. if (err) {
  166. localErr.message = err.message;
  167. localErr.code = err.code;
  168. localErr.errno = err.errno;
  169. localErr.sqlState = err.sqlState;
  170. localErr.sqlMessage = err.sqlMessage;
  171. reject(localErr);
  172. } else {
  173. const wrappedStatement = new PromisePreparedStatementInfo(
  174. statement,
  175. promiseImpl
  176. );
  177. resolve(wrappedStatement);
  178. }
  179. });
  180. });
  181. }
  182. changeUser(options) {
  183. const c = this.connection;
  184. const localErr = new Error();
  185. return new this.Promise((resolve, reject) => {
  186. c.changeUser(options, err => {
  187. if (err) {
  188. localErr.message = err.message;
  189. localErr.code = err.code;
  190. localErr.errno = err.errno;
  191. localErr.sqlState = err.sqlState;
  192. localErr.sqlMessage = err.sqlMessage;
  193. reject(localErr);
  194. } else {
  195. resolve();
  196. }
  197. });
  198. });
  199. }
  200. get config() {
  201. return this.connection.config;
  202. }
  203. get threadId() {
  204. return this.connection.threadId;
  205. }
  206. }
  207. function createConnection(opts) {
  208. const coreConnection = core.createConnection(opts);
  209. const createConnectionErr = new Error();
  210. const Promise = opts.Promise || global.Promise;
  211. if (!Promise) {
  212. throw new Error(
  213. 'no Promise implementation available.' +
  214. 'Use promise-enabled node version or pass userland Promise' +
  215. " implementation as parameter, for example: { Promise: require('bluebird') }"
  216. );
  217. }
  218. return new Promise((resolve, reject) => {
  219. coreConnection.once('connect', () => {
  220. resolve(new PromiseConnection(coreConnection, Promise));
  221. });
  222. coreConnection.once('error', err => {
  223. createConnectionErr.message = err.message;
  224. createConnectionErr.code = err.code;
  225. createConnectionErr.errno = err.errno;
  226. createConnectionErr.sqlState = err.sqlState;
  227. reject(createConnectionErr);
  228. });
  229. });
  230. }
  231. // note: the callback of "changeUser" is not called on success
  232. // hence there is no possibility to call "resolve"
  233. // patching PromiseConnection
  234. // create facade functions for prototype functions on "Connection" that are not yet
  235. // implemented with PromiseConnection
  236. // proxy synchronous functions only
  237. (function(functionsToWrap) {
  238. for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) {
  239. const func = functionsToWrap[i];
  240. if (
  241. typeof core.Connection.prototype[func] === 'function' &&
  242. PromiseConnection.prototype[func] === undefined
  243. ) {
  244. PromiseConnection.prototype[func] = (function factory(funcName) {
  245. return function() {
  246. return core.Connection.prototype[funcName].apply(
  247. this.connection,
  248. arguments
  249. );
  250. };
  251. })(func);
  252. }
  253. }
  254. })([
  255. // synchronous functions
  256. 'close',
  257. 'createBinlogStream',
  258. 'destroy',
  259. 'escape',
  260. 'escapeId',
  261. 'format',
  262. 'pause',
  263. 'pipe',
  264. 'resume',
  265. 'unprepare'
  266. ]);
  267. class PromisePoolConnection extends PromiseConnection {
  268. constructor(connection, promiseImpl) {
  269. super(connection, promiseImpl);
  270. }
  271. destroy() {
  272. return core.PoolConnection.prototype.destroy.apply(
  273. this.connection,
  274. arguments
  275. );
  276. }
  277. }
  278. class PromisePool extends EventEmitter {
  279. constructor(pool, Promise) {
  280. super();
  281. this.pool = pool;
  282. this.Promise = Promise || global.Promise;
  283. inheritEvents(pool, this, ['acquire', 'connection', 'enqueue', 'release']);
  284. }
  285. getConnection() {
  286. const corePool = this.pool;
  287. return new this.Promise((resolve, reject) => {
  288. corePool.getConnection((err, coreConnection) => {
  289. if (err) {
  290. reject(err);
  291. } else {
  292. resolve(new PromisePoolConnection(coreConnection, this.Promise));
  293. }
  294. });
  295. });
  296. }
  297. query(sql, args) {
  298. const corePool = this.pool;
  299. const localErr = new Error();
  300. return new this.Promise((resolve, reject) => {
  301. const done = makeDoneCb(resolve, reject, localErr);
  302. if (args) {
  303. corePool.query(sql, args, done);
  304. } else {
  305. corePool.query(sql, done);
  306. }
  307. });
  308. }
  309. execute(sql, values) {
  310. const corePool = this.pool;
  311. const localErr = new Error();
  312. return new this.Promise((resolve, reject) => {
  313. corePool.execute(sql, values, makeDoneCb(resolve, reject, localErr));
  314. });
  315. }
  316. end() {
  317. const corePool = this.pool;
  318. const localErr = new Error();
  319. return new this.Promise((resolve, reject) => {
  320. corePool.end(err => {
  321. if (err) {
  322. localErr.message = err.message;
  323. localErr.code = err.code;
  324. localErr.errno = err.errno;
  325. localErr.sqlState = err.sqlState;
  326. localErr.sqlMessage = err.sqlMessage;
  327. reject(localErr);
  328. } else {
  329. resolve();
  330. }
  331. });
  332. });
  333. }
  334. }
  335. function createPool(opts) {
  336. const corePool = core.createPool(opts);
  337. const Promise = opts.Promise || global.Promise;
  338. if (!Promise) {
  339. throw new Error(
  340. 'no Promise implementation available.' +
  341. 'Use promise-enabled node version or pass userland Promise' +
  342. " implementation as parameter, for example: { Promise: require('bluebird') }"
  343. );
  344. }
  345. return new PromisePool(corePool, Promise);
  346. }
  347. (function(functionsToWrap) {
  348. for (let i = 0; functionsToWrap && i < functionsToWrap.length; i++) {
  349. const func = functionsToWrap[i];
  350. if (
  351. typeof core.Pool.prototype[func] === 'function' &&
  352. PromisePool.prototype[func] === undefined
  353. ) {
  354. PromisePool.prototype[func] = (function factory(funcName) {
  355. return function() {
  356. return core.Pool.prototype[funcName].apply(this.pool, arguments);
  357. };
  358. })(func);
  359. }
  360. }
  361. })([
  362. // synchronous functions
  363. 'escape',
  364. 'escapeId',
  365. 'format'
  366. ]);
  367. exports.createConnection = createConnection;
  368. exports.createPool = createPool;
  369. exports.escape = core.escape;
  370. exports.escapeId = core.escapeId;
  371. exports.format = core.format;
  372. exports.raw = core.raw;
  373. exports.PromisePool = PromisePool;
  374. exports.PromiseConnection = PromiseConnection;
  375. exports.PromisePoolConnection = PromisePoolConnection;