compression-output-stream.js 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. 'use strict';
  2. const Utils = require('../misc/utils');
  3. const ZLib = require('zlib');
  4. //increase by level to avoid buffer copy.
  5. const SMALL_BUFFER_SIZE = 2048;
  6. const MEDIUM_BUFFER_SIZE = 131072; //128k
  7. const LARGE_BUFFER_SIZE = 1048576; //1M
  8. const MAX_BUFFER_SIZE = 16777222; //16M + 7
  9. /**
  10. /**
  11. * MySQL compression filter.
  12. * see https://mariadb.com/kb/en/library/0-packet/#compressed-packet
  13. */
  14. class CompressionOutputStream {
  15. /**
  16. * Constructor
  17. *
  18. * @param socket current socket
  19. * @param opts current connection options
  20. * @param info current connection information
  21. * @constructor
  22. */
  23. constructor(socket, opts, info) {
  24. this.info = info;
  25. this.opts = opts;
  26. this.pos = 7;
  27. this.header = Buffer.allocUnsafe(7);
  28. this.buf = Buffer.allocUnsafe(SMALL_BUFFER_SIZE);
  29. this.writer = (buffer) => {
  30. socket.write(buffer);
  31. };
  32. }
  33. growBuffer(len) {
  34. let newCapacity;
  35. if (len + this.pos < MEDIUM_BUFFER_SIZE) {
  36. newCapacity = MEDIUM_BUFFER_SIZE;
  37. } else if (len + this.pos < LARGE_BUFFER_SIZE) {
  38. newCapacity = LARGE_BUFFER_SIZE;
  39. } else newCapacity = MAX_BUFFER_SIZE;
  40. let newBuf = Buffer.allocUnsafe(newCapacity);
  41. this.buf.copy(newBuf, 0, 0, this.pos);
  42. this.buf = newBuf;
  43. }
  44. writeBuf(arr, cmd) {
  45. let off = 0,
  46. len = arr.length;
  47. if (len > this.buf.length - this.pos) {
  48. if (this.buf.length !== MAX_BUFFER_SIZE) {
  49. this.growBuffer(len);
  50. }
  51. //max buffer size
  52. if (len > this.buf.length - this.pos) {
  53. //not enough space in buffer, will stream :
  54. // fill buffer and flush until all data are snd
  55. let remainingLen = len;
  56. while (true) {
  57. //filling buffer
  58. let lenToFillBuffer = Math.min(MAX_BUFFER_SIZE - this.pos, remainingLen);
  59. arr.copy(this.buf, this.pos, off, off + lenToFillBuffer);
  60. remainingLen -= lenToFillBuffer;
  61. off += lenToFillBuffer;
  62. this.pos += lenToFillBuffer;
  63. if (remainingLen === 0) return;
  64. this.flush(false, cmd, remainingLen);
  65. }
  66. }
  67. }
  68. arr.copy(this.buf, this.pos, off, off + len);
  69. this.pos += len;
  70. }
  71. /**
  72. * Flush the internal buffer.
  73. */
  74. flush(cmdEnd, cmd, remainingLen) {
  75. if (this.pos < 1536) {
  76. //*******************************************************************************
  77. // small packet, no compression
  78. //*******************************************************************************
  79. this.buf[0] = this.pos - 7;
  80. this.buf[1] = (this.pos - 7) >>> 8;
  81. this.buf[2] = (this.pos - 7) >>> 16;
  82. this.buf[3] = ++cmd.compressSequenceNo;
  83. this.buf[4] = 0;
  84. this.buf[5] = 0;
  85. this.buf[6] = 0;
  86. if (this.opts.debugCompress) {
  87. console.log(
  88. '==> conn:%d %s (compress)\n%s',
  89. this.info.threadId ? this.info.threadId : -1,
  90. cmd ? cmd.constructor.name + '(0,' + this.pos + ')' : 'unknown',
  91. Utils.log(this.opts, this.buf, 0, this.pos)
  92. );
  93. }
  94. this.writer(this.buf.slice(0, this.pos));
  95. } else {
  96. //*******************************************************************************
  97. // compressing packet
  98. //*******************************************************************************
  99. //use synchronous inflating, to ensure FIFO packet order
  100. const compressChunk = ZLib.deflateSync(this.buf.slice(7, this.pos));
  101. const compressChunkLen = compressChunk.length;
  102. this.header[0] = compressChunkLen;
  103. this.header[1] = compressChunkLen >>> 8;
  104. this.header[2] = compressChunkLen >>> 16;
  105. this.header[3] = ++cmd.compressSequenceNo;
  106. this.header[4] = this.pos - 7;
  107. this.header[5] = (this.pos - 7) >>> 8;
  108. this.header[6] = (this.pos - 7) >>> 16;
  109. if (this.opts.debugCompress) {
  110. console.log(
  111. '==> conn:%d %s (compress)\n%s',
  112. this.info.threadId ? this.info.threadId : -1,
  113. cmd ? cmd.constructor.name + '(0,' + this.pos + '=>' + compressChunkLen + ')' : 'unknown',
  114. Utils.log(this.opts, compressChunk, 0, compressChunkLen, this.header)
  115. );
  116. }
  117. this.writer(this.header);
  118. this.writer(compressChunk);
  119. if (cmdEnd && this.pos === MAX_BUFFER_SIZE) this.writeEmptyPacket(cmd);
  120. this.header = Buffer.allocUnsafe(7);
  121. }
  122. this.buf = remainingLen
  123. ? CompressionOutputStream.allocateBuffer(remainingLen)
  124. : Buffer.allocUnsafe(SMALL_BUFFER_SIZE);
  125. this.pos = 7;
  126. }
  127. static allocateBuffer(len) {
  128. if (len + 4 < SMALL_BUFFER_SIZE) {
  129. return Buffer.allocUnsafe(SMALL_BUFFER_SIZE);
  130. } else if (len + 4 < MEDIUM_BUFFER_SIZE) {
  131. return Buffer.allocUnsafe(MEDIUM_BUFFER_SIZE);
  132. } else if (len + 4 < LARGE_BUFFER_SIZE) {
  133. return Buffer.allocUnsafe(LARGE_BUFFER_SIZE);
  134. }
  135. return Buffer.allocUnsafe(MAX_BUFFER_SIZE);
  136. }
  137. writeEmptyPacket(cmd) {
  138. const emptyBuf = Buffer.from([0x00, 0x00, 0x00, cmd.compressSequenceNo, 0x00, 0x00, 0x00]);
  139. if (this.opts.debugCompress) {
  140. console.log(
  141. '==> conn:%d %s (compress)\n%s',
  142. this.info.threadId ? this.info.threadId : -1,
  143. cmd ? cmd.constructor.name + '(0,' + this.pos + ')' : 'unknown',
  144. Utils.log(this.opts, emptyBuf, 0, 7)
  145. );
  146. }
  147. this.writer(emptyBuf);
  148. }
  149. }
  150. module.exports = CompressionOutputStream;