12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886 |
- 'use strict';
- const Net = require('net');
- const Tls = require('tls');
- const Timers = require('timers');
- const EventEmitter = require('events').EventEmitter;
- const Readable = require('stream').Readable;
- const Queue = require('denque');
- const SqlString = require('sqlstring');
- const LRU = require('lru-cache');
- const PacketParser = require('./packet_parser.js');
- const Packets = require('./packets/index.js');
- const Commands = require('./commands/index.js');
- const ConnectionConfig = require('./connection_config.js');
- const CharsetToEncoding = require('./constants/charset_encodings.js');
// Process-local counter; each new Connection takes the current value as its
// `_internalId` (used only to tell connections apart in debug output).
let _connectionId = 0;
// Converter from the `named-placeholders` package. Stays null until the first
// query that actually uses named placeholders, so the package is loaded lazily.
let convertNamedPlaceholders = null;
/**
 * A single MySQL wire-protocol connection on top of a duplex stream.
 *
 * Commands are queued and run one at a time: bytes read from the stream are
 * split into packets by `PacketParser` and every packet is handed to the
 * command currently at the head of the queue.
 *
 * The class also carries the helpers used when acting as the server side of
 * the protocol (`serverHandshake`, `writeOk`, `writeTextResult`, ...), which
 * are relevant when `config.isServer` is set.
 *
 * Events emitted by this class: 'connect' (handshake packet), 'error' (Error)
 * and 'warn' (Error, out-of-order packets).
 */
class Connection extends EventEmitter {
  /**
   * Opens (or adopts) the underlying stream, wires up packet parsing and,
   * unless `config.isServer` is set, queues the client handshake.
   *
   * @param {Object} opts
   * @param {Object} opts.config - connection configuration. `config.stream`
   *   may be a ready stream or a factory `(opts) => stream`; otherwise a
   *   socket is opened from `socketPath` or `port`/`host`.
   */
  constructor(opts) {
    super();
    this.config = opts.config;
    // Set up front so the stream handlers below never read undefined flags.
    this._closing = false;
    this.connectTimeout = null;
    // TODO: fill defaults
    // if no params, connect to /var/lib/mysql/mysql.sock ( /tmp/mysql.sock on OSX )
    // if host is given, connect to host:3306
    // TODO: use `/usr/local/mysql/bin/mysql_config --socket` output? as default socketPath
    // if there is no host/port and no socketPath parameters?
    if (!opts.config.stream) {
      if (opts.config.socketPath) {
        this.stream = Net.connect(opts.config.socketPath);
      } else {
        this.stream = Net.connect(opts.config.port, opts.config.host);
        // Keep-alive is always switched on for TCP sockets; the configuration
        // only controls the initial delay.
        this.stream.setKeepAlive(true, this.config.keepAliveInitialDelay);
      }
    } else if (typeof opts.config.stream === 'function') {
      // a function is treated as a "stream agent / factory"
      this.stream = opts.config.stream(opts);
    } else {
      this.stream = opts.config.stream;
    }
    this._internalId = _connectionId++;
    this._commands = new Queue();
    this._command = null;
    this._paused = false;
    this._paused_packets = new Queue();
    // Prepared statements are cached; an evicted statement is closed.
    this._statements = new LRU({
      max: this.config.maxPreparedStatements,
      dispose: function(key, statement) {
        statement.close();
      }
    });
    this.serverCapabilityFlags = 0;
    this.authorized = false;
    this.sequenceId = 0;
    this.compressedSequenceId = 0;
    this.threadId = null;
    this._handshakePacket = null;
    this._fatalError = null;
    this._protocolError = null;
    this._outOfOrderPackets = [];
    this.clientEncoding = CharsetToEncoding[this.config.charsetNumber];
    this.stream.on('error', this._handleNetworkError.bind(this));
    // see https://gist.github.com/khoomeister/4985691#use-that-instead-of-bind
    this.packetParser = new PacketParser(p => {
      this.handlePacket(p);
    });
    this.stream.on('data', data => {
      // any incoming data means the connect attempt did not time out
      if (this.connectTimeout) {
        Timers.clearTimeout(this.connectTimeout);
        this.connectTimeout = null;
      }
      this.packetParser.execute(data);
    });
    this.stream.on('close', () => {
      // we need to set this flag everywhere where we want connection to close
      if (this._closing) {
        return;
      }
      if (!this._protocolError) {
        // no particular error message before disconnect
        this._protocolError = new Error(
          'Connection lost: The server closed the connection.'
        );
        this._protocolError.fatal = true;
        this._protocolError.code = 'PROTOCOL_CONNECTION_LOST';
      }
      this._notifyError(this._protocolError);
    });
    if (!this.config.isServer) {
      const handshakeCommand = new Commands.ClientHandshake(
        this.config.clientFlags
      );
      handshakeCommand.on('end', () => {
        // this happens when handshake finishes early and first packet is error
        // and not server hello ( for example, 'Too many connections' error)
        if (!handshakeCommand.handshake) {
          return;
        }
        this._handshakePacket = handshakeCommand.handshake;
        this.threadId = handshakeCommand.handshake.connectionId;
        this.emit('connect', handshakeCommand.handshake);
      });
      handshakeCommand.on('error', err => {
        this._closing = true;
        this._notifyError(err);
      });
      this.addCommand(handshakeCommand);
    }
    // In case there was no initial handshake but we need to read a string,
    // assume it is utf-8. Most common example: "Too many connections" error
    // (the packet is sent immediately on connection attempt, so the server
    // encoding is not known yet). Overwritten with the actual encoding as soon
    // as the server handshake packet is received.
    this.serverEncoding = 'utf8';
    if (this.config.connectTimeout) {
      const timeoutHandler = this._handleTimeoutError.bind(this);
      this.connectTimeout = Timers.setTimeout(
        timeoutHandler,
        this.config.connectTimeout
      );
    }
  }

