123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- 'use strict';
- const Utils = require('../misc/utils');
- const ZLib = require('zlib');
//increase by level to avoid buffer copy.
const SMALL_BUFFER_SIZE = 2048;
const MEDIUM_BUFFER_SIZE = 131072; //128k
const LARGE_BUFFER_SIZE = 1048576; //1M
const MAX_BUFFER_SIZE = 16777222; //16M + 7
//every internal buffer starts with room for the 7-byte compressed-packet header
const HEADER_SIZE = 7;
//buffers whose write position (header included) is below this are sent uncompressed
const MIN_COMPRESS_POS = 1536;

/**
 * MySQL compression filter.
 * see https://mariadb.com/kb/en/library/0-packet/#compressed-packet
 *
 * Data is accumulated in an internal buffer whose first 7 bytes are reserved
 * for the compressed-packet header. On flush, the buffer is written to the
 * socket either as-is (small payload) or deflated (larger payload).
 */
class CompressionOutputStream {
  /**
   * Constructor
   *
   * @param socket current socket
   * @param opts current connection options
   * @param info current connection information
   * @constructor
   */
  constructor(socket, opts, info) {
    this.info = info;
    this.opts = opts;
    this.pos = HEADER_SIZE;
    this.header = Buffer.allocUnsafe(HEADER_SIZE);
    this.buf = Buffer.allocUnsafe(SMALL_BUFFER_SIZE);
    this.writer = (buffer) => {
      socket.write(buffer);
    };
  }

  /**
   * Replace the internal buffer with a larger one, keeping the bytes already written.
   * The new capacity is the next size tier able to hold the current content plus
   * `len` more bytes, capped at MAX_BUFFER_SIZE.
   *
   * @param len number of bytes about to be appended
   */
  growBuffer(len) {
    const needed = len + this.pos;
    let newCapacity;
    if (needed < MEDIUM_BUFFER_SIZE) {
      newCapacity = MEDIUM_BUFFER_SIZE;
    } else if (needed < LARGE_BUFFER_SIZE) {
      newCapacity = LARGE_BUFFER_SIZE;
    } else {
      newCapacity = MAX_BUFFER_SIZE;
    }

    const newBuf = Buffer.allocUnsafe(newCapacity);
    this.buf.copy(newBuf, 0, 0, this.pos);
    this.buf = newBuf;
  }

  /**
   * Append a buffer to the pending data. When the data cannot fit even in a
   * maximum-size buffer, it is streamed: the buffer is filled, flushed, and
   * refilled until everything has been copied.
   *
   * @param arr buffer to append
   * @param cmd current command (holds the compress sequence number used when flushing)
   */
  writeBuf(arr, cmd) {
    const len = arr.length;
    if (len > this.buf.length - this.pos) {
      if (this.buf.length !== MAX_BUFFER_SIZE) {
        this.growBuffer(len);
      }

      //still too small : buffer is at max size
      if (len > this.buf.length - this.pos) {
        //not enough space in buffer, will stream :
        // fill buffer and flush until all data are sent
        let off = 0;
        let remainingLen = len;

        while (true) {
          //fill what the CURRENT buffer can hold: after a flush the buffer may be
          //smaller than MAX_BUFFER_SIZE, and Buffer.copy silently truncates
          const lenToFillBuffer = Math.min(this.buf.length - this.pos, remainingLen);
          arr.copy(this.buf, this.pos, off, off + lenToFillBuffer);
          remainingLen -= lenToFillBuffer;
          off += lenToFillBuffer;
          this.pos += lenToFillBuffer;

          if (remainingLen === 0) return;
          this.flush(false, cmd, remainingLen);
        }
      }
    }
    arr.copy(this.buf, this.pos, 0, len);
    this.pos += len;
  }

