compression-input-stream.js 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. 'use strict';
  2. const ZLib = require('zlib');
  3. const Utils = require('../misc/utils');
  4. /**
  5. * MySQL packet parser
  6. * see : https://mariadb.com/kb/en/library/0-packet/
  7. */
  8. class CompressionInputStream {
  9. constructor(reader, receiveQueue, opts, info) {
  10. this.reader = reader;
  11. this.receiveQueue = receiveQueue;
  12. this.info = info;
  13. this.opts = opts;
  14. this.header = Buffer.allocUnsafe(7);
  15. this.headerLen = 0;
  16. this.compressPacketLen = null;
  17. this.packetLen = null;
  18. this.remainingLen = null;
  19. this.parts = null;
  20. this.partsTotalLen = 0;
  21. }
  22. receivePacket(chunk) {
  23. let cmd = this.currentCmd();
  24. if (this.opts.debugCompress) {
  25. console.log(
  26. '<== conn:%d %s (compress)\n%s',
  27. this.info.threadId ? this.info.threadId : -1,
  28. cmd
  29. ? cmd.onPacketReceive
  30. ? cmd.constructor.name + '.' + cmd.onPacketReceive.name
  31. : cmd.constructor.name
  32. : 'no command',
  33. Utils.log(this.opts, chunk, 0, chunk.length, this.header)
  34. );
  35. }
  36. if (cmd) cmd.compressSequenceNo = this.header[3];
  37. const unCompressLen = this.header[4] | (this.header[5] << 8) | (this.header[6] << 16);
  38. if (unCompressLen === 0) {
  39. this.reader.onData(chunk);
  40. } else {
  41. //use synchronous inflating, to ensure FIFO packet order
  42. const unCompressChunk = ZLib.inflateSync(chunk);
  43. this.reader.onData(unCompressChunk);
  44. }
  45. }
  46. currentCmd() {
  47. let cmd;
  48. while ((cmd = this.receiveQueue.peek())) {
  49. if (cmd.onPacketReceive) return cmd;
  50. this.receiveQueue.shift();
  51. }
  52. return null;
  53. }
  54. resetHeader() {
  55. this.remainingLen = null;
  56. this.headerLen = 0;
  57. }
  58. onData(chunk) {
  59. let pos = 0;
  60. let length;
  61. const chunkLen = chunk.length;
  62. do {
  63. if (this.remainingLen) {
  64. length = this.remainingLen;
  65. } else if (this.headerLen === 0 && chunkLen - pos >= 7) {
  66. this.header[0] = chunk[pos];
  67. this.header[1] = chunk[pos + 1];
  68. this.header[2] = chunk[pos + 2];
  69. this.header[3] = chunk[pos + 3];
  70. this.header[4] = chunk[pos + 4];
  71. this.header[5] = chunk[pos + 5];
  72. this.header[6] = chunk[pos + 6];
  73. this.headerLen = 7;
  74. pos += 7;
  75. this.compressPacketLen = this.header[0] + (this.header[1] << 8) + (this.header[2] << 16);
  76. this.packetLen = this.header[4] | (this.header[5] << 8) | (this.header[6] << 16);
  77. if (this.packetLen === 0) this.packetLen = this.compressPacketLen;
  78. length = this.compressPacketLen;
  79. } else {
  80. length = null;
  81. while (chunkLen - pos > 0) {
  82. this.header[this.headerLen++] = chunk[pos++];
  83. if (this.headerLen === 7) {
  84. this.compressPacketLen =
  85. this.header[0] + (this.header[1] << 8) + (this.header[2] << 16);
  86. this.packetLen = this.header[4] | (this.header[5] << 8) | (this.header[6] << 16);
  87. if (this.packetLen === 0) this.packetLen = this.compressPacketLen;
  88. length = this.compressPacketLen;
  89. break;
  90. }
  91. }
  92. }
  93. if (length) {
  94. if (chunkLen - pos >= length) {
  95. const buf = chunk.slice(pos, pos + length);
  96. pos += length;
  97. if (this.parts) {
  98. this.parts.push(buf);
  99. this.partsTotalLen += length;
  100. if (this.compressPacketLen < 0xffffff) {
  101. let buf = Buffer.concat(this.parts, this.partsTotalLen);
  102. this.parts = null;
  103. this.receivePacket(buf);
  104. }
  105. } else {
  106. if (this.compressPacketLen < 0xffffff) {
  107. this.receivePacket(buf);
  108. } else {
  109. this.parts = [buf];
  110. this.partsTotalLen = length;
  111. }
  112. }
  113. this.resetHeader();
  114. } else {
  115. const buf = chunk.slice(pos, chunkLen);
  116. if (!this.parts) {
  117. this.parts = [buf];
  118. this.partsTotalLen = chunkLen - pos;
  119. } else {
  120. this.parts.push(buf);
  121. this.partsTotalLen += chunkLen - pos;
  122. }
  123. this.remainingLen = length - (chunkLen - pos);
  124. return;
  125. }
  126. }
  127. } while (pos < chunkLen);
  128. }
  129. }
  130. module.exports = CompressionInputStream;