123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413 |
- 'use strict';
- const core = require('./index.js');
- const EventEmitter = require('events').EventEmitter;
/**
 * Builds a Node-style completion callback that settles a promise.
 *
 * On success the promise is resolved with the pair `[rows, fields]`.
 * On failure the `message`, `code`, `errno`, `sqlState` and `sqlMessage`
 * properties of the reported error are copied onto `localErr`, and the promise
 * is rejected with `localErr` (not with the reported error object itself).
 *
 * @param {Function} resolve - Resolver of the promise being settled.
 * @param {Function} reject - Rejecter of the promise being settled.
 * @param {Error} localErr - Error created by the caller; it is mutated and
 *   used as the rejection reason.
 * @returns {Function} Callback with the signature `(err, rows, fields)`.
 */
function makeDoneCb(resolve, reject, localErr) {
  return (err, rows, fields) => {
    if (!err) {
      resolve([rows, fields]);
      return;
    }

    // Mirror the diagnostic fields onto the caller-provided error object.
    const copiedFields = ['message', 'code', 'errno', 'sqlState', 'sqlMessage'];
    for (const field of copiedFields) {
      localErr[field] = err[field];
    }
    reject(localErr);
  };
}
/**
 * Lazily forwards selected events from `source` to `target`.
 *
 * A forwarding listener is attached to `source` only while `target` has at
 * least one listener for that event: it is installed when the first listener
 * for a forwarded event is being added to `target`, and removed again once
 * the last one has been removed.
 *
 * @param {EventEmitter} source - Emitter whose events are re-emitted.
 * @param {EventEmitter} target - Emitter that re-emits the events.
 * @param {string[]} events - Names of the events to forward.
 */
function inheritEvents(source, target, events) {
  // Forwarding listeners currently attached to `source`, keyed by event name.
  const forwarders = {};

  // True when `name` is a forwarded event and `target` has no listener for it.
  const isForwardedAndIdle = name =>
    events.indexOf(name) !== -1 && !target.listenerCount(name);

  const attachForwarder = name => {
    if (!isForwardedAndIdle(name)) {
      return;
    }
    const forward = function(...payload) {
      target.emit(name, ...payload);
    };
    forwarders[name] = forward;
    source.on(name, forward);
  };

  const detachForwarder = name => {
    if (!isForwardedAndIdle(name)) {
      return;
    }
    source.removeListener(name, forwarders[name]);
    delete forwarders[name];
  };

  target.on('newListener', attachForwarder).on('removeListener', detachForwarder);
}
/**
 * Promise-returning wrapper around a callback-style prepared statement.
 */
class PromisePreparedStatementInfo {
  /**
   * @param {object} statement - Underlying statement exposing `execute` and `close`.
   * @param {Function} promiseImpl - Promise constructor used for returned promises.
   */
  constructor(statement, promiseImpl) {
    this.statement = statement;
    this.Promise = promiseImpl;
  }

  /**
   * Executes the wrapped statement.
   *
   * @param {*} [parameters] - Forwarded to the statement only when truthy.
   * @returns {Promise} Resolves with `[rows, fields]`; rejects with an error
   *   created in this method that carries the reported error's fields.
   */
  execute(parameters) {
    const { statement } = this;
    const localErr = new Error();
    return new this.Promise((resolve, reject) => {
      const done = makeDoneCb(resolve, reject, localErr);
      const callArgs = parameters ? [parameters, done] : [done];
      statement.execute(...callArgs);
    });
  }

  /**
   * Closes the wrapped statement.
   *
   * @returns {Promise} Resolves with no value once `statement.close()` has returned.
   */
  close() {
    return new this.Promise(settle => {
      this.statement.close();
      settle();
    });
  }
}
/**
 * Promise-returning facade over a callback-style connection.
 *
 * The wrapper is itself an EventEmitter and re-emits the wrapped connection's
 * `error`, `drain`, `connect`, `end` and `enqueue` events. Each asynchronous
 * method creates its own Error object before starting the operation; on
 * failure the reported error's `message`, `code`, `errno`, `sqlState` and
 * `sqlMessage` are copied onto that object, which becomes the rejection
 * reason.
 */
class PromiseConnection extends EventEmitter {
  /**
   * @param {object} connection - Underlying callback-style connection.
   * @param {Function} [promiseImpl] - Promise constructor; defaults to `global.Promise`.
   */
  constructor(connection, promiseImpl) {
    super();
    this.connection = connection;
    this.Promise = promiseImpl || global.Promise;
    const forwardedEvents = ['error', 'drain', 'connect', 'end', 'enqueue'];
    inheritEvents(connection, this, forwardedEvents);
  }

