query.js 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. 'use strict';
  2. const Readable = require('stream').Readable;
  3. const Command = require('./command.js');
  4. const Packets = require('../packets/index.js');
  5. const getTextParser = require('../parsers/text_parser.js');
  6. const ServerStatus = require('../constants/server_status.js');
  7. const CharsetToEncoding = require('../constants/charset_encodings.js');
  8. const EmptyPacket = new Packets.Packet(0, Buffer.allocUnsafe(4), 0, 4);
  9. // http://dev.mysql.com/doc/internals/en/com-query.html
  10. class Query extends Command {
  11. constructor(options, callback) {
  12. super();
  13. this.sql = options.sql;
  14. this.values = options.values;
  15. this._queryOptions = options;
  16. this.namedPlaceholders = options.namedPlaceholders || false;
  17. this.onResult = callback;
  18. this._fieldCount = 0;
  19. this._rowParser = null;
  20. this._fields = [];
  21. this._rows = [];
  22. this._receivedFieldsCount = 0;
  23. this._resultIndex = 0;
  24. this._localStream = null;
  25. this._unpipeStream = function() {};
  26. this._streamFactory = options.infileStreamFactory;
  27. this._connection = null;
  28. }
  29. then() {
  30. const err =
  31. "You have tried to call .then(), .catch(), or invoked await on the result of query that is not a promise, which is a programming error. Try calling con.promise().query(), or require('mysql2/promise') instead of 'mysql2' for a promise-compatible version of the query interface. To learn how to use async/await or Promises check out documentation at https://www.npmjs.com/package/mysql2#using-promise-wrapper, or the mysql2 documentation at https://github.com/sidorares/node-mysql2/tree/master/documentation/Promise-Wrapper.md";
  32. // eslint-disable-next-line
  33. console.log(err);
  34. throw new Error(err);
  35. }
  36. start(packet, connection) {
  37. if (connection.config.debug) {
  38. // eslint-disable-next-line
  39. console.log(' Sending query command: %s', this.sql);
  40. }
  41. this._connection = connection;
  42. this.options = Object.assign({}, connection.config, this._queryOptions);
  43. const cmdPacket = new Packets.Query(
  44. this.sql,
  45. connection.config.charsetNumber
  46. );
  47. connection.writePacket(cmdPacket.toPacket(1));
  48. return Query.prototype.resultsetHeader;
  49. }
  50. done() {
  51. this._unpipeStream();
  52. if (this.onResult) {
  53. let rows, fields;
  54. if (this._resultIndex === 0) {
  55. rows = this._rows[0];
  56. fields = this._fields[0];
  57. } else {
  58. rows = this._rows;
  59. fields = this._fields;
  60. }
  61. if (fields) {
  62. process.nextTick(() => {
  63. this.onResult(null, rows, fields);
  64. });
  65. } else {
  66. process.nextTick(() => {
  67. this.onResult(null, rows);
  68. });
  69. }
  70. }
  71. return null;
  72. }
  73. doneInsert(rs) {
  74. if (this._localStreamError) {
  75. if (this.onResult) {
  76. this.onResult(this._localStreamError, rs);
  77. } else {
  78. this.emit('error', this._localStreamError);
  79. }
  80. return null;
  81. }
  82. this._rows.push(rs);
  83. this._fields.push(void 0);
  84. this.emit('fields', void 0);
  85. this.emit('result', rs);
  86. if (rs.serverStatus & ServerStatus.SERVER_MORE_RESULTS_EXISTS) {
  87. this._resultIndex++;
  88. return this.resultsetHeader;
  89. }
  90. return this.done();
  91. }
  92. resultsetHeader(packet, connection) {
  93. const rs = new Packets.ResultSetHeader(packet, connection);
  94. this._fieldCount = rs.fieldCount;
  95. if (connection.config.debug) {
  96. // eslint-disable-next-line
  97. console.log(
  98. ` Resultset header received, expecting ${rs.fieldCount} column definition packets`
  99. );
  100. }
  101. if (this._fieldCount === 0) {
  102. return this.doneInsert(rs);
  103. }
  104. if (this._fieldCount === null) {
  105. return this._streamLocalInfile(connection, rs.infileName);
  106. }
  107. this._receivedFieldsCount = 0;
  108. this._rows.push([]);
  109. this._fields.push([]);
  110. return this.readField;
  111. }
  112. _streamLocalInfile(connection, path) {
  113. if (this._streamFactory) {
  114. this._localStream = this._streamFactory(path);
  115. } else {
  116. this._localStreamError = new Error(
  117. `As a result of LOCAL INFILE command server wants to read ${path} file, but as of v2.0 you must provide streamFactory option returning ReadStream.`
  118. );
  119. connection.writePacket(EmptyPacket);
  120. return this.infileOk;
  121. }
  122. const onConnectionError = () => {
  123. this._unpipeStream();
  124. };
  125. const onDrain = () => {
  126. this._localStream.resume();
  127. };
  128. const onPause = () => {
  129. this._localStream.pause();
  130. };
  131. const onData = function(data) {
  132. const dataWithHeader = Buffer.allocUnsafe(data.length + 4);
  133. data.copy(dataWithHeader, 4);
  134. connection.writePacket(
  135. new Packets.Packet(0, dataWithHeader, 0, dataWithHeader.length)
  136. );
  137. };
  138. const onEnd = () => {
  139. connection.removeListener('error', onConnectionError);
  140. connection.writePacket(EmptyPacket);
  141. };
  142. const onError = err => {
  143. this._localStreamError = err;
  144. connection.removeListener('error', onConnectionError);
  145. connection.writePacket(EmptyPacket);
  146. };
  147. this._unpipeStream = () => {
  148. connection.stream.removeListener('pause', onPause);
  149. connection.stream.removeListener('drain', onDrain);
  150. this._localStream.removeListener('data', onData);
  151. this._localStream.removeListener('end', onEnd);
  152. this._localStream.removeListener('error', onError);
  153. };
  154. connection.stream.on('pause', onPause);
  155. connection.stream.on('drain', onDrain);
  156. this._localStream.on('data', onData);
  157. this._localStream.on('end', onEnd);
  158. this._localStream.on('error', onError);
  159. connection.once('error', onConnectionError);
  160. return this.infileOk;
  161. }
  162. readField(packet, connection) {
  163. this._receivedFieldsCount++;
  164. // Often there is much more data in the column definition than in the row itself
  165. // If you set manually _fields[0] to array of ColumnDefinition's (from previous call)
  166. // you can 'cache' result of parsing. Field packets still received, but ignored in that case
  167. // this is the reason _receivedFieldsCount exist (otherwise we could just use current length of fields array)
  168. if (this._fields[this._resultIndex].length !== this._fieldCount) {
  169. const field = new Packets.ColumnDefinition(
  170. packet,
  171. connection.clientEncoding
  172. );
  173. this._fields[this._resultIndex].push(field);
  174. if (connection.config.debug) {
  175. /* eslint-disable no-console */
  176. console.log(' Column definition:');
  177. console.log(` name: ${field.name}`);
  178. console.log(` type: ${field.columnType}`);
  179. console.log(` flags: ${field.flags}`);
  180. /* eslint-enable no-console */
  181. }
  182. }
  183. // last field received
  184. if (this._receivedFieldsCount === this._fieldCount) {
  185. const fields = this._fields[this._resultIndex];
  186. this.emit('fields', fields);
  187. this._rowParser = getTextParser(fields, this.options, connection.config);
  188. return Query.prototype.fieldsEOF;
  189. }
  190. return Query.prototype.readField;
  191. }
  192. fieldsEOF(packet, connection) {
  193. // check EOF
  194. if (!packet.isEOF()) {
  195. return connection.protocolError('Expected EOF packet');
  196. }
  197. return this.row;
  198. }
  199. row(packet) {
  200. if (packet.isEOF()) {
  201. const status = packet.eofStatusFlags();
  202. const moreResults = status & ServerStatus.SERVER_MORE_RESULTS_EXISTS;
  203. if (moreResults) {
  204. this._resultIndex++;
  205. return Query.prototype.resultsetHeader;
  206. }
  207. return this.done();
  208. }
  209. const row = new this._rowParser(
  210. packet,
  211. this._fields[this._resultIndex],
  212. this.options,
  213. CharsetToEncoding
  214. );
  215. if (this.onResult) {
  216. this._rows[this._resultIndex].push(row);
  217. } else {
  218. this.emit('result', row);
  219. }
  220. return Query.prototype.row;
  221. }
  222. infileOk(packet, connection) {
  223. const rs = new Packets.ResultSetHeader(packet, connection);
  224. return this.doneInsert(rs);
  225. }
  226. stream(options) {
  227. options = options || {};
  228. options.objectMode = true;
  229. const stream = new Readable(options);
  230. stream._read = () => {
  231. this._connection && this._connection.resume();
  232. };
  233. this.on('result', row => {
  234. if (!stream.push(row)) {
  235. this._connection.pause();
  236. }
  237. stream.emit('result', row); // replicate old emitter
  238. });
  239. this.on('error', err => {
  240. stream.emit('error', err); // Pass on any errors
  241. });
  242. this.on('end', () => {
  243. stream.push(null); // pushing null, indicating EOF
  244. stream.emit('close'); // notify readers that query has completed
  245. });
  246. this.on('fields', fields => {
  247. stream.emit('fields', fields); // replicate old emitter
  248. });
  249. return stream;
  250. }
  251. }
  252. Query.prototype.catch = Query.prototype.then;
  253. module.exports = Query;