connection.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. 'use strict';
  2. const EventEmitter = require('events');
  3. const MessageStream = require('./message_stream');
  4. const MongoError = require('../core/error').MongoError;
  5. const MongoNetworkError = require('../core/error').MongoNetworkError;
  6. const MongoWriteConcernError = require('../core/error').MongoWriteConcernError;
  7. const CommandResult = require('../core/connection/command_result');
  8. const StreamDescription = require('./stream_description').StreamDescription;
  9. const wp = require('../core/wireprotocol');
  10. const apm = require('../core/connection/apm');
  11. const updateSessionFromResponse = require('../core/sessions').updateSessionFromResponse;
  12. const uuidV4 = require('../core/utils').uuidV4;
  // Symbol keys under which a `Connection` stores its internal state; being symbols, they do
  // not show up as ordinary string-named properties on the instance.
  // the underlying stream passed to the constructor
  13. const kStream = Symbol('stream');
  // Map of operation descriptions awaiting a reply, keyed by request id
  14. const kQueue = Symbol('queue');
  // the `MessageStream` the underlying stream is piped through
  15. const kMessageStream = Symbol('messageStream');
  // `options.generation` as given at construction time
  16. const kGeneration = Symbol('generation');
  // `Date.now()` timestamp taken at construction and on every `markAvailable()`
  17. const kLastUseTime = Symbol('lastUseTime');
  // most recent `$clusterTime` seen in a reply document
  18. const kClusterTime = Symbol('clusterTime');
  // the `StreamDescription` created for this connection
  19. const kDescription = Symbol('description');
  // the response last assigned through the `ismaster` setter
  20. const kIsMaster = Symbol('ismaster');
  // optional `options.autoEncrypter`, only set when one was provided
  21. const kAutoEncrypter = Symbol('autoEncrypter');
  /**
   * A single wire-protocol connection layered over a duplex stream.
   *
   * The stream is piped through a `MessageStream`; every parsed message is passed to the
   * handler returned by `messageHandler(this)`. Operations that are still waiting for a reply
   * live in a `Map` stored under `kQueue`.
   *
   * When the underlying stream closes or times out, all queued operations are failed with a
   * `MongoNetworkError` and a single `close` event is emitted.
   */
  class Connection extends EventEmitter {
    /**
     * @param {object} stream - duplex stream to the server; must support `on`, `pipe`,
     *   `destroy` and `end`
     * @param {object} options - connection options. Read here: `id`, `bson`, `socketTimeout`
     *   (defaults to 360000 when not a number), `monitorCommands` (defaults to `false` when not
     *   a boolean), `generation`, `autoEncrypter`. The whole object is also forwarded to
     *   `EventEmitter`, `StreamDescription` and `MessageStream`.
     */
    constructor(stream, options) {
      super(options);

      this.id = options.id;
      this.address = streamIdentifier(stream);
      this.bson = options.bson;
      this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 360000;
      this.monitorCommands =
        typeof options.monitorCommands === 'boolean' ? options.monitorCommands : false;
      this.closed = false;
      this.destroyed = false;

      this[kDescription] = new StreamDescription(this.address, options);
      this[kGeneration] = options.generation;
      this[kLastUseTime] = Date.now();

      // retain a reference to an `AutoEncrypter` if present
      if (options.autoEncrypter) {
        this[kAutoEncrypter] = options.autoEncrypter;
      }

      // setup parser stream and message handling
      this[kQueue] = new Map();
      this[kMessageStream] = new MessageStream(options);
      this[kMessageStream].on('message', messageHandler(this));
      this[kStream] = stream;

      // Shared teardown for `close` and `timeout`: runs at most once per connection.
      const tearDown = (reason, destroyStream) => {
        if (this.closed) {
          return;
        }

        // Flip the flag BEFORE destroying the stream: a stream that emits `close` synchronously
        // from `destroy()` would otherwise re-enter this function and tear down twice.
        this.closed = true;
        if (destroyStream) {
          stream.destroy();
        }

        this[kQueue].forEach(op =>
          op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} ${reason}`))
        );
        this[kQueue].clear();
        this.emit('close');
      };

      stream.on('error', () => {
        /* ignore errors, listen to `close` instead */
      });
      stream.on('close', () => tearDown('closed', false));
      stream.on('timeout', () => tearDown('timed out', true));

      // hook the message stream up to the passed in stream
      stream.pipe(this[kMessageStream]);
      this[kMessageStream].pipe(stream);
    }

    /** The `StreamDescription` created for this connection. */
    get description() {
      return this[kDescription];
    }

    /** The response last stored through the `ismaster` setter (undefined until then). */
    get ismaster() {
      return this[kIsMaster];
    }

    // the `connect` method stores the result of the handshake ismaster on the connection
    set ismaster(response) {
      this[kDescription].receiveResponse(response);

      // TODO: remove this, and only use the `StreamDescription` in the future
      this[kIsMaster] = response;
    }

    /** `options.generation` from construction, or 0 when it was missing/falsy. */
    get generation() {
      return this[kGeneration] || 0;
    }

    /** Milliseconds elapsed since construction or the last `markAvailable()` call. */
    get idleTime() {
      return Date.now() - this[kLastUseTime];
    }

    /** Most recent `$clusterTime` recorded for this connection, if any. */
    get clusterTime() {
      return this[kClusterTime];
    }

    /** The underlying stream passed to the constructor. */
    get stream() {
      return this[kStream];
    }

    /** Resets the idle timer used by `idleTime`. */
    markAvailable() {
      this[kLastUseTime] = Date.now();
    }

    /**
     * Tears down the underlying stream.
     *
     * @param {object|Function} [options] - `{ force }` (default `false`); when a function is
     *   passed here it is treated as `callback`
     * @param {Function} [callback] - called once teardown was requested. With `force`, or when
     *   the connection is already destroyed or has no stream, it is called synchronously with
     *   no arguments; otherwise it is called from the `stream.end` callback with whatever
     *   argument that callback receives.
     */
    destroy(options, callback) {
      if (typeof options === 'function') {
        callback = options;
        options = {};
      }

      options = Object.assign({ force: false }, options);
      const notify = typeof callback === 'function';

      // nothing left to tear down
      if (this[kStream] == null || this.destroyed) {
        this.destroyed = true;
        if (notify) {
          callback();
        }
        return;
      }

      // forced: drop the stream immediately without flushing
      if (options.force) {
        this[kStream].destroy();
        this.destroyed = true;
        if (notify) {
          callback();
        }
        return;
      }

      // graceful: only flagged as destroyed once `end` reports back
      this[kStream].end(err => {
        this.destroyed = true;
        if (notify) {
          callback(err);
        }
      });
    }

    // Wire protocol methods
    // Each one delegates to the matching `wp` function, passing a server-shaped wrapper around
    // this connection as the first argument.
    command(ns, cmd, options, callback) {
      wp.command(makeServerTrampoline(this), ns, cmd, options, callback);
    }

    query(ns, cmd, cursorState, options, callback) {
      wp.query(makeServerTrampoline(this), ns, cmd, cursorState, options, callback);
    }

    getMore(ns, cursorState, batchSize, options, callback) {
      wp.getMore(makeServerTrampoline(this), ns, cursorState, batchSize, options, callback);
    }

    killCursors(ns, cursorState, callback) {
      wp.killCursors(makeServerTrampoline(this), ns, cursorState, callback);
    }

    insert(ns, ops, options, callback) {
      wp.insert(makeServerTrampoline(this), ns, ops, options, callback);
    }

    update(ns, ops, options, callback) {
      wp.update(makeServerTrampoline(this), ns, ops, options, callback);
    }

    remove(ns, ops, options, callback) {
      wp.remove(makeServerTrampoline(this), ns, ops, options, callback);
    }
  }
  /**
   * Wraps a `Connection` in an object shaped like a legacy `Server` so the existing wire
   * protocol functions can operate on it. Eventually the operation executor is meant to hand
   * out a `Connection` to execute against directly.
   *
   * The result exposes the connection's description and last known cluster time, plus an
   * `s.pool` whose `write` is the module-level `write` bound to the connection and whose
   * `isConnected` always answers `true`. `autoEncrypter` is only present when the connection
   * holds one.
   *
   * @param {Connection} connection
   * @returns {object} server-shaped wrapper
   */
  function makeServerTrampoline(connection) {
    const description = connection.description;
    const clusterTime = connection[kClusterTime];
    const bson = connection.bson;
    const pool = {
      write: write.bind(connection),
      isConnected: () => true
    };

    const encrypter = connection[kAutoEncrypter];
    return encrypter
      ? { description, clusterTime, s: { bson, pool }, autoEncrypter: encrypter }
      : { description, clusterTime, s: { bson, pool } };
  }
  /**
   * Builds the `message` listener for a connection's message stream.
   *
   * For every incoming message the returned function:
   *  1. re-emits it on the connection as `message`;
   *  2. looks up the queued operation by `message.responseTo` (unknown ids are ignored);
   *  3. parses the message with the operation's BSON options;
   *  4. updates session / cluster time state from the first reply document;
   *  5. invokes the operation callback with either an error or a `CommandResult`.
   *
   * @param {Connection} conn - connection whose queue and stream are used
   * @returns {Function} listener taking the parsed-on-demand message
   */
  function messageHandler(conn) {
    return function messageHandler(message) {
      // always emit the message, in case we are streaming
      conn.emit('message', message);

      if (!conn[kQueue].has(message.responseTo)) {
        return;
      }

      const operationDescription = conn[kQueue].get(message.responseTo);
      const callback = operationDescription.cb;

      // SERVER-45775: For exhaust responses we should be able to use the same requestId to
      // track response, however the server currently synthetically produces remote requests
      // making the `responseTo` change on each response
      conn[kQueue].delete(message.responseTo);
      if (message.moreToCome) {
        // requeue the callback for next synthetic request
        conn[kQueue].set(message.requestId, operationDescription);
      } else if (operationDescription.socketTimeoutOverride) {
        // Restore the connection-wide timeout only once the operation is finished; while more
        // replies are expected the per-operation override must stay in force.
        conn[kStream].setTimeout(conn.socketTimeout);
      }

      try {
        // Pass in the entire description because it has BSON parsing options
        message.parse(operationDescription);
      } catch (err) {
        callback(new MongoError(err));
        return;
      }

      const document = message.documents[0];
      if (document) {
        const session = operationDescription.session;
        if (session) {
          updateSessionFromResponse(session, document);
        }

        if (document.$clusterTime) {
          conn[kClusterTime] = document.$clusterTime;
          conn.emit('clusterTimeReceived', document.$clusterTime);
        }

        // error translation only applies to operations flagged as commands
        if (operationDescription.command) {
          if (document.writeConcernError) {
            callback(new MongoWriteConcernError(document.writeConcernError, document));
            return;
          }

          if (document.ok === 0 || document.$err || document.errmsg || document.code) {
            callback(new MongoError(document));
            return;
          }
        }
      }

      // NODE-2382: reenable in our glorious non-leaky abstraction future
      // callback(null, operationDescription.fullResult ? message : message.documents[0]);
      callback(
        undefined,
        new CommandResult(
          operationDescription.fullResult ? message : message.documents[0],
          conn,
          message
        )
      );
    };
  }
  /**
   * Derives a printable identifier for a stream.
   *
   * Streams exposing an `address()` function are identified as
   * `<remoteAddress>:<remotePort>`; anything else gets the hex form of a fresh `uuidV4()`.
   *
   * @param {object} stream
   * @returns {string}
   */
  function streamIdentifier(stream) {
    const hasAddressMethod = typeof stream.address === 'function';
    if (!hasAddressMethod) {
      return uuidV4().toString('hex');
    }

    return `${stream.remoteAddress}:${stream.remotePort}`;
  }
  /**
   * Writes a wire-protocol command to the connection and registers its callback.
   *
   * Not meant to be called directly: the wire protocol methods call this assuming it is a
   * `Pool` instance. It must run with `this` set to the `Connection` (it is bound that way
   * when the server-shaped wrapper is built).
   *
   * @param {object} command - wire message; `command.requestId` keys the pending-operation queue
   * @param {object|Function} [options] - per-operation options (`session`, `fullResult`,
   *   `noResponse`, `documentsReturnedIn`, `command`, `socketTimeout`, and the BSON flags
   *   `promoteLongs`, `promoteValues`, `promoteBuffers`, `raw`); may be the callback instead
   * @param {Function} [callback] - `(err, reply)`; optional, a missing callback is tolerated
   */
  function write(command, options, callback) {
    if (typeof options === 'function') {
      callback = options;
      // reset so the option lookups below never read properties off the callback itself
      options = {};
    }

    options = options || {};

    // A fire-and-forget caller may pass no callback at all; normalise it once so every later
    // invocation (here, on reply, or on connection teardown) is safe.
    const userCallback = typeof callback === 'function' ? callback : () => {};

    const operationDescription = {
      requestId: command.requestId,
      cb: userCallback,
      session: options.session,
      fullResult: typeof options.fullResult === 'boolean' ? options.fullResult : false,
      noResponse: typeof options.noResponse === 'boolean' ? options.noResponse : false,
      documentsReturnedIn: options.documentsReturnedIn,
      command: !!options.command,

      // for BSON parsing
      promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true,
      promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true,
      promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false,
      raw: typeof options.raw === 'boolean' ? options.raw : false
    };

    // carry the compression settings recorded on the stream description, if any
    if (this[kDescription] && this[kDescription].compressor) {
      operationDescription.agreedCompressor = this[kDescription].compressor;

      if (this[kDescription].zlibCompressionLevel) {
        operationDescription.zlibCompressionLevel = this[kDescription].zlibCompressionLevel;
      }
    }

    // per-operation timeout; flagged so the reply handler knows to restore the default
    if (typeof options.socketTimeout === 'number') {
      operationDescription.socketTimeoutOverride = true;
      this[kStream].setTimeout(options.socketTimeout);
    }

    // if command monitoring is enabled we need to modify the callback here
    if (this.monitorCommands) {
      this.emit('commandStarted', new apm.CommandStartedEvent(this, command));

      operationDescription.started = process.hrtime();
      operationDescription.cb = (err, reply) => {
        if (err) {
          this.emit(
            'commandFailed',
            new apm.CommandFailedEvent(this, command, err, operationDescription.started)
          );
        } else if (reply && reply.result && (reply.result.ok === 0 || reply.result.$err)) {
          // a reply that parsed fine but reports failure still counts as a failed command
          this.emit(
            'commandFailed',
            new apm.CommandFailedEvent(this, command, reply.result, operationDescription.started)
          );
        } else {
          this.emit(
            'commandSucceeded',
            new apm.CommandSucceededEvent(this, command, reply, operationDescription.started)
          );
        }

        userCallback(err, reply);
      };
    }

    // only operations that expect a reply are tracked
    if (!operationDescription.noResponse) {
      this[kQueue].set(operationDescription.requestId, operationDescription);
    }

    try {
      this[kMessageStream].writeCommand(command, operationDescription);
    } catch (e) {
      if (!operationDescription.noResponse) {
        this[kQueue].delete(operationDescription.requestId);
        operationDescription.cb(e);
        return;
      }
      // NOTE(review): for `noResponse` operations a failed write is not reported; the callback
      // below is still invoked without an error. Kept as-is; confirm this is intended.
    }

    if (operationDescription.noResponse) {
      operationDescription.cb();
    }
  }
  // Public surface of this module: only the `Connection` class is exported.
  310. module.exports = {
  311. Connection
  312. };