  /**
   * Flush the internal buffer to the socket as one compressed-protocol packet,
   * then start a fresh buffer.
   *
   * @param cmdEnd true when this flush ends the command
   * @param cmd current command; its compressSequenceNo is incremented for the packet
   * @param remainingLen number of bytes still to be written after this flush
   *                     (used to size the next buffer), falsy when none
   */
  flush(cmdEnd, cmd, remainingLen) {
    const payloadLen = this.pos - HEADER_SIZE;

    if (this.pos < MIN_COMPRESS_POS) {
      //*******************************************************************************
      // small packet, no compression
      //*******************************************************************************
      // header is written in place: payload length, sequence, then 0 as
      // uncompressed length, meaning "payload is not compressed"
      this.buf[0] = payloadLen;
      this.buf[1] = payloadLen >>> 8;
      this.buf[2] = payloadLen >>> 16;
      this.buf[3] = ++cmd.compressSequenceNo;
      this.buf[4] = 0;
      this.buf[5] = 0;
      this.buf[6] = 0;

      if (this.opts.debugCompress) {
        console.log(
          '==> conn:%d %s (compress)\n%s',
          this.info.threadId ? this.info.threadId : -1,
          cmd ? cmd.constructor.name + '(0,' + this.pos + ')' : 'unknown',
          Utils.log(this.opts, this.buf, 0, this.pos)
        );
      }

      this.writer(this.buf.slice(0, this.pos));
    } else {
      //*******************************************************************************
      // compressing packet
      //*******************************************************************************
      //use synchronous deflating, to ensure FIFO packet order
      const compressChunk = ZLib.deflateSync(this.buf.slice(HEADER_SIZE, this.pos));
      const compressChunkLen = compressChunk.length;

      // header: compressed length, sequence, uncompressed length
      this.header[0] = compressChunkLen;
      this.header[1] = compressChunkLen >>> 8;
      this.header[2] = compressChunkLen >>> 16;
      this.header[3] = ++cmd.compressSequenceNo;
      this.header[4] = payloadLen;
      this.header[5] = payloadLen >>> 8;
      this.header[6] = payloadLen >>> 16;

      if (this.opts.debugCompress) {
        console.log(
          '==> conn:%d %s (compress)\n%s',
          this.info.threadId ? this.info.threadId : -1,
          cmd ? cmd.constructor.name + '(0,' + this.pos + '=>' + compressChunkLen + ')' : 'unknown',
          Utils.log(this.opts, compressChunk, 0, compressChunkLen, this.header)
        );
      }

      this.writer(this.header);
      this.writer(compressChunk);
      if (cmdEnd && this.pos === MAX_BUFFER_SIZE) this.writeEmptyPacket(cmd);
      // the header buffer was handed to the writer, so use a new one next time
      this.header = Buffer.allocUnsafe(HEADER_SIZE);
    }
    // the old buffer may still be referenced by the writer: never reuse it
    this.buf = remainingLen
      ? CompressionOutputStream.allocateBuffer(remainingLen)
      : Buffer.allocUnsafe(SMALL_BUFFER_SIZE);
    this.pos = HEADER_SIZE;
  }

  /**
   * Allocate a buffer from the smallest size tier that can hold the 7-byte
   * header plus `len` payload bytes, capped at MAX_BUFFER_SIZE.
   *
   * @param len number of payload bytes expected
   * @returns {Buffer} uninitialized buffer
   */
  static allocateBuffer(len) {
    const required = len + HEADER_SIZE;
    if (required <= SMALL_BUFFER_SIZE) {
      return Buffer.allocUnsafe(SMALL_BUFFER_SIZE);
    } else if (required <= MEDIUM_BUFFER_SIZE) {
      return Buffer.allocUnsafe(MEDIUM_BUFFER_SIZE);
    } else if (required <= LARGE_BUFFER_SIZE) {
      return Buffer.allocUnsafe(LARGE_BUFFER_SIZE);
    }
    return Buffer.allocUnsafe(MAX_BUFFER_SIZE);
  }

  /**
   * Write a header-only packet (zero payload length). It is a packet of its
   * own, so it takes the next compress sequence number.
   *
   * @param cmd current command; its compressSequenceNo is incremented
   */
  writeEmptyPacket(cmd) {
    const emptyBuf = Buffer.from([0x00, 0x00, 0x00, ++cmd.compressSequenceNo, 0x00, 0x00, 0x00]);

    if (this.opts.debugCompress) {
      console.log(
        '==> conn:%d %s (compress)\n%s',
        this.info.threadId ? this.info.threadId : -1,
        cmd ? cmd.constructor.name + '(0,' + this.pos + ')' : 'unknown',
        Utils.log(this.opts, emptyBuf, 0, 7)
      );
    }

    this.writer(emptyBuf);
  }
}
- module.exports = CompressionOutputStream;
|