  /**
   * Wraps this connection in the promise-based API.
   *
   * @param {Function} [promiseImpl] - passed through to `PromiseConnection`.
   * @returns {Object} a `PromiseConnection` bound to this connection.
   */
  promise(promiseImpl) {
    // required lazily, at call time
    const PromiseConnection = require('../promise').PromiseConnection;
    return new PromiseConnection(this, promiseImpl);
  }

  /**
   * Replacement for `addCommand` once the connection is closed: reports a
   * fatal error to the command's callback, or as an 'error' event when the
   * command has no callback.
   *
   * @param {Object} cmd - the command that was rejected.
   */
  _addCommandClosedState(cmd) {
    const err = new Error(
      "Can't add new command when connection is in closed state"
    );
    err.fatal = true;
    if (cmd.onResult) {
      cmd.onResult(err);
    } else {
      this.emit('error', err);
    }
  }

  /**
   * Puts the connection into its terminal failed state: stops reading,
   * rejects further commands and writes, then notifies pending commands.
   *
   * @param {Error} err - marked `fatal` and remembered as `_fatalError`.
   */
  _handleFatalError(err) {
    err.fatal = true;
    // stop receiving packets
    this.stream.removeAllListeners('data');
    this.addCommand = this._addCommandClosedState;
    this.write = () => {
      this.emit('error', new Error("Can't write in closed state"));
    };
    // `_fatalError` must be assigned after notifying: `_notifyError` is a
    // no-op once it is set.
    this._notifyError(err);
    this._fatalError = err;
  }

  /**
   * Handles an error coming from the stream (or a synthesized timeout).
   *
   * @param {Error} err
   */
  _handleNetworkError(err) {
    if (this.connectTimeout) {
      Timers.clearTimeout(this.connectTimeout);
      this.connectTimeout = null;
    }
    // Do not throw an error when a connection ends with a RST,ACK packet.
    // Current Node.js reports the symbolic name in `err.code` (`err.errno` is
    // numeric); the `errno` comparison is kept for older runtimes.
    const isConnectionReset =
      err.code === 'ECONNRESET' || err.errno === 'ECONNRESET';
    if (isConnectionReset && this._closing) {
      return;
    }
    this._handleFatalError(err);
  }

  /**
   * Fired when `config.connectTimeout` elapses before any data arrives:
   * destroys the stream and reports a `connect ETIMEDOUT` error.
   */
  _handleTimeoutError() {
    if (this.connectTimeout) {
      Timers.clearTimeout(this.connectTimeout);
      this.connectTimeout = null;
    }
    // custom streams are not required to implement destroy()
    if (this.stream.destroy) {
      this.stream.destroy();
    }
    const err = new Error('connect ETIMEDOUT');
    // property name (sic) kept as-is for backward compatibility
    err.errorno = 'ETIMEDOUT';
    err.code = 'ETIMEDOUT';
    err.syscall = 'connect';
    this._handleNetworkError(err);
  }

