compressed_protocol.js 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. 'use strict';
  2. // connection mixins
  3. // implementation of http://dev.mysql.com/doc/internals/en/compression.html
  4. const zlib = require('zlib');
  5. const PacketParser = require('./packet_parser.js');
  6. function handleCompressedPacket(packet) {
  7. // eslint-disable-next-line consistent-this, no-invalid-this
  8. const connection = this;
  9. const deflatedLength = packet.readInt24();
  10. const body = packet.readBuffer();
  11. if (deflatedLength !== 0) {
  12. connection.inflateQueue.push(task => {
  13. zlib.inflate(body, (err, data) => {
  14. if (err) {
  15. connection._handleNetworkError(err);
  16. return;
  17. }
  18. connection._bumpCompressedSequenceId(packet.numPackets);
  19. connection._inflatedPacketsParser.execute(data);
  20. task.done();
  21. });
  22. });
  23. } else {
  24. connection.inflateQueue.push(task => {
  25. connection._bumpCompressedSequenceId(packet.numPackets);
  26. connection._inflatedPacketsParser.execute(body);
  27. task.done();
  28. });
  29. }
  30. }
  31. function writeCompressed(buffer) {
  32. // http://dev.mysql.com/doc/internals/en/example-several-mysql-packets.html
  33. // note: sending a MySQL Packet of the size 2^24−5 to 2^24−1 via compression
  34. // leads to at least one extra compressed packet.
  35. // (this is because "length of the packet before compression" need to fit
  36. // into 3 byte unsigned int. "length of the packet before compression" includes
  37. // 4 byte packet header, hence 2^24−5)
  38. const MAX_COMPRESSED_LENGTH = 16777210;
  39. let start;
  40. if (buffer.length > MAX_COMPRESSED_LENGTH) {
  41. for (start = 0; start < buffer.length; start += MAX_COMPRESSED_LENGTH) {
  42. writeCompressed.call(
  43. // eslint-disable-next-line no-invalid-this
  44. this,
  45. buffer.slice(start, start + MAX_COMPRESSED_LENGTH)
  46. );
  47. }
  48. return;
  49. }
  50. // eslint-disable-next-line no-invalid-this, consistent-this
  51. const connection = this;
  52. let packetLen = buffer.length;
  53. const compressHeader = Buffer.allocUnsafe(7);
  54. // seqqueue is used here because zlib async execution is routed via thread pool
  55. // internally and when we have multiple compressed packets arriving we need
  56. // to assemble uncompressed result sequentially
  57. (function(seqId) {
  58. connection.deflateQueue.push(task => {
  59. zlib.deflate(buffer, (err, compressed) => {
  60. if (err) {
  61. connection._handleFatalError(err);
  62. return;
  63. }
  64. let compressedLength = compressed.length;
  65. if (compressedLength < packetLen) {
  66. compressHeader.writeUInt8(compressedLength & 0xff, 0);
  67. compressHeader.writeUInt16LE(compressedLength >> 8, 1);
  68. compressHeader.writeUInt8(seqId, 3);
  69. compressHeader.writeUInt8(packetLen & 0xff, 4);
  70. compressHeader.writeUInt16LE(packetLen >> 8, 5);
  71. connection.writeUncompressed(compressHeader);
  72. connection.writeUncompressed(compressed);
  73. } else {
  74. // http://dev.mysql.com/doc/internals/en/uncompressed-payload.html
  75. // To send an uncompressed payload:
  76. // - set length of payload before compression to 0
  77. // - the compressed payload contains the uncompressed payload instead.
  78. compressedLength = packetLen;
  79. packetLen = 0;
  80. compressHeader.writeUInt8(compressedLength & 0xff, 0);
  81. compressHeader.writeUInt16LE(compressedLength >> 8, 1);
  82. compressHeader.writeUInt8(seqId, 3);
  83. compressHeader.writeUInt8(packetLen & 0xff, 4);
  84. compressHeader.writeUInt16LE(packetLen >> 8, 5);
  85. connection.writeUncompressed(compressHeader);
  86. connection.writeUncompressed(buffer);
  87. }
  88. task.done();
  89. });
  90. });
  91. })(connection.compressedSequenceId);
  92. connection._bumpCompressedSequenceId(1);
  93. }
  94. function enableCompression(connection) {
  95. connection._lastWrittenPacketId = 0;
  96. connection._lastReceivedPacketId = 0;
  97. connection._handleCompressedPacket = handleCompressedPacket;
  98. connection._inflatedPacketsParser = new PacketParser(p => {
  99. connection.handlePacket(p);
  100. }, 4);
  101. connection._inflatedPacketsParser._lastPacket = 0;
  102. connection.packetParser = new PacketParser(packet => {
  103. connection._handleCompressedPacket(packet);
  104. }, 7);
  105. connection.writeUncompressed = connection.write;
  106. connection.write = writeCompressed;
  107. const seqqueue = require('seq-queue');
  108. connection.inflateQueue = seqqueue.createQueue();
  109. connection.deflateQueue = seqqueue.createQueue();
  110. }
  111. module.exports = {
  112. enableCompression: enableCompression
  113. };