packet-input-stream.js 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. 'use strict';
  2. const PacketNodeEncoded = require('./packet-node-encoded');
  3. const PacketIconvEncoded = require('./packet-node-iconv');
  4. const Utils = require('../misc/utils');
  /**
   * MySQL packet parser
   * see : https://mariadb.com/kb/en/library/0-packet/
   *
   * Consumes raw socket chunks through onData(), reassembles protocol packets and
   * hands each complete packet to the command at the head of the receive queue.
   *
   * Framing as implemented here: a 4-byte header made of a 3-byte little-endian
   * payload length followed by 1 byte that is copied to the command's sequenceNo.
   * A payload of exactly 0xffffff bytes is treated as "more data follows": payloads
   * are accumulated until a packet shorter than 0xffffff (possibly empty) arrives.
   */
  class PacketInputStream {
    /**
     * @param {Function} unexpectedPacket - called with the packet when no command is waiting for it
     * @param {Object} receiveQueue - pending commands; must expose peek() and shift()
     * @param {*} out - passed through untouched to every command's onPacketReceive
     * @param {Object} opts - options; reads collation, logPackets and debug, and subscribes
     *                        to its 'collation' and 'debug' events
     * @param {Object} info - passed through to commands; threadId and addPacket() are used for logging
     */
    constructor(unexpectedPacket, receiveQueue, out, opts, info) {
      this.unexpectedPacket = unexpectedPacket;
      this.opts = opts;
      this.receiveQueue = receiveQueue;
      this.info = info;
      this.out = out;

      // parsing state, kept between onData() calls in case a packet is not complete
      this.header = Buffer.allocUnsafe(4);
      this.headerLen = 0; // number of header bytes read so far (0..4)
      this.packetLen = null; // payload length announced by the current header
      this.remainingLen = null; // payload bytes still expected for the current packet
      this.parts = null; // payload fragments waiting to be concatenated
      this.partsTotalLen = 0; // total byte length of this.parts

      this.changeEncoding(this.opts.collation);
      this.changeDebug(this.opts.logPackets, this.opts.debug);
      this.opts.on('collation', this.changeEncoding.bind(this));
      this.opts.on('debug', this.changeDebug.bind(this));
    }

    /**
     * Select the packet implementation matching the collation's charset.
     *
     * @param {Object} collation - object whose `charset` property names the encoding
     */
    changeEncoding(collation) {
      this.encoding = collation.charset;
      // charsets that Node's Buffer cannot handle natively use the iconv-based packet
      this.packetConstructor = Buffer.isEncoding(this.encoding)
        ? PacketNodeEncoded
        : PacketIconvEncoded;
    }

    /**
     * Switch between the logging and the plain packet handler.
     *
     * @param {boolean} logPackets - truthy to select the logging handler
     * @param {boolean} debug - truthy to select the logging handler
     */
    changeDebug(logPackets, debug) {
      this.logPackets = logPackets;
      this.debug = debug;
      this.receivePacket =
        this.logPackets || this.debug ? this.receivePacketDebug : this.receivePacketBasic;
    }

    /**
     * Packet handler used when logging is enabled: logs the packet, then dispatches it.
     *
     * @param {Object} packet - complete packet (reads buf, pos and end for logging)
     */
    receivePacketDebug(packet) {
      const cmd = this.currentCmd();

      if (packet) {
        const packetStr = Utils.log(this.opts, packet.buf, packet.pos, packet.end, this.header);
        const threadId = this.info.threadId ? this.info.threadId : -1;

        let cmdLabel = 'no command';
        if (cmd) {
          cmdLabel = cmd.onPacketReceive
            ? cmd.constructor.name + '.' + cmd.onPacketReceive.name
            : cmd.constructor.name;
        }

        if (this.opts.logPackets) {
          // same layout as the console.log format below
          this.info.addPacket(
            `<== conn:${threadId} ${cmdLabel} (${packet.pos},${packet.end})\n${packetStr}`
          );
        }

        if (this.opts.debug) {
          console.log(
            '<== conn:%d %s (%d,%d)\n%s',
            threadId,
            cmdLabel,
            packet.pos,
            packet.end,
            packetStr
          );
        }
      }

      this.dispatchPacket(cmd, packet);
    }

    /**
     * Packet handler used when logging is disabled.
     *
     * @param {Object} packet - complete packet
     */
    receivePacketBasic(packet) {
      this.dispatchPacket(this.currentCmd(), packet);
    }

    /**
     * Hand a packet to a command, or to unexpectedPacket when there is none.
     *
     * @param {Object|null} cmd - command returned by currentCmd()
     * @param {Object} packet - complete packet
     */
    dispatchPacket(cmd, packet) {
      if (!cmd) {
        this.unexpectedPacket(packet);
        return;
      }

      cmd.sequenceNo = this.header[3];
      cmd.onPacketReceive(packet, this.out, this.opts, this.info);
      // the command no longer has a handler after this packet: remove it from the queue
      if (!cmd.onPacketReceive) this.receiveQueue.shift();
    }

    /**
     * Forget the current header so that the next byte read starts a new one.
     */
    resetHeader() {
      this.remainingLen = null;
      this.headerLen = 0;
    }

    /**
     * Find the first queued command that has an onPacketReceive handler,
     * discarding the handler-less commands in front of it.
     *
     * @returns {Object|null} the command, or null when the queue holds none
     */
    currentCmd() {
      let cmd;
      while ((cmd = this.receiveQueue.peek())) {
        if (cmd.onPacketReceive) return cmd;
        this.receiveQueue.shift();
      }
      return null;
    }

    /**
     * Feed a chunk of socket data to the parser. Every packet completed by this
     * chunk is passed to this.receivePacket; incomplete data is kept for the next call.
     *
     * @param {Buffer} chunk - raw bytes; may hold several packets, or a fragment of one
     */
    onData(chunk) {
      const chunkLen = chunk.length;
      let pos = 0;

      while (pos < chunkLen) {
        let length;

        if (this.remainingLen) {
          // continuing a payload started in a previous chunk
          length = this.remainingLen;
        } else {
          if (this.headerLen === 0 && chunkLen - pos >= 4) {
            // fast path: the whole header is available
            this.header[0] = chunk[pos];
            this.header[1] = chunk[pos + 1];
            this.header[2] = chunk[pos + 2];
            this.header[3] = chunk[pos + 3];
            pos += 4;
            this.headerLen = 4;
          } else {
            // header split across chunks: read it byte by byte
            while (pos < chunkLen && this.headerLen < 4) {
              this.header[this.headerLen++] = chunk[pos++];
            }
            if (this.headerLen < 4) return; // need more data to finish the header
          }
          this.packetLen = this.header[0] + (this.header[1] << 8) + (this.header[2] << 16);
          length = this.packetLen;
        }

        const available = chunkLen - pos;

        if (available < length) {
          // payload incomplete: keep what is there and wait for the next chunk
          if (available > 0) {
            const fragment = chunk.subarray(pos, chunkLen);
            if (this.parts) {
              this.parts.push(fragment);
              this.partsTotalLen += available;
            } else {
              this.parts = [fragment];
              this.partsTotalLen = available;
            }
          }
          this.remainingLen = length - available;
          return;
        }

        // Payload complete. A length of 0 is a valid (empty) packet and is handled
        // here too; skipping it would leave the header state stuck.
        const payload = chunk.subarray(pos, pos + length);
        pos += length;
        // reset before dispatching so the parser state stays consistent if a handler throws
        this.resetHeader();

        if (this.parts) {
          this.parts.push(payload);
          this.partsTotalLen += length;
          if (this.packetLen < 0xffffff) {
            const total = this.partsTotalLen;
            const whole = Buffer.concat(this.parts, total);
            this.parts = null;
            this.partsTotalLen = 0;
            this.receivePacket(new this.packetConstructor(whole, 0, total, this.encoding));
          }
        } else if (this.packetLen < 0xffffff) {
          this.receivePacket(new this.packetConstructor(payload, 0, length, this.encoding));
        } else {
          // payload of exactly 0xffffff bytes: more packets follow for the same message
          this.parts = [payload];
          this.partsTotalLen = length;
        }
      }
    }
  }
  174. module.exports = PacketInputStream;