  /**
   * Notifies every queued command of `err` and, where appropriate, bubbles it
   * up as a connection 'error' event. Called on stream error or unexpected
   * termination.
   *
   * @param {Error} err
   */
  _notifyError(err) {
    // prevent from emitting 'PROTOCOL_CONNECTION_LOST' after EPIPE or ECONNRESET
    if (this._fatalError) {
      return;
    }
    // if there is no active command, notify connection
    // if there are commands and all of them have callbacks, pass error via callback
    let bubbleErrorToConnection = !this._command;
    if (this._command && this._command.onResult) {
      this._command.onResult(err);
      this._command = null;
    } else {
      // connection handshake is special because we allow it to be implicit:
      // if the error happened during handshake, but there are other commands
      // in the queue, bubble the error to those commands and not to the
      // connection
      const implicitHandshakeWithQueuedCommands =
        this._command &&
        this._command.constructor === Commands.ClientHandshake &&
        this._commands.length > 0;
      if (!implicitHandshakeWithQueuedCommands) {
        bubbleErrorToConnection = true;
      }
    }
    let command;
    while ((command = this._commands.shift())) {
      if (command.onResult) {
        command.onResult(err);
      } else {
        bubbleErrorToConnection = true;
      }
    }
    // notify connection if some commands in the queue did not have callbacks
    // or if this is pool connection ( so it can be removed from pool )
    if (bubbleErrorToConnection || this._pool) {
      this.emit('error', err);
    }
  }

  /**
   * Writes raw bytes to the stream; a write failure is treated as a network
   * error.
   *
   * @param {Buffer} buffer
   */
  write(buffer) {
    this.stream.write(buffer, err => {
      if (err) {
        this._handleNetworkError(err);
      }
    });
  }

  // http://dev.mysql.com/doc/internals/en/sequence-id.html
  //
  // The sequence-id is incremented with each packet and may wrap around.
  // It starts at 0 and is reset to 0 when a new command
  // begins in the Command Phase.
  // http://dev.mysql.com/doc/internals/en/example-several-mysql-packets.html

  /** Resets both the plain and the compressed sequence counters to 0. */
  _resetSequenceId() {
    this.sequenceId = 0;
    this.compressedSequenceId = 0;
  }

  /**
   * Advances the compressed sequence id, wrapping at 256.
   *
   * @param {number} numPackets
   */
  _bumpCompressedSequenceId(numPackets) {
    this.compressedSequenceId += numPackets;
    this.compressedSequenceId %= 256;
  }

  /**
   * Advances the sequence id, wrapping at 256.
   *
   * @param {number} numPackets
   */
  _bumpSequenceId(numPackets) {
    this.sequenceId += numPackets;
    this.sequenceId %= 256;
  }

  /**
   * Sends one logical packet, splitting it into several wire packets when the
   * payload does not fit into a single one.
   *
   * @param {Object} packet - must provide `length()` (payload length),
   *   `buffer` (4 header bytes followed by the payload) and `writeHeader()`.
   */
  writePacket(packet) {
    const MAX_PACKET_LENGTH = 16777215;
    const length = packet.length();
    if (length < MAX_PACKET_LENGTH) {
      packet.writeHeader(this.sequenceId);
      if (this.config.debug) {
        // eslint-disable-next-line no-console
        console.log(
          `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`
        );
        // eslint-disable-next-line no-console
        console.log(
          `${this._internalId} ${this.connectionId} <== ${packet.buffer.toString('hex')}`
        );
      }
      this._bumpSequenceId(1);
      this.write(packet.buffer);
      return;
    }
    if (this.config.debug) {
      // eslint-disable-next-line no-console
      console.log(
        `${this._internalId} ${this.connectionId} <== Writing large packet, raw content not written:`
      );
      // eslint-disable-next-line no-console
      console.log(
        `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`
      );
    }
    // payload starts after the 4 header bytes reserved in packet.buffer
    for (let offset = 4; offset < 4 + length; offset += MAX_PACKET_LENGTH) {
      const chunk = packet.buffer.slice(offset, offset + MAX_PACKET_LENGTH);
      // 3-byte little-endian length (ff ff ff for a full chunk) + sequence id
      const header = Buffer.from([
        chunk.length & 0xff,
        (chunk.length >> 8) & 0xff,
        (chunk.length >> 16) & 0xff,
        this.sequenceId
      ]);
      this._bumpSequenceId(1);
      this.write(header);
      this.write(chunk);
    }
    // A full-size chunk tells the peer that more data follows, so a payload
    // that is an exact multiple of the maximum must be terminated by an
    // empty packet.
    if (length % MAX_PACKET_LENGTH === 0) {
      const terminator = Buffer.from([0, 0, 0, this.sequenceId]);
      this._bumpSequenceId(1);
      this.write(terminator);
    }
  }

