message_stream.js 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.MessageStream = void 0;
  4. const stream_1 = require("stream");
  5. const error_1 = require("../error");
  6. const utils_1 = require("../utils");
  7. const commands_1 = require("./commands");
  8. const compression_1 = require("./wire_protocol/compression");
  9. const constants_1 = require("./wire_protocol/constants");
  10. const MESSAGE_HEADER_SIZE = 16;
  11. const COMPRESSION_DETAILS_SIZE = 9; // originalOpcode + uncompressedSize, compressorID
  12. const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4;
  13. /** @internal */
  14. const kBuffer = Symbol('buffer');
  15. /**
  16. * A duplex stream that is capable of reading and writing raw wire protocol messages, with
  17. * support for optional compression
  18. * @internal
  19. */
  20. class MessageStream extends stream_1.Duplex {
  21. constructor(options = {}) {
  22. super(options);
  23. /** @internal */
  24. this.isMonitoringConnection = false;
  25. this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize;
  26. this[kBuffer] = new utils_1.BufferPool();
  27. }
  28. get buffer() {
  29. return this[kBuffer];
  30. }
  31. _write(chunk, _, callback) {
  32. this[kBuffer].append(chunk);
  33. processIncomingData(this, callback);
  34. }
  35. _read( /* size */) {
  36. // NOTE: This implementation is empty because we explicitly push data to be read
  37. // when `writeMessage` is called.
  38. return;
  39. }
  40. writeCommand(command, operationDescription) {
  41. // TODO: agreed compressor should live in `StreamDescription`
  42. const compressorName = operationDescription && operationDescription.agreedCompressor
  43. ? operationDescription.agreedCompressor
  44. : 'none';
  45. if (compressorName === 'none' || !canCompress(command)) {
  46. const data = command.toBin();
  47. this.push(Array.isArray(data) ? Buffer.concat(data) : data);
  48. return;
  49. }
  50. // otherwise, compress the message
  51. const concatenatedOriginalCommandBuffer = Buffer.concat(command.toBin());
  52. const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);
  53. // Extract information needed for OP_COMPRESSED from the uncompressed message
  54. const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);
  55. // Compress the message body
  56. (0, compression_1.compress)({ options: operationDescription }, messageToBeCompressed, (err, compressedMessage) => {
  57. if (err || !compressedMessage) {
  58. operationDescription.cb(err);
  59. return;
  60. }
  61. // Create the msgHeader of OP_COMPRESSED
  62. const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
  63. msgHeader.writeInt32LE(MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length, 0); // messageLength
  64. msgHeader.writeInt32LE(command.requestId, 4); // requestID
  65. msgHeader.writeInt32LE(0, 8); // responseTo (zero)
  66. msgHeader.writeInt32LE(constants_1.OP_COMPRESSED, 12); // opCode
  67. // Create the compression details of OP_COMPRESSED
  68. const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
  69. compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
  70. compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
  71. compressionDetails.writeUInt8(compression_1.Compressor[compressorName], 8); // compressorID
  72. this.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage]));
  73. });
  74. }
  75. }
  76. exports.MessageStream = MessageStream;
  77. // Return whether a command contains an uncompressible command term
  78. // Will return true if command contains no uncompressible command terms
  79. function canCompress(command) {
  80. const commandDoc = command instanceof commands_1.Msg ? command.command : command.query;
  81. const commandName = Object.keys(commandDoc)[0];
  82. return !compression_1.uncompressibleCommands.has(commandName);
  83. }
  84. function processIncomingData(stream, callback) {
  85. const buffer = stream[kBuffer];
  86. if (buffer.length < 4) {
  87. callback();
  88. return;
  89. }
  90. const sizeOfMessage = buffer.peek(4).readInt32LE();
  91. if (sizeOfMessage < 0) {
  92. callback(new error_1.MongoParseError(`Invalid message size: ${sizeOfMessage}`));
  93. return;
  94. }
  95. if (sizeOfMessage > stream.maxBsonMessageSize) {
  96. callback(new error_1.MongoParseError(`Invalid message size: ${sizeOfMessage}, max allowed: ${stream.maxBsonMessageSize}`));
  97. return;
  98. }
  99. if (sizeOfMessage > buffer.length) {
  100. callback();
  101. return;
  102. }
  103. const message = buffer.read(sizeOfMessage);
  104. const messageHeader = {
  105. length: message.readInt32LE(0),
  106. requestId: message.readInt32LE(4),
  107. responseTo: message.readInt32LE(8),
  108. opCode: message.readInt32LE(12)
  109. };
  110. const monitorHasAnotherHello = () => {
  111. if (stream.isMonitoringConnection) {
  112. // Can we read the next message size?
  113. if (buffer.length >= 4) {
  114. const sizeOfMessage = buffer.peek(4).readInt32LE();
  115. if (sizeOfMessage <= buffer.length) {
  116. return true;
  117. }
  118. }
  119. }
  120. return false;
  121. };
  122. let ResponseType = messageHeader.opCode === constants_1.OP_MSG ? commands_1.BinMsg : commands_1.Response;
  123. if (messageHeader.opCode !== constants_1.OP_COMPRESSED) {
  124. const messageBody = message.slice(MESSAGE_HEADER_SIZE);
  125. // If we are a monitoring connection message stream and
  126. // there is more in the buffer that can be read, skip processing since we
  127. // want the last hello command response that is in the buffer.
  128. if (monitorHasAnotherHello()) {
  129. processIncomingData(stream, callback);
  130. }
  131. else {
  132. stream.emit('message', new ResponseType(message, messageHeader, messageBody));
  133. if (buffer.length >= 4) {
  134. processIncomingData(stream, callback);
  135. }
  136. else {
  137. callback();
  138. }
  139. }
  140. return;
  141. }
  142. messageHeader.fromCompressed = true;
  143. messageHeader.opCode = message.readInt32LE(MESSAGE_HEADER_SIZE);
  144. messageHeader.length = message.readInt32LE(MESSAGE_HEADER_SIZE + 4);
  145. const compressorID = message[MESSAGE_HEADER_SIZE + 8];
  146. const compressedBuffer = message.slice(MESSAGE_HEADER_SIZE + 9);
  147. // recalculate based on wrapped opcode
  148. ResponseType = messageHeader.opCode === constants_1.OP_MSG ? commands_1.BinMsg : commands_1.Response;
  149. (0, compression_1.decompress)(compressorID, compressedBuffer, (err, messageBody) => {
  150. if (err || !messageBody) {
  151. callback(err);
  152. return;
  153. }
  154. if (messageBody.length !== messageHeader.length) {
  155. callback(new error_1.MongoDecompressionError('Message body and message header must be the same length'));
  156. return;
  157. }
  158. // If we are a monitoring connection message stream and
  159. // there is more in the buffer that can be read, skip processing since we
  160. // want the last hello command response that is in the buffer.
  161. if (monitorHasAnotherHello()) {
  162. processIncomingData(stream, callback);
  163. }
  164. else {
  165. stream.emit('message', new ResponseType(message, messageHeader, messageBody));
  166. if (buffer.length >= 4) {
  167. processIncomingData(stream, callback);
  168. }
  169. else {
  170. callback();
  171. }
  172. }
  173. });
  174. }
  175. //# sourceMappingURL=message_stream.js.map