batch-rewrite.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. 'use strict';
  2. const CommonText = require('./common-text-cmd');
  3. const Errors = require('../misc/errors');
  4. const Parse = require('../misc/parse');
  5. const RewritePacket = require('../io/rewrite-packet');
  6. const QUOTE = 0x27;
  7. /**
  8. * Protocol COM_QUERY
  9. * see : https://mariadb.com/kb/en/library/com_query/
  10. */
  11. class BatchRewrite extends CommonText {
  12. constructor(resolve, reject, options, connOpts, sql, values) {
  13. super(resolve, reject, options, connOpts, sql, values);
  14. }
  15. /**
  16. * Send COM_QUERY
  17. *
  18. * @param out output writer
  19. * @param opts connection options
  20. * @param info connection information
  21. */
  22. start(out, opts, info) {
  23. this.sending = true;
  24. this.info = info;
  25. if (this.opts.timeout) {
  26. const err = Errors.createError(
  27. 'Cannot use timeout for Batch statement',
  28. false,
  29. info,
  30. 'HY000',
  31. Errors.ER_TIMEOUT_NOT_SUPPORTED
  32. );
  33. this.emit('send_end');
  34. this.throwError(err, info);
  35. return;
  36. }
  37. if (this.initialValues.length === 0) this.initialValues = [[]];
  38. if (this.opts.namedPlaceholders) {
  39. this.parseResults = Parse.splitRewritableNamedParameterQuery(this.sql, this.initialValues);
  40. this.values = this.parseResults.values;
  41. } else {
  42. this.parseResults = Parse.splitRewritableQuery(this.sql);
  43. this.values = this.initialValues;
  44. if (!this.validateParameters(info)) {
  45. this.sending = false;
  46. return;
  47. }
  48. }
  49. out.startPacket(this);
  50. this.packet = new RewritePacket(
  51. this.opts.maxAllowedPacket,
  52. out,
  53. this.parseResults.partList[0],
  54. this.parseResults.partList[this.parseResults.partList.length - 1]
  55. );
  56. this.onPacketReceive = this.readResponsePacket;
  57. this.valueIdx = 0;
  58. this.sendQueries();
  59. }
  60. sendQueries() {
  61. let flushed = false;
  62. while (!flushed && this.sending && this.valueIdx < this.values.length) {
  63. this.valueRow = this.values[this.valueIdx++];
  64. //********************************************
  65. // send params
  66. //********************************************
  67. const len = this.parseResults.partList.length - 3;
  68. for (let i = 0; i < len; i++) {
  69. const value = this.valueRow[i];
  70. flushed = this.packet.writeString(this.parseResults.partList[i + 1]) || flushed;
  71. if (value === null) {
  72. flushed = this.packet.writeStringAscii('NULL') || flushed;
  73. continue;
  74. }
  75. if (
  76. typeof value === 'object' &&
  77. typeof value.pipe === 'function' &&
  78. typeof value.read === 'function'
  79. ) {
  80. //********************************************
  81. // param is stream,
  82. // now all params will be written by event
  83. //********************************************
  84. this.registerStreamSendEvent(this.packet, this.info);
  85. this.currentParam = i;
  86. this.packet.writeInt8(QUOTE); //'
  87. value.on(
  88. 'data',
  89. function (chunk) {
  90. this.packet.writeBufferEscape(chunk);
  91. }.bind(this)
  92. );
  93. value.on(
  94. 'end',
  95. function () {
  96. this.packet.writeInt8(QUOTE); //'
  97. this.currentParam++;
  98. this.paramWritten();
  99. }.bind(this)
  100. );
  101. return;
  102. } else {
  103. //********************************************
  104. // param isn't stream. directly write in buffer
  105. //********************************************
  106. flushed = this.writeParam(this.packet, value, this.opts, this.info) || flushed;
  107. }
  108. }
  109. this.packet.writeString(this.parseResults.partList[this.parseResults.partList.length - 2]);
  110. this.packet.mark(!this.parseResults.reWritable || this.valueIdx === this.values.length);
  111. }
  112. if (this.valueIdx < this.values.length && !this.packet.haveErrorResponse) {
  113. //there is still data to send
  114. setImmediate(this.sendQueries.bind(this));
  115. } else {
  116. if (this.sending && this.valueIdx === this.values.length) this.emit('send_end');
  117. this.sending = false;
  118. }
  119. }
  120. displaySql() {
  121. if (this.opts && this.initialValues) {
  122. if (this.sql.length > this.opts.debugLen) {
  123. return 'sql: ' + this.sql.substring(0, this.opts.debugLen) + '...';
  124. }
  125. let sqlMsg = 'sql: ' + this.sql + ' - parameters:';
  126. sqlMsg += '[';
  127. for (let i = 0; i < this.initialValues.length; i++) {
  128. if (i !== 0) sqlMsg += ',';
  129. let param = this.initialValues[i];
  130. sqlMsg = this.logParameters(sqlMsg, param);
  131. if (sqlMsg.length > this.opts.debugLen) {
  132. sqlMsg = sqlMsg.substr(0, this.opts.debugLen) + '...';
  133. break;
  134. }
  135. }
  136. sqlMsg += ']';
  137. return sqlMsg;
  138. }
  139. return 'sql: ' + this.sql + ' - parameters:[]';
  140. }
  141. success(val) {
  142. this.packet.waitingResponseNo--;
  143. if (this.packet.haveErrorResponse) {
  144. if (!this.sending && this.packet.waitingResponseNo === 0) {
  145. this.packet = null;
  146. this.onPacketReceive = null;
  147. this.resolve = null;
  148. this._columns = null;
  149. this._rows = null;
  150. process.nextTick(this.reject, this.firstError);
  151. this.reject = null;
  152. this.emit('end', this.firstError);
  153. }
  154. } else {
  155. if (!this.sending && this.packet.waitingResponseNo === 0) {
  156. if (this.parseResults.reWritable) {
  157. this.packet = null;
  158. let totalAffectedRows = 0;
  159. this._rows.forEach((row) => {
  160. totalAffectedRows += row.affectedRows;
  161. });
  162. const rs = {
  163. affectedRows: totalAffectedRows,
  164. insertId: this._rows[0].insertId,
  165. warningStatus: this._rows[this._rows.length - 1].warningStatus
  166. };
  167. this.successEnd(rs);
  168. return;
  169. } else {
  170. this.successEnd(this._rows);
  171. }
  172. this._columns = null;
  173. this._rows = null;
  174. return;
  175. }
  176. this._responseIndex++;
  177. this.onPacketReceive = this.readResponsePacket;
  178. }
  179. }
  180. throwError(err, info) {
  181. this.packet.waitingResponseNo--;
  182. this.sending = false;
  183. if (this.packet && !this.packet.haveErrorResponse) {
  184. if (err.fatal) {
  185. this.packet.waitingResponseNo = 0;
  186. }
  187. if (this.stack) {
  188. err = Errors.createError(
  189. err.message,
  190. err.fatal,
  191. info,
  192. err.sqlState,
  193. err.errno,
  194. this.stack,
  195. false
  196. );
  197. }
  198. this.firstError = err;
  199. this.packet.endedWithError();
  200. }
  201. if (!this.sending && this.packet.waitingResponseNo === 0) {
  202. this.packet = null;
  203. this.onPacketReceive = null;
  204. this.resolve = null;
  205. process.nextTick(this.reject, this.firstError);
  206. this.reject = null;
  207. this.emit('end', this.firstError);
  208. } else {
  209. this._responseIndex++;
  210. this.onPacketReceive = this.readResponsePacket;
  211. }
  212. }
  213. /**
  214. * Validate that parameters exists and are defined.
  215. *
  216. * @param info connection info
  217. * @returns {boolean} return false if any error occur.
  218. */
  219. validateParameters(info) {
  220. //validate parameter size.
  221. for (let r = 0; r < this.values.length; r++) {
  222. let val = this.values[r];
  223. if (!Array.isArray(val)) {
  224. val = [val];
  225. this.values[r] = val;
  226. }
  227. if (this.parseResults.partList.length - 3 > val.length) {
  228. this.emit('send_end');
  229. this.throwNewError(
  230. 'Parameter at position ' +
  231. val.length +
  232. ' is not set for values ' +
  233. r +
  234. '\n' +
  235. this.displaySql(),
  236. false,
  237. info,
  238. 'HY000',
  239. Errors.ER_MISSING_PARAMETER
  240. );
  241. return false;
  242. }
  243. //validate parameter is defined.
  244. for (let i = 0; i < this.parseResults.partList.length - 3; i++) {
  245. if (val[i] === undefined) {
  246. this.emit('send_end');
  247. this.throwNewError(
  248. 'Parameter at position ' +
  249. (i + 1) +
  250. ' is undefined for values ' +
  251. r +
  252. '\n' +
  253. this.displaySql(),
  254. false,
  255. info,
  256. 'HY000',
  257. Errors.ER_PARAMETER_UNDEFINED
  258. );
  259. return false;
  260. }
  261. }
  262. }
  263. return true;
  264. }
  265. /**
  266. * Define params events.
  267. * Each parameter indicate that he is written to socket,
  268. * emitting event so next parameter can be written.
  269. */
  270. registerStreamSendEvent(packet, info) {
  271. this.paramWritten = function () {
  272. let flushed = false;
  273. while (!flushed) {
  274. if (this.packet.haveErrorResponse) {
  275. this.sending = false;
  276. this.emit('send_end');
  277. return;
  278. }
  279. if (this.currentParam === this.valueRow.length) {
  280. // all parameters from row are written.
  281. flushed =
  282. packet.writeString(this.parseResults.partList[this.parseResults.partList.length - 2]) ||
  283. flushed;
  284. flushed =
  285. packet.mark(!this.parseResults.reWritable || this.valueIdx === this.values.length) ||
  286. flushed;
  287. if (this.valueIdx < this.values.length) {
  288. // still remaining rows
  289. this.valueRow = this.values[this.valueIdx++];
  290. this.currentParam = 0;
  291. } else {
  292. // all rows are written
  293. this.sending = false;
  294. this.emit('send_end');
  295. return;
  296. }
  297. }
  298. flushed = packet.writeString(this.parseResults.partList[this.currentParam + 1]) || flushed;
  299. const value = this.valueRow[this.currentParam];
  300. if (value === null) {
  301. flushed = packet.writeStringAscii('NULL') || flushed;
  302. this.currentParam++;
  303. continue;
  304. }
  305. if (
  306. typeof value === 'object' &&
  307. typeof value.pipe === 'function' &&
  308. typeof value.read === 'function'
  309. ) {
  310. //********************************************
  311. // param is stream,
  312. //********************************************
  313. flushed = packet.writeInt8(QUOTE) || flushed;
  314. value.once(
  315. 'end',
  316. function () {
  317. packet.writeInt8(QUOTE);
  318. this.currentParam++;
  319. this.paramWritten();
  320. }.bind(this)
  321. );
  322. value.on('data', function (chunk) {
  323. packet.writeBufferEscape(chunk);
  324. });
  325. return;
  326. }
  327. //********************************************
  328. // param isn't stream. directly write in buffer
  329. //********************************************
  330. flushed = this.writeParam(packet, value, this.opts, info) || flushed;
  331. this.currentParam++;
  332. }
  333. if (this.sending) setImmediate(this.paramWritten.bind(this));
  334. }.bind(this);
  335. }
  336. }
  337. module.exports = BatchRewrite;