  /**
   * Upgrades the existing stream to TLS using `config.ssl` and redirects
   * reads and writes through the secure socket. (0.11+ environment.)
   *
   * NOTE(review): relies on the `_tlsError` event and `_start()` of
   * `tls.TLSSocket`, which look like non-public API; confirm against the
   * supported Node.js versions.
   *
   * @param {Function} onSecure - called with an error (handshake failure or
   *   certificate verification error) or with null once the channel is secure.
   */
  startTLS(onSecure) {
    if (this.config.debug) {
      // eslint-disable-next-line no-console
      console.log('Upgrading connection to TLS');
    }
    const secureContext = Tls.createSecureContext({
      ca: this.config.ssl.ca,
      cert: this.config.ssl.cert,
      ciphers: this.config.ssl.ciphers,
      key: this.config.ssl.key,
      passphrase: this.config.ssl.passphrase,
      minVersion: this.config.ssl.minVersion
    });
    const rejectUnauthorized = this.config.ssl.rejectUnauthorized;
    let secureEstablished = false;
    const secureSocket = new Tls.TLSSocket(this.stream, {
      rejectUnauthorized: rejectUnauthorized,
      requestCert: true,
      secureContext: secureContext,
      isServer: false
    });
    // error handler for secure socket: before the handshake completes errors
    // go to the caller, afterwards they are ordinary network errors
    secureSocket.on('_tlsError', err => {
      if (secureEstablished) {
        this._handleNetworkError(err);
      } else {
        onSecure(err);
      }
    });
    secureSocket.on('secure', () => {
      secureEstablished = true;
      onSecure(rejectUnauthorized ? secureSocket.ssl.verifyError() : null);
    });
    secureSocket.on('data', data => {
      this.packetParser.execute(data);
    });
    this.write = buffer => {
      secureSocket.write(buffer);
    };
    // start TLS communications
    secureSocket._start();
  }

  /**
   * Feeds stream data to the packet parser, via `ondata` for `Net.Stream`
   * instances and via the 'data' event (using the chunk's `parent`/`offset`)
   * for any other stream.
   */
  pipe() {
    if (this.stream instanceof Net.Stream) {
      this.stream.ondata = (data, start, end) => {
        this.packetParser.execute(data, start, end);
      };
    } else {
      this.stream.on('data', data => {
        this.packetParser.execute(
          data.parent,
          data.offset,
          data.offset + data.length
        );
      });
    }
  }

  /**
   * Emits a fatal protocol 'error' event.
   *
   * @param {string} message
   * @param {string} [code='PROTOCOL_ERROR']
   */
  protocolError(message, code) {
    const err = new Error(message);
    err.fatal = true;
    err.code = code || 'PROTOCOL_ERROR';
    this.emit('error', err);
  }

  /**
   * Dispatches a packet to the active command and, when that command reports
   * completion, starts the next queued one.
   *
   * @param {Object} [packet] - parsed packet; omitted when the call is only
   *   meant to start the current command.
   */
  handlePacket(packet) {
    if (this._paused) {
      this._paused_packets.push(packet);
      return;
    }
    if (packet) {
      if (this.sequenceId !== packet.sequenceId) {
        const err = new Error(
          `Warning: got packets out of order. Expected ${this.sequenceId} but received ${packet.sequenceId}`
        );
        err.expected = this.sequenceId;
        err.received = packet.sequenceId;
        this.emit('warn', err); // REVIEW
        // eslint-disable-next-line no-console
        console.error(err.message);
      }
      this._bumpSequenceId(packet.numPackets);
    }
    if (this.config.debug && packet) {
      // eslint-disable-next-line no-console
      console.log(
        ` raw: ${packet.buffer
          .slice(packet.offset, packet.offset + packet.length())
          .toString('hex')}`
      );
      // eslint-disable-next-line no-console
      console.trace();
      const commandName = this._command
        ? this._command._commandName
        : '(no command)';
      const stateName = this._command
        ? this._command.stateName()
        : '(no command)';
      // eslint-disable-next-line no-console
      console.log(
        `${this._internalId} ${this.connectionId} ==> ${commandName}#${stateName}(${[packet.sequenceId, packet.type(), packet.length()].join(',')})`
      );
    }
    if (!this._command) {
      this.protocolError(
        'Unexpected packet while no commands in the queue',
        'PROTOCOL_UNEXPECTED_PACKET'
      );
      this.close();
      return;
    }
    const done = this._command.execute(packet, this);
    if (done) {
      this._command = this._commands.shift();
      if (this._command) {
        // a new command starts with fresh sequence ids
        this.sequenceId = 0;
        this.compressedSequenceId = 0;
        this.handlePacket();
      }
    }
  }

