123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370 |
- 'use strict';
- const CommonText = require('./common-text-cmd');
- const Errors = require('../misc/errors');
- const Parse = require('../misc/parse');
- const RewritePacket = require('../io/rewrite-packet');
- const QUOTE = 0x27;
/**
 * Batch command sent with the text protocol (COM_QUERY).
 *
 * The SQL is split by the parser into a list of parts
 * (`parseResults.partList`): the first and last entries are handed to the
 * RewritePacket at construction, the entries in between are the fragments
 * written around each parameter of a row. The number of placeholders is
 * therefore `partList.length - 3`.
 *
 * see : https://mariadb.com/kb/en/library/com_query/
 */
class BatchRewrite extends CommonText {
  constructor(resolve, reject, options, connOpts, sql, values) {
    super(resolve, reject, options, connOpts, sql, values);
  }

  /**
   * Send COM_QUERY
   *
   * Rejects immediately when a timeout option is set, parses the SQL,
   * validates the rows (positional placeholders only), then starts writing.
   *
   * @param out output writer
   * @param opts connection options
   * @param info connection information
   */
  start(out, opts, info) {
    this.sending = true;
    this.info = info;

    if (this.opts.timeout) {
      const err = Errors.createError(
        'Cannot use timeout for Batch statement',
        false,
        info,
        'HY000',
        Errors.ER_TIMEOUT_NOT_SUPPORTED
      );
      this.emit('send_end');
      // No packet has been created yet: throwError() handles that case.
      this.throwError(err, info);
      return;
    }

    // A batch without any row is sent as a single row without parameters.
    if (this.initialValues.length === 0) this.initialValues = [[]];

    if (this.opts.namedPlaceholders) {
      this.parseResults = Parse.splitRewritableNamedParameterQuery(this.sql, this.initialValues);
      this.values = this.parseResults.values;
    } else {
      this.parseResults = Parse.splitRewritableQuery(this.sql);
      this.values = this.initialValues;
      if (!this.validateParameters(info)) {
        this.sending = false;
        return;
      }
    }

    out.startPacket(this);
    const partList = this.parseResults.partList;
    this.packet = new RewritePacket(
      this.opts.maxAllowedPacket,
      out,
      partList[0],
      partList[partList.length - 1]
    );
    this.onPacketReceive = this.readResponsePacket;
    this.valueIdx = 0;
    this.sendQueries();
  }

  /**
   * Write rows into the packet until a write reports a flush, sending stops,
   * or every row is written. When rows remain, the method re-schedules itself
   * with setImmediate. If a parameter is a stream, writing is handed over to
   * the `paramWritten` continuation (see registerStreamSendEvent) and this
   * method returns.
   */
  sendQueries() {
    const partList = this.parseResults.partList;
    const paramCount = partList.length - 3;
    let flushed = false;

    while (!flushed && this.sending && this.valueIdx < this.values.length) {
      this.valueRow = this.values[this.valueIdx++];

      //********************************************
      // send params
      //********************************************
      for (let i = 0; i < paramCount; i++) {
        const value = this.valueRow[i];
        flushed = this.packet.writeString(partList[i + 1]) || flushed;

        if (value === null) {
          flushed = this.packet.writeStringAscii('NULL') || flushed;
          continue;
        }

        if (
          typeof value === 'object' &&
          typeof value.pipe === 'function' &&
          typeof value.read === 'function'
        ) {
          //********************************************
          // param is stream,
          // now all params will be written by event
          //********************************************
          this.registerStreamSendEvent(this.packet, this.info);
          this.currentParam = i;
          this.packet.writeInt8(QUOTE); //'
          value.on('data', (chunk) => {
            this.packet.writeBufferEscape(chunk);
          });
          value.on('end', () => {
            this.packet.writeInt8(QUOTE); //'
            this.currentParam++;
            this.paramWritten();
          });
          return;
        }

        //********************************************
        // param isn't stream. directly write in buffer
        //********************************************
        flushed = this.writeParam(this.packet, value, this.opts, this.info) || flushed;
      }

      this.packet.writeString(partList[partList.length - 2]);
      // The flag is true for the last row, or for every row when the query
      // could not be rewritten.
      this.packet.mark(!this.parseResults.reWritable || this.valueIdx === this.values.length);
    }

    if (this.valueIdx < this.values.length && !this.packet.haveErrorResponse) {
      //there is still data to send
      setImmediate(this.sendQueries.bind(this));
    } else {
      if (this.sending && this.valueIdx === this.values.length) this.emit('send_end');
      this.sending = false;
    }
  }

