connection.js 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886
  1. 'use strict';
  2. const Net = require('net');
  3. const Tls = require('tls');
  4. const Timers = require('timers');
  5. const EventEmitter = require('events').EventEmitter;
  6. const Readable = require('stream').Readable;
  7. const Queue = require('denque');
  8. const SqlString = require('sqlstring');
  9. const LRU = require('lru-cache');
  10. const PacketParser = require('./packet_parser.js');
  11. const Packets = require('./packets/index.js');
  12. const Commands = require('./commands/index.js');
  13. const ConnectionConfig = require('./connection_config.js');
  14. const CharsetToEncoding = require('./constants/charset_encodings.js');
  15. let _connectionId = 0;
  16. let convertNamedPlaceholders = null;
  17. class Connection extends EventEmitter {
  18. constructor(opts) {
  19. super();
  20. this.config = opts.config;
  21. // TODO: fill defaults
  22. // if no params, connect to /var/lib/mysql/mysql.sock ( /tmp/mysql.sock on OSX )
  23. // if host is given, connect to host:3306
  24. // TODO: use `/usr/local/mysql/bin/mysql_config --socket` output? as default socketPath
  25. // if there is no host/port and no socketPath parameters?
  26. if (!opts.config.stream) {
  27. if (opts.config.socketPath) {
  28. this.stream = Net.connect(opts.config.socketPath);
  29. } else {
  30. this.stream = Net.connect(
  31. opts.config.port,
  32. opts.config.host
  33. );
  34. // Enable keep-alive on the socket. It's disabled by default, but the
  35. // user can enable it and supply an initial delay.
  36. this.stream.setKeepAlive(true, this.config.keepAliveInitialDelay);
  37. }
  38. // if stream is a function, treat it as "stream agent / factory"
  39. } else if (typeof opts.config.stream === 'function') {
  40. this.stream = opts.config.stream(opts);
  41. } else {
  42. this.stream = opts.config.stream;
  43. }
  44. this._internalId = _connectionId++;
  45. this._commands = new Queue();
  46. this._command = null;
  47. this._paused = false;
  48. this._paused_packets = new Queue();
  49. this._statements = new LRU({
  50. max: this.config.maxPreparedStatements,
  51. dispose: function(key, statement) {
  52. statement.close();
  53. }
  54. });
  55. this.serverCapabilityFlags = 0;
  56. this.authorized = false;
  57. this.sequenceId = 0;
  58. this.compressedSequenceId = 0;
  59. this.threadId = null;
  60. this._handshakePacket = null;
  61. this._fatalError = null;
  62. this._protocolError = null;
  63. this._outOfOrderPackets = [];
  64. this.clientEncoding = CharsetToEncoding[this.config.charsetNumber];
  65. this.stream.on('error', this._handleNetworkError.bind(this));
  66. // see https://gist.github.com/khoomeister/4985691#use-that-instead-of-bind
  67. this.packetParser = new PacketParser(p => {
  68. this.handlePacket(p);
  69. });
  70. this.stream.on('data', data => {
  71. if (this.connectTimeout) {
  72. Timers.clearTimeout(this.connectTimeout);
  73. this.connectTimeout = null;
  74. }
  75. this.packetParser.execute(data);
  76. });
  77. this.stream.on('close', () => {
  78. // we need to set this flag everywhere where we want connection to close
  79. if (this._closing) {
  80. return;
  81. }
  82. if (!this._protocolError) {
  83. // no particular error message before disconnect
  84. this._protocolError = new Error(
  85. 'Connection lost: The server closed the connection.'
  86. );
  87. this._protocolError.fatal = true;
  88. this._protocolError.code = 'PROTOCOL_CONNECTION_LOST';
  89. }
  90. this._notifyError(this._protocolError);
  91. });
  92. let handshakeCommand;
  93. if (!this.config.isServer) {
  94. handshakeCommand = new Commands.ClientHandshake(this.config.clientFlags);
  95. handshakeCommand.on('end', () => {
  96. // this happens when handshake finishes early and first packet is error
  97. // and not server hello ( for example, 'Too many connactions' error)
  98. if (!handshakeCommand.handshake) {
  99. return;
  100. }
  101. this._handshakePacket = handshakeCommand.handshake;
  102. this.threadId = handshakeCommand.handshake.connectionId;
  103. this.emit('connect', handshakeCommand.handshake);
  104. });
  105. handshakeCommand.on('error', err => {
  106. this._closing = true;
  107. this._notifyError(err);
  108. });
  109. this.addCommand(handshakeCommand);
  110. }
  111. // in case there was no initiall handshake but we need to read sting, assume it utf-8
  112. // most common example: "Too many connections" error ( packet is sent immediately on connection attempt, we don't know server encoding yet)
  113. // will be overwrittedn with actial encoding value as soon as server handshake packet is received
  114. this.serverEncoding = 'utf8';
  115. if (this.config.connectTimeout) {
  116. const timeoutHandler = this._handleTimeoutError.bind(this);
  117. this.connectTimeout = Timers.setTimeout(
  118. timeoutHandler,
  119. this.config.connectTimeout
  120. );
  121. }
  122. }
  123. promise(promiseImpl) {
  124. const PromiseConnection = require('../promise').PromiseConnection;
  125. return new PromiseConnection(this, promiseImpl);
  126. }
  127. _addCommandClosedState(cmd) {
  128. const err = new Error(
  129. "Can't add new command when connection is in closed state"
  130. );
  131. err.fatal = true;
  132. if (cmd.onResult) {
  133. cmd.onResult(err);
  134. } else {
  135. this.emit('error', err);
  136. }
  137. }
  138. _handleFatalError(err) {
  139. err.fatal = true;
  140. // stop receiving packets
  141. this.stream.removeAllListeners('data');
  142. this.addCommand = this._addCommandClosedState;
  143. this.write = () => {
  144. this.emit('error', new Error("Can't write in closed state"));
  145. };
  146. this._notifyError(err);
  147. this._fatalError = err;
  148. }
  149. _handleNetworkError(err) {
  150. if (this.connectTimeout) {
  151. Timers.clearTimeout(this.connectTimeout);
  152. this.connectTimeout = null;
  153. }
  154. // Do not throw an error when a connection ends with a RST,ACK packet
  155. if (err.errno === 'ECONNRESET' && this._closing) {
  156. return;
  157. }
  158. this._handleFatalError(err);
  159. }
  160. _handleTimeoutError() {
  161. if (this.connectTimeout) {
  162. Timers.clearTimeout(this.connectTimeout);
  163. this.connectTimeout = null;
  164. }
  165. this.stream.destroy && this.stream.destroy();
  166. const err = new Error('connect ETIMEDOUT');
  167. err.errorno = 'ETIMEDOUT';
  168. err.code = 'ETIMEDOUT';
  169. err.syscall = 'connect';
  170. this._handleNetworkError(err);
  171. }
  172. // notify all commands in the queue and bubble error as connection "error"
  173. // called on stream error or unexpected termination
  174. _notifyError(err) {
  175. // prevent from emitting 'PROTOCOL_CONNECTION_LOST' after EPIPE or ECONNRESET
  176. if (this._fatalError) {
  177. return;
  178. }
  179. let command;
  180. // if there is no active command, notify connection
  181. // if there are commands and all of them have callbacks, pass error via callback
  182. let bubbleErrorToConnection = !this._command;
  183. if (this._command && this._command.onResult) {
  184. this._command.onResult(err);
  185. this._command = null;
  186. // connection handshake is special because we allow it to be implicit
  187. // if error happened during handshake, but there are others commands in queue
  188. // then bubble error to other commands and not to connection
  189. } else if (
  190. !(
  191. this._command &&
  192. this._command.constructor === Commands.ClientHandshake &&
  193. this._commands.length > 0
  194. )
  195. ) {
  196. bubbleErrorToConnection = true;
  197. }
  198. while ((command = this._commands.shift())) {
  199. if (command.onResult) {
  200. command.onResult(err);
  201. } else {
  202. bubbleErrorToConnection = true;
  203. }
  204. }
  205. // notify connection if some comands in the queue did not have callbacks
  206. // or if this is pool connection ( so it can be removed from pool )
  207. if (bubbleErrorToConnection || this._pool) {
  208. this.emit('error', err);
  209. }
  210. }
  211. write(buffer) {
  212. this.stream.write(buffer, err => {
  213. if (err) {
  214. this._handleNetworkError(err);
  215. }
  216. });
  217. }
  218. // http://dev.mysql.com/doc/internals/en/sequence-id.html
  219. //
  220. // The sequence-id is incremented with each packet and may wrap around.
  221. // It starts at 0 and is reset to 0 when a new command
  222. // begins in the Command Phase.
  223. // http://dev.mysql.com/doc/internals/en/example-several-mysql-packets.html
  224. _resetSequenceId() {
  225. this.sequenceId = 0;
  226. this.compressedSequenceId = 0;
  227. }
  228. _bumpCompressedSequenceId(numPackets) {
  229. this.compressedSequenceId += numPackets;
  230. this.compressedSequenceId %= 256;
  231. }
  232. _bumpSequenceId(numPackets) {
  233. this.sequenceId += numPackets;
  234. this.sequenceId %= 256;
  235. }
  236. writePacket(packet) {
  237. const MAX_PACKET_LENGTH = 16777215;
  238. const length = packet.length();
  239. let chunk, offset, header;
  240. if (length < MAX_PACKET_LENGTH) {
  241. packet.writeHeader(this.sequenceId);
  242. if (this.config.debug) {
  243. // eslint-disable-next-line no-console
  244. console.log(
  245. `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`
  246. );
  247. // eslint-disable-next-line no-console
  248. console.log(
  249. `${this._internalId} ${this.connectionId} <== ${packet.buffer.toString('hex')}`
  250. );
  251. }
  252. this._bumpSequenceId(1);
  253. this.write(packet.buffer);
  254. } else {
  255. if (this.config.debug) {
  256. // eslint-disable-next-line no-console
  257. console.log(
  258. `${this._internalId} ${this.connectionId} <== Writing large packet, raw content not written:`
  259. );
  260. // eslint-disable-next-line no-console
  261. console.log(
  262. `${this._internalId} ${this.connectionId} <== ${this._command._commandName}#${this._command.stateName()}(${[this.sequenceId, packet._name, packet.length()].join(',')})`
  263. );
  264. }
  265. for (offset = 4; offset < 4 + length; offset += MAX_PACKET_LENGTH) {
  266. chunk = packet.buffer.slice(offset, offset + MAX_PACKET_LENGTH);
  267. if (chunk.length === MAX_PACKET_LENGTH) {
  268. header = Buffer.from([0xff, 0xff, 0xff, this.sequenceId]);
  269. } else {
  270. header = Buffer.from([
  271. chunk.length & 0xff,
  272. (chunk.length >> 8) & 0xff,
  273. (chunk.length >> 16) & 0xff,
  274. this.sequenceId
  275. ]);
  276. }
  277. this._bumpSequenceId(1);
  278. this.write(header);
  279. this.write(chunk);
  280. }
  281. }
  282. }
  283. // 0.11+ environment
  284. startTLS(onSecure) {
  285. if (this.config.debug) {
  286. // eslint-disable-next-line no-console
  287. console.log('Upgrading connection to TLS');
  288. }
  289. const secureContext = Tls.createSecureContext({
  290. ca: this.config.ssl.ca,
  291. cert: this.config.ssl.cert,
  292. ciphers: this.config.ssl.ciphers,
  293. key: this.config.ssl.key,
  294. passphrase: this.config.ssl.passphrase,
  295. minVersion: this.config.ssl.minVersion
  296. });
  297. const rejectUnauthorized = this.config.ssl.rejectUnauthorized;
  298. let secureEstablished = false;
  299. const secureSocket = new Tls.TLSSocket(this.stream, {
  300. rejectUnauthorized: rejectUnauthorized,
  301. requestCert: true,
  302. secureContext: secureContext,
  303. isServer: false
  304. });
  305. // error handler for secure socket
  306. secureSocket.on('_tlsError', err => {
  307. if (secureEstablished) {
  308. this._handleNetworkError(err);
  309. } else {
  310. onSecure(err);
  311. }
  312. });
  313. secureSocket.on('secure', () => {
  314. secureEstablished = true;
  315. onSecure(rejectUnauthorized ? secureSocket.ssl.verifyError() : null);
  316. });
  317. secureSocket.on('data', data => {
  318. this.packetParser.execute(data);
  319. });
  320. this.write = buffer => {
  321. secureSocket.write(buffer);
  322. };
  323. // start TLS communications
  324. secureSocket._start();
  325. }
  326. pipe() {
  327. if (this.stream instanceof Net.Stream) {
  328. this.stream.ondata = (data, start, end) => {
  329. this.packetParser.execute(data, start, end);
  330. };
  331. } else {
  332. this.stream.on('data', data => {
  333. this.packetParser.execute(
  334. data.parent,
  335. data.offset,
  336. data.offset + data.length
  337. );
  338. });
  339. }
  340. }
  341. protocolError(message, code) {
  342. const err = new Error(message);
  343. err.fatal = true;
  344. err.code = code || 'PROTOCOL_ERROR';
  345. this.emit('error', err);
  346. }
  347. handlePacket(packet) {
  348. if (this._paused) {
  349. this._paused_packets.push(packet);
  350. return;
  351. }
  352. if (packet) {
  353. if (this.sequenceId !== packet.sequenceId) {
  354. const err = new Error(
  355. `Warning: got packets out of order. Expected ${this.sequenceId} but received ${packet.sequenceId}`
  356. );
  357. err.expected = this.sequenceId;
  358. err.received = packet.sequenceId;
  359. this.emit('warn', err); // REVIEW
  360. // eslint-disable-next-line no-console
  361. console.error(err.message);
  362. }
  363. this._bumpSequenceId(packet.numPackets);
  364. }
  365. if (this.config.debug) {
  366. if (packet) {
  367. // eslint-disable-next-line no-console
  368. console.log(
  369. ` raw: ${packet.buffer
  370. .slice(packet.offset, packet.offset + packet.length())
  371. .toString('hex')}`
  372. );
  373. // eslint-disable-next-line no-console
  374. console.trace();
  375. const commandName = this._command
  376. ? this._command._commandName
  377. : '(no command)';
  378. const stateName = this._command
  379. ? this._command.stateName()
  380. : '(no command)';
  381. // eslint-disable-next-line no-console
  382. console.log(
  383. `${this._internalId} ${this.connectionId} ==> ${commandName}#${stateName}(${[packet.sequenceId, packet.type(), packet.length()].join(',')})`
  384. );
  385. }
  386. }
  387. if (!this._command) {
  388. this.protocolError(
  389. 'Unexpected packet while no commands in the queue',
  390. 'PROTOCOL_UNEXPECTED_PACKET'
  391. );
  392. this.close();
  393. return;
  394. }
  395. const done = this._command.execute(packet, this);
  396. if (done) {
  397. this._command = this._commands.shift();
  398. if (this._command) {
  399. this.sequenceId = 0;
  400. this.compressedSequenceId = 0;
  401. this.handlePacket();
  402. }
  403. }
  404. }
  405. addCommand(cmd) {
  406. // this.compressedSequenceId = 0;
  407. // this.sequenceId = 0;
  408. if (this.config.debug) {
  409. const commandName = cmd.constructor.name;
  410. // eslint-disable-next-line no-console
  411. console.log(`Add command: ${commandName}`);
  412. cmd._commandName = commandName;
  413. }
  414. if (!this._command) {
  415. this._command = cmd;
  416. this.handlePacket();
  417. } else {
  418. this._commands.push(cmd);
  419. }
  420. return cmd;
  421. }
  422. format(sql, values) {
  423. if (typeof this.config.queryFormat === 'function') {
  424. return this.config.queryFormat.call(
  425. this,
  426. sql,
  427. values,
  428. this.config.timezone
  429. );
  430. }
  431. const opts = {
  432. sql: sql,
  433. values: values
  434. };
  435. this._resolveNamedPlaceholders(opts);
  436. return SqlString.format(
  437. opts.sql,
  438. opts.values,
  439. this.config.stringifyObjects,
  440. this.config.timezone
  441. );
  442. }
  443. escape(value) {
  444. return SqlString.escape(value, false, this.config.timezone);
  445. }
  446. escapeId(value) {
  447. return SqlString.escapeId(value, false);
  448. }
  449. raw(sql) {
  450. return SqlString.raw(sql);
  451. }
  452. _resolveNamedPlaceholders(options) {
  453. let unnamed;
  454. if (this.config.namedPlaceholders || options.namedPlaceholders) {
  455. if (convertNamedPlaceholders === null) {
  456. convertNamedPlaceholders = require('named-placeholders')();
  457. }
  458. unnamed = convertNamedPlaceholders(options.sql, options.values);
  459. options.sql = unnamed[0];
  460. options.values = unnamed[1];
  461. }
  462. }
  463. query(sql, values, cb) {
  464. let cmdQuery;
  465. if (sql.constructor === Commands.Query) {
  466. cmdQuery = sql;
  467. } else {
  468. cmdQuery = Connection.createQuery(sql, values, cb, this.config);
  469. }
  470. this._resolveNamedPlaceholders(cmdQuery);
  471. const rawSql = this.format(cmdQuery.sql, cmdQuery.values || []);
  472. cmdQuery.sql = rawSql;
  473. return this.addCommand(cmdQuery);
  474. }
  475. pause() {
  476. this._paused = true;
  477. this.stream.pause();
  478. }
  479. resume() {
  480. let packet;
  481. this._paused = false;
  482. while ((packet = this._paused_packets.shift())) {
  483. this.handlePacket(packet);
  484. // don't resume if packet hander paused connection
  485. if (this._paused) {
  486. return;
  487. }
  488. }
  489. this.stream.resume();
  490. }
  491. // TODO: named placeholders support
  492. prepare(options, cb) {
  493. if (typeof options === 'string') {
  494. options = { sql: options };
  495. }
  496. return this.addCommand(new Commands.Prepare(options, cb));
  497. }
  498. unprepare(sql) {
  499. let options = {};
  500. if (typeof sql === 'object') {
  501. options = sql;
  502. } else {
  503. options.sql = sql;
  504. }
  505. const key = Connection.statementKey(options);
  506. const stmt = this._statements.get(key);
  507. if (stmt) {
  508. this._statements.del(key);
  509. stmt.close();
  510. }
  511. return stmt;
  512. }
  513. execute(sql, values, cb) {
  514. let options = {};
  515. if (typeof sql === 'object') {
  516. // execute(options, cb)
  517. options = sql;
  518. if (typeof values === 'function') {
  519. cb = values;
  520. } else {
  521. options.values = options.values || values;
  522. }
  523. } else if (typeof values === 'function') {
  524. // execute(sql, cb)
  525. cb = values;
  526. options.sql = sql;
  527. options.values = undefined;
  528. } else {
  529. // execute(sql, values, cb)
  530. options.sql = sql;
  531. options.values = values;
  532. }
  533. this._resolveNamedPlaceholders(options);
  534. // check for values containing undefined
  535. if (options.values) {
  536. //If namedPlaceholder is not enabled and object is passed as bind parameters
  537. if (!Array.isArray(options.values)) {
  538. throw new TypeError(
  539. 'Bind parameters must be array if namedPlaceholders parameter is not enabled'
  540. );
  541. }
  542. options.values.forEach(val => {
  543. //If namedPlaceholder is not enabled and object is passed as bind parameters
  544. if (!Array.isArray(options.values)) {
  545. throw new TypeError(
  546. 'Bind parameters must be array if namedPlaceholders parameter is not enabled'
  547. );
  548. }
  549. if (val === undefined) {
  550. throw new TypeError(
  551. 'Bind parameters must not contain undefined. To pass SQL NULL specify JS null'
  552. );
  553. }
  554. if (typeof val === 'function') {
  555. throw new TypeError(
  556. 'Bind parameters must not contain function(s). To pass the body of a function as a string call .toString() first'
  557. );
  558. }
  559. });
  560. }
  561. const executeCommand = new Commands.Execute(options, cb);
  562. const prepareCommand = new Commands.Prepare(options, (err, stmt) => {
  563. if (err) {
  564. // skip execute command if prepare failed, we have main
  565. // combined callback here
  566. executeCommand.start = function() {
  567. return null;
  568. };
  569. if (cb) {
  570. cb(err);
  571. } else {
  572. executeCommand.emit('error', err);
  573. }
  574. executeCommand.emit('end');
  575. return;
  576. }
  577. executeCommand.statement = stmt;
  578. });
  579. this.addCommand(prepareCommand);
  580. this.addCommand(executeCommand);
  581. return executeCommand;
  582. }
  583. changeUser(options, callback) {
  584. if (!callback && typeof options === 'function') {
  585. callback = options;
  586. options = {};
  587. }
  588. const charsetNumber = options.charset
  589. ? ConnectionConfig.getCharsetNumber(options.charset)
  590. : this.config.charsetNumber;
  591. return this.addCommand(
  592. new Commands.ChangeUser(
  593. {
  594. user: options.user || this.config.user,
  595. password: options.password || this.config.password,
  596. passwordSha1: options.passwordSha1 || this.config.passwordSha1,
  597. database: options.database || this.config.database,
  598. timeout: options.timeout,
  599. charsetNumber: charsetNumber,
  600. currentConfig: this.config
  601. },
  602. err => {
  603. if (err) {
  604. err.fatal = true;
  605. }
  606. if (callback) {
  607. callback(err);
  608. }
  609. }
  610. )
  611. );
  612. }
  613. // transaction helpers
  614. beginTransaction(cb) {
  615. return this.query('START TRANSACTION', cb);
  616. }
  617. commit(cb) {
  618. return this.query('COMMIT', cb);
  619. }
  620. rollback(cb) {
  621. return this.query('ROLLBACK', cb);
  622. }
  623. ping(cb) {
  624. return this.addCommand(new Commands.Ping(cb));
  625. }
  626. _registerSlave(opts, cb) {
  627. return this.addCommand(new Commands.RegisterSlave(opts, cb));
  628. }
  629. _binlogDump(opts, cb) {
  630. return this.addCommand(new Commands.BinlogDump(opts, cb));
  631. }
  632. // currently just alias to close
  633. destroy() {
  634. this.close();
  635. }
  636. close() {
  637. if (this.connectTimeout) {
  638. Timers.clearTimeout(this.connectTimeout);
  639. this.connectTimeout = null;
  640. }
  641. this._closing = true;
  642. this.stream.end();
  643. this.addCommand = this._addCommandClosedState;
  644. }
  645. createBinlogStream(opts) {
  646. // TODO: create proper stream class
  647. // TODO: use through2
  648. let test = 1;
  649. const stream = new Readable({ objectMode: true });
  650. stream._read = function() {
  651. return {
  652. data: test++
  653. };
  654. };
  655. this._registerSlave(opts, () => {
  656. const dumpCmd = this._binlogDump(opts);
  657. dumpCmd.on('event', ev => {
  658. stream.push(ev);
  659. });
  660. dumpCmd.on('eof', () => {
  661. stream.push(null);
  662. // if non-blocking, then close stream to prevent errors
  663. if (opts.flags && opts.flags & 0x01) {
  664. this.close();
  665. }
  666. });
  667. // TODO: pipe errors as well
  668. });
  669. return stream;
  670. }
  671. connect(cb) {
  672. if (!cb) {
  673. return;
  674. }
  675. let connectCalled = 0;
  676. function callbackOnce(isErrorHandler) {
  677. return function(param) {
  678. if (!connectCalled) {
  679. if (isErrorHandler) {
  680. cb(param);
  681. } else {
  682. cb(null, param);
  683. }
  684. }
  685. connectCalled = 1;
  686. };
  687. }
  688. this.once('error', callbackOnce(true));
  689. this.once('connect', callbackOnce(false));
  690. }
  691. // ===================================
  692. // outgoing server connection methods
  693. // ===================================
  694. writeColumns(columns) {
  695. this.writePacket(Packets.ResultSetHeader.toPacket(columns.length));
  696. columns.forEach(column => {
  697. this.writePacket(
  698. Packets.ColumnDefinition.toPacket(column, this.serverConfig.encoding)
  699. );
  700. });
  701. this.writeEof();
  702. }
  703. // row is array of columns, not hash
  704. writeTextRow(column) {
  705. this.writePacket(
  706. Packets.TextRow.toPacket(column, this.serverConfig.encoding)
  707. );
  708. }
  709. writeTextResult(rows, columns) {
  710. this.writeColumns(columns);
  711. rows.forEach(row => {
  712. const arrayRow = new Array(columns.length);
  713. columns.forEach(column => {
  714. arrayRow.push(row[column.name]);
  715. });
  716. this.writeTextRow(arrayRow);
  717. });
  718. this.writeEof();
  719. }
  720. writeEof(warnings, statusFlags) {
  721. this.writePacket(Packets.EOF.toPacket(warnings, statusFlags));
  722. }
  723. writeOk(args) {
  724. if (!args) {
  725. args = { affectedRows: 0 };
  726. }
  727. this.writePacket(Packets.OK.toPacket(args, this.serverConfig.encoding));
  728. }
  729. writeError(args) {
  730. // if we want to send error before initial hello was sent, use default encoding
  731. const encoding = this.serverConfig ? this.serverConfig.encoding : 'cesu8';
  732. this.writePacket(Packets.Error.toPacket(args, encoding));
  733. }
  734. serverHandshake(args) {
  735. this.serverConfig = args;
  736. this.serverConfig.encoding =
  737. CharsetToEncoding[this.serverConfig.characterSet];
  738. return this.addCommand(new Commands.ServerHandshake(args));
  739. }
  740. // ===============================================================
  741. end(callback) {
  742. if (this.config.isServer) {
  743. this._closing = true;
  744. const quitCmd = new EventEmitter();
  745. setImmediate(() => {
  746. this.stream.end();
  747. quitCmd.emit('end');
  748. });
  749. return quitCmd;
  750. }
  751. // trigger error if more commands enqueued after end command
  752. const quitCmd = this.addCommand(new Commands.Quit(callback));
  753. this.addCommand = this._addCommandClosedState;
  754. return quitCmd;
  755. }
  756. static createQuery(sql, values, cb, config) {
  757. let options = {
  758. rowsAsArray: config.rowsAsArray
  759. };
  760. if (typeof sql === 'object') {
  761. // query(options, cb)
  762. options = sql;
  763. if (typeof values === 'function') {
  764. cb = values;
  765. } else if (values !== undefined) {
  766. options.values = values;
  767. }
  768. } else if (typeof values === 'function') {
  769. // query(sql, cb)
  770. cb = values;
  771. options.sql = sql;
  772. options.values = undefined;
  773. } else {
  774. // query(sql, values, cb)
  775. options.sql = sql;
  776. options.values = values;
  777. }
  778. return new Commands.Query(options, cb);
  779. }
  780. static statementKey(options) {
  781. return (
  782. `${typeof options.nestTables}/${options.nestTables}/${options.rowsAsArray}${options.sql}`
  783. );
  784. }
  785. }
  786. if (Tls.TLSSocket) {
  787. // not supported
  788. } else {
  789. Connection.prototype.startTLS = function _startTLS(onSecure) {
  790. if (this.config.debug) {
  791. // eslint-disable-next-line no-console
  792. console.log('Upgrading connection to TLS');
  793. }
  794. const crypto = require('crypto');
  795. const config = this.config;
  796. const stream = this.stream;
  797. const rejectUnauthorized = this.config.ssl.rejectUnauthorized;
  798. const credentials = crypto.createCredentials({
  799. key: config.ssl.key,
  800. cert: config.ssl.cert,
  801. passphrase: config.ssl.passphrase,
  802. ca: config.ssl.ca,
  803. ciphers: config.ssl.ciphers
  804. });
  805. const securePair = Tls.createSecurePair(
  806. credentials,
  807. false,
  808. true,
  809. rejectUnauthorized
  810. );
  811. if (stream.ondata) {
  812. stream.ondata = null;
  813. }
  814. stream.removeAllListeners('data');
  815. stream.pipe(securePair.encrypted);
  816. securePair.encrypted.pipe(stream);
  817. securePair.cleartext.on('data', data => {
  818. this.packetParser.execute(data);
  819. });
  820. this.write = function(buffer) {
  821. securePair.cleartext.write(buffer);
  822. };
  823. securePair.on('secure', () => {
  824. onSecure(rejectUnauthorized ? securePair.ssl.verifyError() : null);
  825. });
  826. };
  827. }
  828. module.exports = Connection;