  /**
   * Queues a command; if nothing is running it is started immediately.
   *
   * @param {Object} cmd
   * @returns {Object} the same command, for chaining.
   */
  addCommand(cmd) {
    if (this.config.debug) {
      const commandName = cmd.constructor.name;
      // eslint-disable-next-line no-console
      console.log(`Add command: ${commandName}`);
      cmd._commandName = commandName;
    }
    if (!this._command) {
      this._command = cmd;
      this.handlePacket();
    } else {
      this._commands.push(cmd);
    }
    return cmd;
  }

  /**
   * Interpolates `values` into `sql`, using `config.queryFormat` when it is a
   * function and `SqlString.format` otherwise.
   *
   * @param {string} sql
   * @param {Array|Object} values
   * @returns {string}
   */
  format(sql, values) {
    if (typeof this.config.queryFormat === 'function') {
      return this.config.queryFormat.call(
        this,
        sql,
        values,
        this.config.timezone
      );
    }
    const opts = {
      sql: sql,
      values: values
    };
    this._resolveNamedPlaceholders(opts);
    return SqlString.format(
      opts.sql,
      opts.values,
      this.config.stringifyObjects,
      this.config.timezone
    );
  }

  /**
   * Escapes a value for use in SQL (delegates to `SqlString.escape`).
   *
   * @param {*} value
   * @returns {string}
   */
  escape(value) {
    return SqlString.escape(value, false, this.config.timezone);
  }

  /**
   * Escapes an identifier (delegates to `SqlString.escapeId`).
   *
   * @param {*} value
   * @returns {string}
   */
  escapeId(value) {
    return SqlString.escapeId(value, false);
  }

  /**
   * Delegates to `SqlString.raw`.
   *
   * @param {string} sql
   * @returns {Object}
   */
  raw(sql) {
    return SqlString.raw(sql);
  }

  /**
   * When named placeholders are enabled (in the connection config or on
   * `options`), rewrites `options.sql` and `options.values` in place with the
   * result of the `named-placeholders` converter.
   *
   * @param {Object} options - object with `sql`, `values` and optionally
   *   `namedPlaceholders`; mutated in place.
   */
  _resolveNamedPlaceholders(options) {
    if (!(this.config.namedPlaceholders || options.namedPlaceholders)) {
      return;
    }
    if (convertNamedPlaceholders === null) {
      // load the converter on first use only
      convertNamedPlaceholders = require('named-placeholders')();
    }
    const unnamed = convertNamedPlaceholders(options.sql, options.values);
    options.sql = unnamed[0];
    options.values = unnamed[1];
  }

  /**
   * Queues a text-protocol query.
   *
   * @param {string|Object} sql - SQL text, an options object, or an existing
   *   `Commands.Query`.
   * @param {Array|Object|Function} [values] - bind values, or the callback.
   * @param {Function} [cb]
   * @returns {Object} the queued query command.
   */
  query(sql, values, cb) {
    let cmdQuery;
    if (sql.constructor === Commands.Query) {
      cmdQuery = sql;
    } else {
      cmdQuery = Connection.createQuery(sql, values, cb, this.config);
    }
    this._resolveNamedPlaceholders(cmdQuery);
    const rawSql = this.format(cmdQuery.sql, cmdQuery.values || []);
    cmdQuery.sql = rawSql;
    return this.addCommand(cmdQuery);
  }

  /** Pauses the stream; packets handled while paused are buffered. */
  pause() {
    this._paused = true;
    this.stream.pause();
  }

  /**
   * Replays packets buffered while paused, then resumes the stream unless a
   * packet handler paused the connection again.
   */
  resume() {
    let packet;
    this._paused = false;
    while ((packet = this._paused_packets.shift())) {
      this.handlePacket(packet);
      // don't resume if packet handler paused connection
      if (this._paused) {
        return;
      }
    }
    this.stream.resume();
  }

  // TODO: named placeholders support
  /**
   * Queues a prepare command.
   *
   * @param {string|Object} options - SQL text or an options object.
   * @param {Function} [cb]
   * @returns {Object} the queued prepare command.
   */
  prepare(options, cb) {
    if (typeof options === 'string') {
      options = { sql: options };
    }
    return this.addCommand(new Commands.Prepare(options, cb));
  }

  /**
   * Removes a prepared statement from the cache and closes it.
   *
   * @param {string|Object} sql - SQL text or the options the statement was
   *   prepared with.
   * @returns {Object|undefined} the cached statement, if there was one.
   */
  unprepare(sql) {
    let options = {};
    if (typeof sql === 'object') {
      options = sql;
    } else {
      options.sql = sql;
    }
    const key = Connection.statementKey(options);
    const stmt = this._statements.get(key);
    if (stmt) {
      this._statements.del(key);
      stmt.close();
    }
    return stmt;
  }