  /** Calls `release()` on the wrapped connection. */
  release() {
    this.connection.release();
  }

  /**
   * Runs `query` on the wrapped connection.
   *
   * @param {*} query - Forwarded as the first argument.
   * @param {*} [params] - Forwarded only when truthy.
   * @returns {Promise} Resolves with `[rows, fields]`.
   */
  query(query, params) {
    const { connection } = this;
    const localErr = new Error();
    return new this.Promise((resolve, reject) => {
      const done = makeDoneCb(resolve, reject, localErr);
      const callArgs = params ? [query, params, done] : [query, done];
      connection.query(...callArgs);
    });
  }

  /**
   * Runs `execute` on the wrapped connection.
   *
   * @param {*} query - Forwarded as the first argument.
   * @param {*} [params] - Forwarded only when truthy.
   * @returns {Promise} Resolves with `[rows, fields]`.
   */
  execute(query, params) {
    const { connection } = this;
    const localErr = new Error();
    return new this.Promise((resolve, reject) => {
      const done = makeDoneCb(resolve, reject, localErr);
      const callArgs = params ? [query, params, done] : [query, done];
      connection.execute(...callArgs);
    });
  }

  /**
   * Ends the wrapped connection.
   *
   * @returns {Promise} Resolved by the `end` callback; this method never rejects.
   */
  end() {
    return new this.Promise(settle => {
      this.connection.end(settle);
    });
  }

  /** @returns {Promise} Settled by the connection's `beginTransaction` callback. */
  beginTransaction() {
    const { connection } = this;
    const localErr = new Error();
    return new this.Promise((resolve, reject) => {
      connection.beginTransaction(makeDoneCb(resolve, reject, localErr));
    });
  }

  /** @returns {Promise} Settled by the connection's `commit` callback. */
  commit() {
    const { connection } = this;
    const localErr = new Error();
    return new this.Promise((resolve, reject) => {
      connection.commit(makeDoneCb(resolve, reject, localErr));
    });
  }

  /** @returns {Promise} Settled by the connection's `rollback` callback. */
  rollback() {
    const { connection } = this;
    const localErr = new Error();
    return new this.Promise((resolve, reject) => {
      connection.rollback(makeDoneCb(resolve, reject, localErr));
    });
  }

  /** @returns {Promise} Settled by the connection's `ping` callback. */
  ping() {
    const { connection } = this;
    const localErr = new Error();
    return new this.Promise((resolve, reject) => {
      connection.ping(makeDoneCb(resolve, reject, localErr));
    });
  }

  /**
   * Calls `connect` on the wrapped connection.
   *
   * @returns {Promise} Resolves with the second argument passed to the
   *   `connect` callback.
   */
  connect() {
    const { connection } = this;
    const localErr = new Error();
    return new this.Promise((resolve, reject) => {
      connection.connect((err, param) => {
        if (!err) {
          resolve(param);
          return;
        }
        const { message, code, errno, sqlState, sqlMessage } = err;
        Object.assign(localErr, { message, code, errno, sqlState, sqlMessage });
        reject(localErr);
      });
    });
  }

  /**
   * Prepares a statement on the wrapped connection.
   *
   * @param {*} options - Forwarded to the connection's `prepare`.
   * @returns {Promise} Resolves with a PromisePreparedStatementInfo wrapping
   *   the prepared statement and sharing this wrapper's Promise constructor.
   */
  prepare(options) {
    const { connection, Promise: promiseImpl } = this;
    const localErr = new Error();
    return new this.Promise((resolve, reject) => {
      connection.prepare(options, (err, statement) => {
        if (!err) {
          resolve(new PromisePreparedStatementInfo(statement, promiseImpl));
          return;
        }
        const { message, code, errno, sqlState, sqlMessage } = err;
        Object.assign(localErr, { message, code, errno, sqlState, sqlMessage });
        reject(localErr);
      });
    });
  }

  /**
   * Calls `changeUser` on the wrapped connection.
   *
   * @param {*} options - Forwarded to the connection's `changeUser`.
   * @returns {Promise} Resolves with no value when the callback reports no error.
   */
  changeUser(options) {
    const { connection } = this;
    const localErr = new Error();
    return new this.Promise((resolve, reject) => {
      connection.changeUser(options, err => {
        if (!err) {
          resolve();
          return;
        }
        const { message, code, errno, sqlState, sqlMessage } = err;
        Object.assign(localErr, { message, code, errno, sqlState, sqlMessage });
        reject(localErr);
      });
    });
  }

