message_stream.js 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. 'use strict';
  2. const Duplex = require('stream').Duplex;
  3. const BufferList = require('bl');
  4. const MongoParseError = require('../core/error').MongoParseError;
  5. const decompress = require('../core/wireprotocol/compression').decompress;
  6. const Response = require('../core/connection/commands').Response;
  7. const BinMsg = require('../core/connection/msg').BinMsg;
  8. const MongoError = require('../core/error').MongoError;
  9. const OP_COMPRESSED = require('../core/wireprotocol/shared').opcodes.OP_COMPRESSED;
  10. const OP_MSG = require('../core/wireprotocol/shared').opcodes.OP_MSG;
  11. const MESSAGE_HEADER_SIZE = require('../core/wireprotocol/shared').MESSAGE_HEADER_SIZE;
  12. const COMPRESSION_DETAILS_SIZE = require('../core/wireprotocol/shared').COMPRESSION_DETAILS_SIZE;
  13. const opcodes = require('../core/wireprotocol/shared').opcodes;
  14. const compress = require('../core/wireprotocol/compression').compress;
  15. const compressorIDs = require('../core/wireprotocol/compression').compressorIDs;
  16. const uncompressibleCommands = require('../core/wireprotocol/compression').uncompressibleCommands;
  17. const Msg = require('../core/connection/msg').Msg;
  18. const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4;
  19. const kBuffer = Symbol('buffer');
  20. /**
  21. * A duplex stream that is capable of reading and writing raw wire protocol messages, with
  22. * support for optional compression
  23. */
  24. class MessageStream extends Duplex {
  25. constructor(options) {
  26. options = options || {};
  27. super(options);
  28. this.bson = options.bson;
  29. this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize;
  30. this[kBuffer] = new BufferList();
  31. }
  32. _write(chunk, _, callback) {
  33. const buffer = this[kBuffer];
  34. buffer.append(chunk);
  35. processIncomingData(this, callback);
  36. }
  37. _read(/* size */) {
  38. // NOTE: This implementation is empty because we explicitly push data to be read
  39. // when `writeMessage` is called.
  40. return;
  41. }
  42. writeCommand(command, operationDescription) {
  43. // TODO: agreed compressor should live in `StreamDescription`
  44. const shouldCompress = operationDescription && !!operationDescription.agreedCompressor;
  45. if (!shouldCompress || !canCompress(command)) {
  46. const data = command.toBin();
  47. this.push(Array.isArray(data) ? Buffer.concat(data) : data);
  48. return;
  49. }
  50. // otherwise, compress the message
  51. const concatenatedOriginalCommandBuffer = Buffer.concat(command.toBin());
  52. const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);
  53. // Extract information needed for OP_COMPRESSED from the uncompressed message
  54. const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);
  55. // Compress the message body
  56. compress({ options: operationDescription }, messageToBeCompressed, (err, compressedMessage) => {
  57. if (err) {
  58. operationDescription.cb(err, null);
  59. return;
  60. }
  61. // Create the msgHeader of OP_COMPRESSED
  62. const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
  63. msgHeader.writeInt32LE(
  64. MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length,
  65. 0
  66. ); // messageLength
  67. msgHeader.writeInt32LE(command.requestId, 4); // requestID
  68. msgHeader.writeInt32LE(0, 8); // responseTo (zero)
  69. msgHeader.writeInt32LE(opcodes.OP_COMPRESSED, 12); // opCode
  70. // Create the compression details of OP_COMPRESSED
  71. const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
  72. compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
  73. compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
  74. compressionDetails.writeUInt8(compressorIDs[operationDescription.agreedCompressor], 8); // compressorID
  75. this.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage]));
  76. });
  77. }
  78. }
  79. // Return whether a command contains an uncompressible command term
  80. // Will return true if command contains no uncompressible command terms
  81. function canCompress(command) {
  82. const commandDoc = command instanceof Msg ? command.command : command.query;
  83. const commandName = Object.keys(commandDoc)[0];
  84. return !uncompressibleCommands.has(commandName);
  85. }
  86. function processIncomingData(stream, callback) {
  87. const buffer = stream[kBuffer];
  88. if (buffer.length < 4) {
  89. callback();
  90. return;
  91. }
  92. const sizeOfMessage = buffer.readInt32LE(0);
  93. if (sizeOfMessage < 0) {
  94. callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`));
  95. return;
  96. }
  97. if (sizeOfMessage > stream.maxBsonMessageSize) {
  98. callback(
  99. new MongoParseError(
  100. `Invalid message size: ${sizeOfMessage}, max allowed: ${stream.maxBsonMessageSize}`
  101. )
  102. );
  103. return;
  104. }
  105. if (sizeOfMessage > buffer.length) {
  106. callback();
  107. return;
  108. }
  109. const message = buffer.slice(0, sizeOfMessage);
  110. buffer.consume(sizeOfMessage);
  111. const messageHeader = {
  112. length: message.readInt32LE(0),
  113. requestId: message.readInt32LE(4),
  114. responseTo: message.readInt32LE(8),
  115. opCode: message.readInt32LE(12)
  116. };
  117. let ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response;
  118. const responseOptions = stream.responseOptions;
  119. if (messageHeader.opCode !== OP_COMPRESSED) {
  120. const messageBody = message.slice(MESSAGE_HEADER_SIZE);
  121. stream.emit(
  122. 'message',
  123. new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions)
  124. );
  125. if (buffer.length >= 4) {
  126. processIncomingData(stream, callback);
  127. } else {
  128. callback();
  129. }
  130. return;
  131. }
  132. messageHeader.fromCompressed = true;
  133. messageHeader.opCode = message.readInt32LE(MESSAGE_HEADER_SIZE);
  134. messageHeader.length = message.readInt32LE(MESSAGE_HEADER_SIZE + 4);
  135. const compressorID = message[MESSAGE_HEADER_SIZE + 8];
  136. const compressedBuffer = message.slice(MESSAGE_HEADER_SIZE + 9);
  137. // recalculate based on wrapped opcode
  138. ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response;
  139. decompress(compressorID, compressedBuffer, (err, messageBody) => {
  140. if (err) {
  141. callback(err);
  142. return;
  143. }
  144. if (messageBody.length !== messageHeader.length) {
  145. callback(
  146. new MongoError(
  147. 'Decompressing a compressed message from the server failed. The message is corrupt.'
  148. )
  149. );
  150. return;
  151. }
  152. stream.emit(
  153. 'message',
  154. new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions)
  155. );
  156. if (buffer.length >= 4) {
  157. processIncomingData(stream, callback);
  158. } else {
  159. callback();
  160. }
  161. });
  162. }
  163. module.exports = MessageStream;