  /**
   * Prepares and executes a statement. Accepts `(options, cb)`, `(sql, cb)`
   * or `(sql, values, cb)`.
   *
   * @param {string|Object} sql
   * @param {Array|Function} [values]
   * @param {Function} [cb]
   * @returns {Object} the queued execute command.
   * @throws {TypeError} when the bind values are not an array, or contain
   *   `undefined` or a function.
   */
  execute(sql, values, cb) {
    let options = {};
    if (typeof sql === 'object') {
      // execute(options, cb)
      options = sql;
      if (typeof values === 'function') {
        cb = values;
      } else {
        options.values = options.values || values;
      }
    } else if (typeof values === 'function') {
      // execute(sql, cb)
      cb = values;
      options.sql = sql;
      options.values = undefined;
    } else {
      // execute(sql, values, cb)
      options.sql = sql;
      options.values = values;
    }
    this._resolveNamedPlaceholders(options);
    if (options.values) {
      // an object is only acceptable when named placeholders converted it
      // into an array above
      if (!Array.isArray(options.values)) {
        throw new TypeError(
          'Bind parameters must be array if namedPlaceholders parameter is not enabled'
        );
      }
      options.values.forEach(val => {
        if (val === undefined) {
          throw new TypeError(
            'Bind parameters must not contain undefined. To pass SQL NULL specify JS null'
          );
        }
        if (typeof val === 'function') {
          throw new TypeError(
            'Bind parameters must not contain function(s). To pass the body of a function as a string call .toString() first'
          );
        }
      });
    }
    const executeCommand = new Commands.Execute(options, cb);
    const prepareCommand = new Commands.Prepare(options, (err, stmt) => {
      if (err) {
        // skip execute command if prepare failed, we have main
        // combined callback here
        executeCommand.start = function() {
          return null;
        };
        if (cb) {
          cb(err);
        } else {
          executeCommand.emit('error', err);
        }
        executeCommand.emit('end');
        return;
      }
      executeCommand.statement = stmt;
    });
    this.addCommand(prepareCommand);
    this.addCommand(executeCommand);
    return executeCommand;
  }

  /**
   * Queues a change-user command; omitted fields fall back to the current
   * configuration. An error passed to `callback` is marked fatal.
   *
   * @param {Object|Function} options - new credentials, or the callback.
   * @param {Function} [callback]
   * @returns {Object} the queued command.
   */
  changeUser(options, callback) {
    if (!callback && typeof options === 'function') {
      callback = options;
      options = {};
    }
    const charsetNumber = options.charset
      ? ConnectionConfig.getCharsetNumber(options.charset)
      : this.config.charsetNumber;
    return this.addCommand(
      new Commands.ChangeUser(
        {
          user: options.user || this.config.user,
          password: options.password || this.config.password,
          passwordSha1: options.passwordSha1 || this.config.passwordSha1,
          database: options.database || this.config.database,
          timeout: options.timeout,
          charsetNumber: charsetNumber,
          currentConfig: this.config
        },
        err => {
          if (err) {
            err.fatal = true;
          }
          if (callback) {
            callback(err);
          }
        }
      )
    );
  }

  // transaction helpers

  /**
   * Runs `START TRANSACTION`.
   *
   * @param {Function} [cb]
   * @returns {Object} the queued query command.
   */
  beginTransaction(cb) {
    return this.query('START TRANSACTION', cb);
  }

  /**
   * Runs `COMMIT`.
   *
   * @param {Function} [cb]
   * @returns {Object} the queued query command.
   */
  commit(cb) {
    return this.query('COMMIT', cb);
  }

  /**
   * Runs `ROLLBACK`.
   *
   * @param {Function} [cb]
   * @returns {Object} the queued query command.
   */
  rollback(cb) {
    return this.query('ROLLBACK', cb);
  }

  /**
   * Queues a ping command.
   *
   * @param {Function} [cb]
   * @returns {Object} the queued command.
   */
  ping(cb) {
    return this.addCommand(new Commands.Ping(cb));
  }

  /**
   * Queues a register-slave command.
   *
   * @param {Object} opts
   * @param {Function} [cb]
   * @returns {Object} the queued command.
   */
  _registerSlave(opts, cb) {
    return this.addCommand(new Commands.RegisterSlave(opts, cb));
  }

  /**
   * Queues a binlog-dump command.
   *
   * @param {Object} opts
   * @param {Function} [cb]
   * @returns {Object} the queued command.
   */
  _binlogDump(opts, cb) {
    return this.addCommand(new Commands.BinlogDump(opts, cb));
  }

  /** Currently just an alias to `close()`. */
  destroy() {
    this.close();
  }

