123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- 'use strict';
- // connection mixins
- // implementation of http://dev.mysql.com/doc/internals/en/compression.html
- const zlib = require('zlib');
- const PacketParser = require('./packet_parser.js');
- function handleCompressedPacket(packet) {
- // eslint-disable-next-line consistent-this, no-invalid-this
- const connection = this;
- const deflatedLength = packet.readInt24();
- const body = packet.readBuffer();
- if (deflatedLength !== 0) {
- connection.inflateQueue.push(task => {
- zlib.inflate(body, (err, data) => {
- if (err) {
- connection._handleNetworkError(err);
- return;
- }
- connection._bumpCompressedSequenceId(packet.numPackets);
- connection._inflatedPacketsParser.execute(data);
- task.done();
- });
- });
- } else {
- connection.inflateQueue.push(task => {
- connection._bumpCompressedSequenceId(packet.numPackets);
- connection._inflatedPacketsParser.execute(body);
- task.done();
- });
- }
- }
- function writeCompressed(buffer) {
- // http://dev.mysql.com/doc/internals/en/example-several-mysql-packets.html
- // note: sending a MySQL Packet of the size 2^24−5 to 2^24−1 via compression
- // leads to at least one extra compressed packet.
- // (this is because "length of the packet before compression" need to fit
- // into 3 byte unsigned int. "length of the packet before compression" includes
- // 4 byte packet header, hence 2^24−5)
- const MAX_COMPRESSED_LENGTH = 16777210;
- let start;
- if (buffer.length > MAX_COMPRESSED_LENGTH) {
- for (start = 0; start < buffer.length; start += MAX_COMPRESSED_LENGTH) {
- writeCompressed.call(
- // eslint-disable-next-line no-invalid-this
- this,
- buffer.slice(start, start + MAX_COMPRESSED_LENGTH)
- );
- }
- return;
- }
- // eslint-disable-next-line no-invalid-this, consistent-this
- const connection = this;
- let packetLen = buffer.length;
- const compressHeader = Buffer.allocUnsafe(7);
- // seqqueue is used here because zlib async execution is routed via thread pool
- // internally and when we have multiple compressed packets arriving we need
- // to assemble uncompressed result sequentially
- (function(seqId) {
- connection.deflateQueue.push(task => {
- zlib.deflate(buffer, (err, compressed) => {
- if (err) {
- connection._handleFatalError(err);
- return;
- }
- let compressedLength = compressed.length;
- if (compressedLength < packetLen) {
- compressHeader.writeUInt8(compressedLength & 0xff, 0);
- compressHeader.writeUInt16LE(compressedLength >> 8, 1);
- compressHeader.writeUInt8(seqId, 3);
- compressHeader.writeUInt8(packetLen & 0xff, 4);
- compressHeader.writeUInt16LE(packetLen >> 8, 5);
- connection.writeUncompressed(compressHeader);
- connection.writeUncompressed(compressed);
- } else {
- // http://dev.mysql.com/doc/internals/en/uncompressed-payload.html
- // To send an uncompressed payload:
- // - set length of payload before compression to 0
- // - the compressed payload contains the uncompressed payload instead.
- compressedLength = packetLen;
- packetLen = 0;
- compressHeader.writeUInt8(compressedLength & 0xff, 0);
- compressHeader.writeUInt16LE(compressedLength >> 8, 1);
- compressHeader.writeUInt8(seqId, 3);
- compressHeader.writeUInt8(packetLen & 0xff, 4);
- compressHeader.writeUInt16LE(packetLen >> 8, 5);
- connection.writeUncompressed(compressHeader);
- connection.writeUncompressed(buffer);
- }
- task.done();
- });
- });
- })(connection.compressedSequenceId);
- connection._bumpCompressedSequenceId(1);
- }
- function enableCompression(connection) {
- connection._lastWrittenPacketId = 0;
- connection._lastReceivedPacketId = 0;
- connection._handleCompressedPacket = handleCompressedPacket;
- connection._inflatedPacketsParser = new PacketParser(p => {
- connection.handlePacket(p);
- }, 4);
- connection._inflatedPacketsParser._lastPacket = 0;
- connection.packetParser = new PacketParser(packet => {
- connection._handleCompressedPacket(packet);
- }, 7);
- connection.writeUncompressed = connection.write;
- connection.write = writeCompressed;
- const seqqueue = require('seq-queue');
- connection.inflateQueue = seqqueue.createQueue();
- connection.deflateQueue = seqqueue.createQueue();
- }
- module.exports = {
- enableCompression: enableCompression
- };
|