  /** The wrapped connection's `config` property. */
  get config() {
    return this.connection.config;
  }

  /** The wrapped connection's `threadId` property. */
  get threadId() {
    return this.connection.threadId;
  }
}
/**
 * Creates a core connection and returns a promise for its promise wrapper.
 *
 * The Promise implementation is resolved and validated before the core
 * connection is created, so a missing implementation does not leave a
 * half-created connection behind.
 *
 * @param {object|string} opts - Passed unchanged to `core.createConnection`.
 *   When it is an object, its optional `Promise` property selects the Promise
 *   implementation; otherwise `global.Promise` is used.
 * @returns {Promise<PromiseConnection>} Resolves once the core connection
 *   emits `connect`; rejects if it emits `error` first.
 * @throws {Error} When no Promise implementation is available.
 */
function createConnection(opts) {
  const promiseImpl = (opts && opts.Promise) || global.Promise;
  if (!promiseImpl) {
    throw new Error(
      'no Promise implementation available. ' +
        'Use promise-enabled node version or pass userland Promise' +
        " implementation as parameter, for example: { Promise: require('bluebird') }"
    );
  }

  const coreConnection = core.createConnection(opts);
  // Created outside the event handlers; it is filled in from the reported
  // error and used as the rejection reason.
  const createConnectionErr = new Error();

  return new promiseImpl((resolve, reject) => {
    coreConnection.once('connect', () => {
      resolve(new PromiseConnection(coreConnection, promiseImpl));
    });
    // NOTE(review): this one-shot listener stays attached after a successful
    // connect, so the first later `error` event also reaches it (where the
    // reject call has no effect) — confirm that this is intended.
    coreConnection.once('error', err => {
      createConnectionErr.message = err.message;
      createConnectionErr.code = err.code;
      createConnectionErr.errno = err.errno;
      createConnectionErr.sqlState = err.sqlState;
      // Copied as well so this path matches the other error paths in the file.
      createConnectionErr.sqlMessage = err.sqlMessage;
      reject(createConnectionErr);
    });
  });
}
- // note: the callback of "changeUser" is not called on success
- // hence there is no possibility to call "resolve"
// Patch PromiseConnection: add pass-through facades for synchronous functions
// that exist on core.Connection.prototype but that PromiseConnection does not
// define itself. Only synchronous functions are proxied; each facade calls the
// core implementation with the wrapped connection as `this` and returns its
// result unchanged.
for (const funcName of [
  // synchronous functions
  'close',
  'createBinlogStream',
  'destroy',
  'escape',
  'escapeId',
  'format',
  'pause',
  'pipe',
  'resume',
  'unprepare'
]) {
  const existsOnCore =
    typeof core.Connection.prototype[funcName] === 'function';
  const missingOnWrapper = PromiseConnection.prototype[funcName] === undefined;

  if (existsOnCore && missingOnWrapper) {
    // The core function is looked up at call time, not captured here.
    PromiseConnection.prototype[funcName] = function(...args) {
      return core.Connection.prototype[funcName].apply(this.connection, args);
    };
  }
}
/**
 * Promise wrapper for a connection handed out by a pool.
 *
 * Behaves like PromiseConnection, except that `destroy` is routed to
 * `core.PoolConnection.prototype.destroy`.
 */
class PromisePoolConnection extends PromiseConnection {
  /**
   * @param {object} connection - Underlying pool connection.
   * @param {Function} [promiseImpl] - Promise constructor passed to the base class.
   */
  constructor(connection, promiseImpl) {
    super(connection, promiseImpl);
  }

  /**
   * Calls the core PoolConnection `destroy` with the wrapped connection as
   * `this`, forwarding all arguments.
   *
   * @returns {*} Whatever the core `destroy` returns.
   */
  destroy(...args) {
    const coreDestroy = core.PoolConnection.prototype.destroy;
    return Reflect.apply(coreDestroy, this.connection, args);
  }
}
/**
 * Promise-returning facade over a callback-style connection pool.
 *
 * The wrapper is an EventEmitter and re-emits the wrapped pool's `acquire`,
 * `connection`, `enqueue` and `release` events.
 */
class PromisePool extends EventEmitter {
  /**
   * @param {object} pool - Underlying callback-style pool.
   * @param {Function} [Promise] - Promise constructor; defaults to `global.Promise`.
   */
  constructor(pool, Promise) {
    super();
    this.pool = pool;
    this.Promise = Promise || global.Promise;
    const forwardedEvents = ['acquire', 'connection', 'enqueue', 'release'];
    inheritEvents(pool, this, forwardedEvents);
  }