  /**
   * Ends the stream without sending a quit command and rejects any command
   * added afterwards.
   */
  close() {
    if (this.connectTimeout) {
      Timers.clearTimeout(this.connectTimeout);
      this.connectTimeout = null;
    }
    this._closing = true;
    this.stream.end();
    this.addCommand = this._addCommandClosedState;
  }

  /**
   * Registers as a replica and returns an object-mode stream that receives
   * each binlog event; the stream ends on 'eof'.
   *
   * @param {Object} opts - passed to the register and dump commands; when bit
   *   0x01 of `opts.flags` is set the connection is closed on 'eof'.
   * @returns {Readable}
   */
  createBinlogStream(opts) {
    // TODO: create proper stream class
    // TODO: use through2
    let test = 1;
    const stream = new Readable({ objectMode: true });
    // placeholder _read: data is pushed from the dump command's events below
    stream._read = function() {
      return {
        data: test++
      };
    };
    this._registerSlave(opts, () => {
      const dumpCmd = this._binlogDump(opts);
      dumpCmd.on('event', ev => {
        stream.push(ev);
      });
      dumpCmd.on('eof', () => {
        stream.push(null);
        // if non-blocking, then close stream to prevent errors
        if (opts.flags && opts.flags & 0x01) {
          this.close();
        }
      });
      // TODO: pipe errors as well
    });
    return stream;
  }

  /**
   * Reports the outcome of the connection attempt to `cb` exactly once.
   *
   * If the connection has already failed or already completed its handshake
   * the callback is invoked immediately; otherwise it waits for the first
   * 'error' or 'connect' event.
   *
   * @param {Function} [cb] - `(err)` on failure, `(null, handshake)` on
   *   success. Nothing happens when omitted.
   */
  connect(cb) {
    if (!cb) {
      return;
    }
    const earlierError = this._fatalError || this._protocolError;
    if (earlierError) {
      cb(earlierError);
      return;
    }
    if (this._handshakePacket) {
      // same payload the 'connect' event carries
      cb(null, this._handshakePacket);
      return;
    }
    let connectCalled = 0;
    function callbackOnce(isErrorHandler) {
      return function(param) {
        if (!connectCalled) {
          if (isErrorHandler) {
            cb(param);
          } else {
            cb(null, param);
          }
        }
        connectCalled = 1;
      };
    }
    this.once('error', callbackOnce(true));
    this.once('connect', callbackOnce(false));
  }

  // ===================================
  // outgoing server connection methods
  // ===================================

  /**
   * Writes a result-set header, one definition packet per column and an EOF.
   *
   * @param {Array<Object>} columns
   */
  writeColumns(columns) {
    this.writePacket(Packets.ResultSetHeader.toPacket(columns.length));
    columns.forEach(column => {
      this.writePacket(
        Packets.ColumnDefinition.toPacket(column, this.serverConfig.encoding)
      );
    });
    this.writeEof();
  }

  /**
   * Writes one text-protocol row.
   *
   * @param {Array} column - the row as an array of column values, not a hash.
   */
  writeTextRow(column) {
    this.writePacket(
      Packets.TextRow.toPacket(column, this.serverConfig.encoding)
    );
  }

  /**
   * Writes a complete text result set: column definitions, every row, EOF.
   *
   * @param {Array<Object>} rows - row objects keyed by column name.
   * @param {Array<Object>} columns - column definitions; `name` selects the
   *   value taken from each row.
   */
  writeTextResult(rows, columns) {
    this.writeColumns(columns);
    rows.forEach(row => {
      // one value per column, in column order
      const arrayRow = columns.map(column => row[column.name]);
      this.writeTextRow(arrayRow);
    });
    this.writeEof();
  }

  /**
   * Writes an EOF packet.
   *
   * @param {number} [warnings]
   * @param {number} [statusFlags]
   */
  writeEof(warnings, statusFlags) {
    this.writePacket(Packets.EOF.toPacket(warnings, statusFlags));
  }

  /**
   * Writes an OK packet.
   *
   * @param {Object} [args] - defaults to `{ affectedRows: 0 }`.
   */
  writeOk(args) {
    if (!args) {
      args = { affectedRows: 0 };
    }
    this.writePacket(Packets.OK.toPacket(args, this.serverConfig.encoding));
  }

  /**
   * Writes an error packet.
   *
   * @param {Object} args
   */
  writeError(args) {
    // if we want to send error before initial hello was sent, use default encoding
    const encoding = this.serverConfig ? this.serverConfig.encoding : 'cesu8';
    this.writePacket(Packets.Error.toPacket(args, encoding));
  }