  /**
   * Build the debug representation of the command: the SQL followed by the
   * parameter rows, truncated to `opts.debugLen` characters.
   *
   * @returns {string} message of the form "sql: ... - parameters:[...]"
   */
  displaySql() {
    if (this.opts && this.initialValues) {
      if (this.sql.length > this.opts.debugLen) {
        return 'sql: ' + this.sql.substring(0, this.opts.debugLen) + '...';
      }

      let sqlMsg = 'sql: ' + this.sql + ' - parameters:';
      sqlMsg += '[';
      for (let i = 0; i < this.initialValues.length; i++) {
        if (i !== 0) sqlMsg += ',';
        sqlMsg = this.logParameters(sqlMsg, this.initialValues[i]);
        if (sqlMsg.length > this.opts.debugLen) {
          sqlMsg = sqlMsg.substring(0, this.opts.debugLen) + '...';
          break;
        }
      }
      sqlMsg += ']';
      return sqlMsg;
    }
    return 'sql: ' + this.sql + ' - parameters:[]';
  }

  /**
   * Handle one successful server response.
   *
   * The command only ends once sending is over and no response is pending:
   * it is then rejected with the first recorded error if any response was an
   * error, otherwise resolved. For a rewritable query the per-response
   * results are merged into one summary object.
   *
   * @param val response value (not used here, results are read from _rows)
   */
  success(val) {
    this.packet.waitingResponseNo--;
    const allAnswered = !this.sending && this.packet.waitingResponseNo === 0;

    if (this.packet.haveErrorResponse) {
      if (allAnswered) {
        this.packet = null;
        this.onPacketReceive = null;
        this.resolve = null;
        this._columns = null;
        this._rows = null;
        process.nextTick(this.reject, this.firstError);
        this.reject = null;
        this.emit('end', this.firstError);
      }
      return;
    }

    if (!allAnswered) {
      // more responses expected
      this._responseIndex++;
      this.onPacketReceive = this.readResponsePacket;
      return;
    }

    if (this.parseResults.reWritable) {
      this.packet = null;
      const rows = this._rows;
      const totalAffectedRows = rows.reduce((sum, row) => sum + row.affectedRows, 0);
      this.successEnd({
        affectedRows: totalAffectedRows,
        insertId: rows[0].insertId,
        warningStatus: rows[rows.length - 1].warningStatus
      });
    } else {
      this.successEnd(this._rows);
    }

    // release result buffers whatever the branch taken
    this._columns = null;
    this._rows = null;
  }

  /**
   * Handle an error, either raised locally or received from the server.
   *
   * Only the first error is recorded (wrapped with the caller stack when
   * `this.stack` is set). The command is rejected once no response is
   * pending; a fatal error drops the pending response count to zero.
   * Also works when no packet exists yet, i.e. when start() fails before
   * anything was written.
   *
   * @param err error
   * @param info connection information
   */
  throwError(err, info) {
    this.sending = false;
    const packet = this.packet;
    if (packet) packet.waitingResponseNo--;

    const isFirstError = packet ? !packet.haveErrorResponse : !this.firstError;
    if (isFirstError) {
      if (packet && err.fatal) {
        packet.waitingResponseNo = 0;
      }
      let reported = err;
      if (this.stack) {
        reported = Errors.createError(
          err.message,
          err.fatal,
          info,
          err.sqlState,
          err.errno,
          this.stack,
          false
        );
      }
      this.firstError = reported;
      if (packet) packet.endedWithError();
    }

    if (packet && packet.waitingResponseNo !== 0) {
      // responses to already sent queries are still expected
      this._responseIndex++;
      this.onPacketReceive = this.readResponsePacket;
      return;
    }

    this.packet = null;
    this.onPacketReceive = null;
    this.resolve = null;
    if (this.reject) process.nextTick(this.reject, this.firstError);
    this.reject = null;
    this.emit('end', this.firstError);
  }