  /**
   * Requests a connection from the wrapped pool.
   *
   * @returns {Promise<PromisePoolConnection>} Resolves with a wrapper around
   *   the obtained connection; rejects with the error object reported by the
   *   pool, passed through as is.
   */
  getConnection() {
    const { pool } = this;
    return new this.Promise((resolve, reject) => {
      pool.getConnection((err, coreConnection) => {
        if (err) {
          reject(err);
          return;
        }
        resolve(new PromisePoolConnection(coreConnection, this.Promise));
      });
    });
  }

  /**
   * Runs `query` on the wrapped pool.
   *
   * @param {*} sql - Forwarded as the first argument.
   * @param {*} [args] - Forwarded only when truthy.
   * @returns {Promise} Resolves with `[rows, fields]`.
   */
  query(sql, args) {
    const { pool } = this;
    const localErr = new Error();
    return new this.Promise((resolve, reject) => {
      const done = makeDoneCb(resolve, reject, localErr);
      const callArgs = args ? [sql, args, done] : [sql, done];
      pool.query(...callArgs);
    });
  }

  /**
   * Runs `execute` on the wrapped pool; `values` is always forwarded, even
   * when it is undefined.
   *
   * @param {*} sql - Forwarded as the first argument.
   * @param {*} [values] - Forwarded as the second argument.
   * @returns {Promise} Resolves with `[rows, fields]`.
   */
  execute(sql, values) {
    const { pool } = this;
    const localErr = new Error();
    return new this.Promise((resolve, reject) => {
      const done = makeDoneCb(resolve, reject, localErr);
      pool.execute(sql, values, done);
    });
  }

  /**
   * Ends the wrapped pool.
   *
   * @returns {Promise} Resolves with no value; on failure rejects with an
   *   error created in this method that carries the reported error's
   *   `message`, `code`, `errno`, `sqlState` and `sqlMessage`.
   */
  end() {
    const { pool } = this;
    const localErr = new Error();
    return new this.Promise((resolve, reject) => {
      pool.end(err => {
        if (!err) {
          resolve();
          return;
        }
        const { message, code, errno, sqlState, sqlMessage } = err;
        Object.assign(localErr, { message, code, errno, sqlState, sqlMessage });
        reject(localErr);
      });
    });
  }
}
/**
 * Creates a core pool and wraps it in a PromisePool.
 *
 * The Promise implementation is resolved and validated before the core pool
 * is created, so a missing implementation does not leave an unused pool
 * behind.
 *
 * @param {object|string} opts - Passed unchanged to `core.createPool`. When
 *   it is an object, its optional `Promise` property selects the Promise
 *   implementation; otherwise `global.Promise` is used.
 * @returns {PromisePool} Wrapper around the newly created core pool.
 * @throws {Error} When no Promise implementation is available.
 */
function createPool(opts) {
  const promiseImpl = (opts && opts.Promise) || global.Promise;
  if (!promiseImpl) {
    throw new Error(
      'no Promise implementation available. ' +
        'Use promise-enabled node version or pass userland Promise' +
        " implementation as parameter, for example: { Promise: require('bluebird') }"
    );
  }
  const corePool = core.createPool(opts);
  return new PromisePool(corePool, promiseImpl);
}
// Patch PromisePool: add pass-through facades for synchronous functions that
// exist on core.Pool.prototype but that PromisePool does not define itself.
// Each facade calls the core implementation with the wrapped pool as `this`
// and returns its result unchanged.
for (const funcName of [
  // synchronous functions
  'escape',
  'escapeId',
  'format'
]) {
  const existsOnCore = typeof core.Pool.prototype[funcName] === 'function';
  const missingOnWrapper = PromisePool.prototype[funcName] === undefined;

  if (existsOnCore && missingOnWrapper) {
    // The core function is looked up at call time, not captured here.
    PromisePool.prototype[funcName] = function(...args) {
      return core.Pool.prototype[funcName].apply(this.pool, args);
    };
  }
}
// Public API of this module.
// Factories that build the promise wrappers defined in this file.
- exports.createConnection = createConnection;
- exports.createPool = createPool;
// Helpers re-exported unchanged from the core module.
- exports.escape = core.escape;
- exports.escapeId = core.escapeId;
- exports.format = core.format;
- exports.raw = core.raw;
// Wrapper classes defined in this file.
- exports.PromisePool = PromisePool;
- exports.PromiseConnection = PromiseConnection;
- exports.PromisePoolConnection = PromisePoolConnection;
|