  /**
   * Stores `args` as the server configuration (adding the `encoding` derived
   * from `characterSet`) and queues the server side of the handshake.
   *
   * @param {Object} args - mutated: an `encoding` property is added.
   * @returns {Object} the queued handshake command.
   */
  serverHandshake(args) {
    this.serverConfig = args;
    this.serverConfig.encoding =
      CharsetToEncoding[this.serverConfig.characterSet];
    return this.addCommand(new Commands.ServerHandshake(args));
  }

  // ===============================================================

  /**
   * Gracefully terminates the connection.
   *
   * As a client a quit command is queued and any command added afterwards is
   * rejected. As a server the stream is ended on the next tick of the event
   * loop.
   *
   * @param {Function} [callback] - invoked once the connection has ended.
   * @returns {Object} the quit command, or (server mode) an emitter that
   *   fires 'end'.
   */
  end(callback) {
    if (this.config.isServer) {
      this._closing = true;
      const quitCmd = new EventEmitter();
      setImmediate(() => {
        this.stream.end();
        quitCmd.emit('end');
        if (typeof callback === 'function') {
          callback();
        }
      });
      return quitCmd;
    }
    // trigger error if more commands enqueued after end command
    const quitCmd = this.addCommand(new Commands.Quit(callback));
    this.addCommand = this._addCommandClosedState;
    return quitCmd;
  }

  /**
   * Builds a `Commands.Query` from the overloaded `query()` arguments:
   * `(options, cb)`, `(options, values, cb)`, `(sql, cb)` or
   * `(sql, values, cb)`.
   *
   * @param {string|Object} sql
   * @param {Array|Object|Function} [values]
   * @param {Function} [cb]
   * @param {Object} config - connection config supplying the `rowsAsArray`
   *   default.
   * @returns {Object} a new `Commands.Query`.
   */
  static createQuery(sql, values, cb, config) {
    let options = {
      rowsAsArray: config.rowsAsArray
    };
    if (typeof sql === 'object') {
      // query(options, cb): merge into a fresh object so the connection
      // default still applies and the caller's object is left untouched
      options = Object.assign(options, sql);
      if (typeof values === 'function') {
        cb = values;
      } else if (values !== undefined) {
        options.values = values;
      }
    } else if (typeof values === 'function') {
      // query(sql, cb)
      cb = values;
      options.sql = sql;
      options.values = undefined;
    } else {
      // query(sql, values, cb)
      options.sql = sql;
      options.values = values;
    }
    return new Commands.Query(options, cb);
  }

  /**
   * Cache key for a prepared statement, built from the `nestTables` and
   * `rowsAsArray` options and the SQL text.
   *
   * @param {Object} options
   * @returns {string}
   */
  static statementKey(options) {
    return `${typeof options.nestTables}/${options.nestTables}/${options.rowsAsArray}${options.sql}`;
  }
}
// Fallback for legacy runtimes whose `tls` module does not provide
// `TLSSocket`: replace `startTLS` with an implementation built on
// `crypto.createCredentials` and `tls.createSecurePair`. On runtimes that do
// have `TLSSocket` the class method defined above is used and nothing happens
// here.
if (!Tls.TLSSocket) {
  /**
   * Upgrades the connection to TLS through a secure pair piped to the raw
   * stream.
   *
   * @param {Function} onSecure - called with the certificate verification
   *   error (only checked when `ssl.rejectUnauthorized` is set) or null.
   */
  Connection.prototype.startTLS = function _startTLS(onSecure) {
    if (this.config.debug) {
      // eslint-disable-next-line no-console
      console.log('Upgrading connection to TLS');
    }
    const crypto = require('crypto');
    const config = this.config;
    const stream = this.stream;
    const rejectUnauthorized = this.config.ssl.rejectUnauthorized;
    const credentials = crypto.createCredentials({
      key: config.ssl.key,
      cert: config.ssl.cert,
      passphrase: config.ssl.passphrase,
      ca: config.ssl.ca,
      ciphers: config.ssl.ciphers
    });
    const securePair = Tls.createSecurePair(
      credentials,
      false,
      true,
      rejectUnauthorized
    );
    // detach the plaintext readers: from now on the raw stream only carries
    // ciphertext
    if (stream.ondata) {
      stream.ondata = null;
    }
    stream.removeAllListeners('data');
    stream.pipe(securePair.encrypted);
    securePair.encrypted.pipe(stream);
    securePair.cleartext.on('data', data => {
      this.packetParser.execute(data);
    });
    this.write = function(buffer) {
      securePair.cleartext.write(buffer);
    };
    securePair.on('secure', () => {
      onSecure(rejectUnauthorized ? securePair.ssl.verifyError() : null);
    });
  };
}

module.exports = Connection;
|