  /**
   * Validate that parameters exists and are defined.
   * A row that is not an array is replaced in `this.values` by a
   * one-element array.
   *
   * @param info connection info
   * @returns {boolean} return false if any error occur.
   */
  validateParameters(info) {
    const paramCount = this.parseResults.partList.length - 3;

    for (let r = 0; r < this.values.length; r++) {
      let val = this.values[r];
      if (!Array.isArray(val)) {
        val = [val];
        this.values[r] = val;
      }

      //validate parameter size.
      if (paramCount > val.length) {
        this.emit('send_end');
        this.throwNewError(
          'Parameter at position ' +
            val.length +
            ' is not set for values ' +
            r +
            '\n' +
            this.displaySql(),
          false,
          info,
          'HY000',
          Errors.ER_MISSING_PARAMETER
        );
        return false;
      }

      //validate parameter is defined.
      for (let i = 0; i < paramCount; i++) {
        if (val[i] === undefined) {
          this.emit('send_end');
          this.throwNewError(
            'Parameter at position ' +
              (i + 1) +
              ' is undefined for values ' +
              r +
              '\n' +
              this.displaySql(),
            false,
            info,
            'HY000',
            Errors.ER_PARAMETER_UNDEFINED
          );
          return false;
        }
      }
    }
    return true;
  }

  /**
   * Define params events.
   * Installs `this.paramWritten`, the continuation called each time a stream
   * parameter has been completely written, so that the following parameters
   * and rows can be written in turn.
   *
   * @param packet packet the parameters are written to
   * @param info connection information
   */
  registerStreamSendEvent(packet, info) {
    this.paramWritten = () => {
      const partList = this.parseResults.partList;
      // Number of placeholders per row. A row may hold more values than
      // placeholders, so the row length must not be used to detect row end.
      const paramCount = partList.length - 3;
      let flushed = false;

      while (!flushed) {
        if (this.packet.haveErrorResponse) {
          this.sending = false;
          this.emit('send_end');
          return;
        }

        if (this.currentParam === paramCount) {
          // all parameters from row are written.
          flushed = packet.writeString(partList[partList.length - 2]) || flushed;
          flushed =
            packet.mark(!this.parseResults.reWritable || this.valueIdx === this.values.length) ||
            flushed;

          if (this.valueIdx < this.values.length) {
            // still remaining rows
            this.valueRow = this.values[this.valueIdx++];
            this.currentParam = 0;
          } else {
            // all rows are written
            this.sending = false;
            this.emit('send_end');
            return;
          }
        }

        flushed = packet.writeString(partList[this.currentParam + 1]) || flushed;
        const value = this.valueRow[this.currentParam];

        if (value === null) {
          flushed = packet.writeStringAscii('NULL') || flushed;
          this.currentParam++;
          continue;
        }

        if (
          typeof value === 'object' &&
          typeof value.pipe === 'function' &&
          typeof value.read === 'function'
        ) {
          //********************************************
          // param is stream,
          //********************************************
          flushed = packet.writeInt8(QUOTE) || flushed;
          value.once('end', () => {
            packet.writeInt8(QUOTE);
            this.currentParam++;
            this.paramWritten();
          });
          value.on('data', (chunk) => {
            packet.writeBufferEscape(chunk);
          });
          return;
        }

        //********************************************
        // param isn't stream. directly write in buffer
        //********************************************
        flushed = this.writeParam(packet, value, this.opts, info) || flushed;
        this.currentParam++;
      }

      // a write flushed the packet: continue on a later tick
      if (this.sending) setImmediate(this.paramWritten);
    };
  }
}
- module.exports = BatchRewrite;
|