batch-bulk.js 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. 'use strict';
  2. const CommonBinary = require('./common-binary-cmd');
  3. const Errors = require('../misc/errors');
  4. const Parse = require('../misc/parse');
  5. const BulkPacket = require('../io/bulk-packet');
  6. /**
  7. * Protocol COM_STMT_BULK_EXECUTE
  8. * see : https://mariadb.com/kb/en/library/com_stmt_bulk_execute/
  9. */
  10. class BatchBulk extends CommonBinary {
  11. constructor(resolve, reject, options, connOpts, sql, values) {
  12. super(resolve, reject, options, connOpts, sql, values);
  13. this.onPacketReceive = this.readPrepareResultPacket;
  14. }
  15. /**
  16. * Send COM_STMT_BULK_EXECUTE
  17. *
  18. * @param out output writer
  19. * @param opts connection options
  20. * @param info connection information
  21. */
  22. start(out, opts, info) {
  23. this.sending = true;
  24. this.info = info;
  25. this.values = this.initialValues;
  26. if (this.opts.timeout) {
  27. const err = Errors.createError(
  28. 'Cannot use timeout for Batch statement',
  29. false,
  30. info,
  31. 'HY000',
  32. Errors.ER_TIMEOUT_NOT_SUPPORTED
  33. );
  34. this.emit('send_end');
  35. this.throwError(err, info);
  36. return;
  37. }
  38. let questionMarkSql = this.sql;
  39. if (this.opts.namedPlaceholders) {
  40. const res = Parse.searchPlaceholder(
  41. this.sql,
  42. info,
  43. this.initialValues,
  44. this.displaySql.bind(this)
  45. );
  46. questionMarkSql = res.sql;
  47. this.values = res.values;
  48. }
  49. if (!this.validateParameters(info)) {
  50. this.sending = false;
  51. return;
  52. }
  53. //send COM_STMT_PREPARE command
  54. this.out = out;
  55. this.packet = new BulkPacket(this.opts, out, this.values[0]);
  56. out.startPacket(this);
  57. out.writeInt8(0x16);
  58. out.writeString(questionMarkSql);
  59. out.flushBuffer(true);
  60. if (this.opts.pipelining) {
  61. out.startPacket(this);
  62. this.valueIdx = 0;
  63. this.sendQueries();
  64. } else {
  65. this.out = out;
  66. }
  67. }
  68. sendQueries() {
  69. let flushed = false;
  70. while (!flushed && this.sending && this.valueIdx < this.values.length) {
  71. this.valueRow = this.values[this.valueIdx++];
  72. //********************************************
  73. // send params
  74. //********************************************
  75. const len = this.valueRow.length;
  76. for (let i = 0; i < len; i++) {
  77. const value = this.valueRow[i];
  78. if (value === null) {
  79. flushed = this.packet.writeInt8(0x01) || flushed;
  80. continue;
  81. }
  82. //********************************************
  83. // param has no stream. directly write in buffer
  84. //********************************************
  85. flushed = this.writeParam(this.packet, value, this.opts, this.info) || flushed;
  86. }
  87. const last = this.valueIdx === this.values.length;
  88. flushed = this.packet.mark(last, last ? null : this.values[this.valueIdx]) || flushed;
  89. }
  90. if (this.valueIdx < this.values.length && !this.packet.haveErrorResponse) {
  91. //there is still data to send
  92. setImmediate(this.sendQueries.bind(this));
  93. } else {
  94. if (this.sending && this.valueIdx === this.values.length) this.emit('send_end');
  95. this.sending = false;
  96. }
  97. }
  98. displaySql() {
  99. if (this.opts && this.initialValues) {
  100. if (this.sql.length > this.opts.debugLen) {
  101. return 'sql: ' + this.sql.substring(0, this.opts.debugLen) + '...';
  102. }
  103. let sqlMsg = 'sql: ' + this.sql + ' - parameters:';
  104. sqlMsg += '[';
  105. for (let i = 0; i < this.initialValues.length; i++) {
  106. if (i !== 0) sqlMsg += ',';
  107. let param = this.initialValues[i];
  108. sqlMsg = this.logParameters(sqlMsg, param);
  109. if (sqlMsg.length > this.opts.debugLen) {
  110. sqlMsg = sqlMsg.substr(0, this.opts.debugLen) + '...';
  111. break;
  112. }
  113. }
  114. sqlMsg += ']';
  115. return sqlMsg;
  116. }
  117. return 'sql: ' + this.sql + ' - parameters:[]';
  118. }
  119. success(val) {
  120. this.packet.waitingResponseNo--;
  121. if (!this.opts.pipelining && this.packet.statementId === -1) {
  122. this.packet.statementId = this.statementId;
  123. this.out.startPacket(this);
  124. this.valueIdx = 0;
  125. this.sendQueries();
  126. this._responseIndex++;
  127. this.onPacketReceive = this.readResponsePacket;
  128. return;
  129. }
  130. if (!this.sending && this.packet.waitingResponseNo === 0) {
  131. //send COM_STMT_CLOSE packet
  132. if (!this.firstError || !this.firstError.fatal) {
  133. this.sequenceNo = -1;
  134. this.compressSequenceNo = -1;
  135. this.out.startPacket(this);
  136. this.out.writeInt8(0x19);
  137. this.out.writeInt32(this.statementId);
  138. this.out.flushBuffer(true);
  139. }
  140. this.sending = false;
  141. this.emit('send_end');
  142. if (this.packet.haveErrorResponse) {
  143. this.packet = null;
  144. this.resolve = null;
  145. this.onPacketReceive = null;
  146. this._columns = null;
  147. this._rows = null;
  148. process.nextTick(this.reject, this.firstError);
  149. this.reject = null;
  150. this.emit('end', this.firstError);
  151. } else {
  152. this.packet = null;
  153. let totalAffectedRows = 0;
  154. this._rows.forEach((row) => {
  155. totalAffectedRows += row.affectedRows;
  156. });
  157. const rs = {
  158. affectedRows: totalAffectedRows,
  159. insertId: this._rows[0].insertId,
  160. warningStatus: this._rows[this._rows.length - 1].warningStatus
  161. };
  162. this.successEnd(rs);
  163. this._columns = null;
  164. this._rows = null;
  165. }
  166. return;
  167. }
  168. if (!this.packet.haveErrorResponse) {
  169. this._responseIndex++;
  170. this.onPacketReceive = this.readResponsePacket;
  171. }
  172. }
  173. throwError(err, info) {
  174. this.packet.waitingResponseNo--;
  175. this.sending = false;
  176. if (this.packet && !this.packet.haveErrorResponse) {
  177. if (err.fatal) {
  178. this.packet.waitingResponseNo = 0;
  179. }
  180. if (this.stack) {
  181. err = Errors.createError(
  182. err.message,
  183. err.fatal,
  184. info,
  185. err.sqlState,
  186. err.errno,
  187. this.stack,
  188. false
  189. );
  190. }
  191. this.firstError = err;
  192. this.packet.endedWithError();
  193. }
  194. if (!this.sending && this.packet.waitingResponseNo === 0) {
  195. this.resolve = null;
  196. //send COM_STMT_CLOSE packet
  197. if (!err.fatal && this.statementId) {
  198. this.sequenceNo = -1;
  199. this.compressSequenceNo = -1;
  200. this.out.startPacket(this);
  201. this.out.writeInt8(0x19);
  202. this.out.writeInt32(this.statementId);
  203. this.out.flushBuffer(true);
  204. }
  205. this.emit('send_end');
  206. process.nextTick(this.reject, this.firstError);
  207. this.reject = null;
  208. this.onPacketReceive = null;
  209. this.emit('end', this.firstError);
  210. } else {
  211. this._responseIndex++;
  212. this.onPacketReceive = this.readResponsePacket;
  213. }
  214. }
  215. /**
  216. * Validate that parameters exists and are defined.
  217. *
  218. * @param info connection info
  219. * @returns {boolean} return false if any error occur.
  220. */
  221. validateParameters(info) {
  222. //validate parameter size.
  223. for (let r = 0; r < this.values.length; r++) {
  224. if (!Array.isArray(this.values[r])) this.values[r] = [this.values[r]];
  225. //validate parameter is defined.
  226. for (let i = 0; i < this.values[r].length; i++) {
  227. if (this.values[r][i] === undefined) {
  228. this.emit('send_end');
  229. this.throwNewError(
  230. 'Parameter at position ' +
  231. (i + 1) +
  232. ' is undefined for values ' +
  233. r +
  234. '\n' +
  235. this.displaySql(),
  236. false,
  237. info,
  238. 'HY000',
  239. Errors.ER_PARAMETER_UNDEFINED
  240. );
  241. return false;
  242. }
  243. }
  244. }
  245. return true;
  246. }
  247. }
  248. module.exports = BatchBulk;