connection.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. 'use strict';
  2. const EventEmitter = require('events');
  3. const MessageStream = require('./message_stream');
  4. const MongoError = require('../core/error').MongoError;
  5. const MongoNetworkError = require('../core/error').MongoNetworkError;
  6. const MongoNetworkTimeoutError = require('../core/error').MongoNetworkTimeoutError;
  7. const MongoWriteConcernError = require('../core/error').MongoWriteConcernError;
  8. const CommandResult = require('../core/connection/command_result');
  9. const StreamDescription = require('./stream_description').StreamDescription;
  10. const wp = require('../core/wireprotocol');
  11. const apm = require('../core/connection/apm');
  12. const updateSessionFromResponse = require('../core/sessions').updateSessionFromResponse;
  13. const uuidV4 = require('../core/utils').uuidV4;
  14. const now = require('../utils').now;
  15. const calculateDurationInMs = require('../utils').calculateDurationInMs;
  16. const kStream = Symbol('stream');
  17. const kQueue = Symbol('queue');
  18. const kMessageStream = Symbol('messageStream');
  19. const kGeneration = Symbol('generation');
  20. const kLastUseTime = Symbol('lastUseTime');
  21. const kClusterTime = Symbol('clusterTime');
  22. const kDescription = Symbol('description');
  23. const kIsMaster = Symbol('ismaster');
  24. const kAutoEncrypter = Symbol('autoEncrypter');
  25. class Connection extends EventEmitter {
  26. constructor(stream, options) {
  27. super(options);
  28. this.id = options.id;
  29. this.address = streamIdentifier(stream);
  30. this.bson = options.bson;
  31. this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 0;
  32. this.host = options.host || 'localhost';
  33. this.port = options.port || 27017;
  34. this.monitorCommands =
  35. typeof options.monitorCommands === 'boolean' ? options.monitorCommands : false;
  36. this.closed = false;
  37. this.destroyed = false;
  38. this[kDescription] = new StreamDescription(this.address, options);
  39. this[kGeneration] = options.generation;
  40. this[kLastUseTime] = now();
  41. // retain a reference to an `AutoEncrypter` if present
  42. if (options.autoEncrypter) {
  43. this[kAutoEncrypter] = options.autoEncrypter;
  44. }
  45. // setup parser stream and message handling
  46. this[kQueue] = new Map();
  47. this[kMessageStream] = new MessageStream(options);
  48. this[kMessageStream].on('message', messageHandler(this));
  49. this[kStream] = stream;
  50. stream.on('error', () => {
  51. /* ignore errors, listen to `close` instead */
  52. });
  53. this[kMessageStream].on('error', error => this.handleIssue({ destroy: error }));
  54. stream.on('close', () => this.handleIssue({ isClose: true }));
  55. stream.on('timeout', () => this.handleIssue({ isTimeout: true, destroy: true }));
  56. // hook the message stream up to the passed in stream
  57. stream.pipe(this[kMessageStream]);
  58. this[kMessageStream].pipe(stream);
  59. }
  60. get description() {
  61. return this[kDescription];
  62. }
  63. get ismaster() {
  64. return this[kIsMaster];
  65. }
  66. // the `connect` method stores the result of the handshake ismaster on the connection
  67. set ismaster(response) {
  68. this[kDescription].receiveResponse(response);
  69. // TODO: remove this, and only use the `StreamDescription` in the future
  70. this[kIsMaster] = response;
  71. }
  72. get generation() {
  73. return this[kGeneration] || 0;
  74. }
  75. get idleTime() {
  76. return calculateDurationInMs(this[kLastUseTime]);
  77. }
  78. get clusterTime() {
  79. return this[kClusterTime];
  80. }
  81. get stream() {
  82. return this[kStream];
  83. }
  84. markAvailable() {
  85. this[kLastUseTime] = now();
  86. }
  87. /**
  88. * @param {{ isTimeout?: boolean; isClose?: boolean; destroy?: boolean | Error }} issue
  89. */
  90. handleIssue(issue) {
  91. if (this.closed) {
  92. return;
  93. }
  94. if (issue.destroy) {
  95. this[kStream].destroy(typeof issue.destroy === 'boolean' ? undefined : issue.destroy);
  96. }
  97. this.closed = true;
  98. for (const idAndOp of this[kQueue]) {
  99. const op = idAndOp[1];
  100. if (issue.isTimeout) {
  101. op.cb(
  102. new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, {
  103. beforeHandshake: this.ismaster == null
  104. })
  105. );
  106. } else if (issue.isClose) {
  107. op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`));
  108. } else {
  109. op.cb(typeof issue.destroy === 'boolean' ? undefined : issue.destroy);
  110. }
  111. }
  112. this[kQueue].clear();
  113. this.emit('close');
  114. }
  115. destroy(options, callback) {
  116. if (typeof options === 'function') {
  117. callback = options;
  118. options = {};
  119. }
  120. options = Object.assign({ force: false }, options);
  121. if (this[kStream] == null || this.destroyed) {
  122. this.destroyed = true;
  123. if (typeof callback === 'function') {
  124. callback();
  125. }
  126. return;
  127. }
  128. if (options.force) {
  129. this[kStream].destroy();
  130. this.destroyed = true;
  131. if (typeof callback === 'function') {
  132. callback();
  133. }
  134. return;
  135. }
  136. this[kStream].end(err => {
  137. this.destroyed = true;
  138. if (typeof callback === 'function') {
  139. callback(err);
  140. }
  141. });
  142. }
  143. // Wire protocol methods
  144. command(ns, cmd, options, callback) {
  145. wp.command(makeServerTrampoline(this), ns, cmd, options, callback);
  146. }
  147. query(ns, cmd, cursorState, options, callback) {
  148. wp.query(makeServerTrampoline(this), ns, cmd, cursorState, options, callback);
  149. }
  150. getMore(ns, cursorState, batchSize, options, callback) {
  151. wp.getMore(makeServerTrampoline(this), ns, cursorState, batchSize, options, callback);
  152. }
  153. killCursors(ns, cursorState, callback) {
  154. wp.killCursors(makeServerTrampoline(this), ns, cursorState, callback);
  155. }
  156. insert(ns, ops, options, callback) {
  157. wp.insert(makeServerTrampoline(this), ns, ops, options, callback);
  158. }
  159. update(ns, ops, options, callback) {
  160. wp.update(makeServerTrampoline(this), ns, ops, options, callback);
  161. }
  162. remove(ns, ops, options, callback) {
  163. wp.remove(makeServerTrampoline(this), ns, ops, options, callback);
  164. }
  165. }
  166. /// This lets us emulate a legacy `Server` instance so we can work with the existing wire
  167. /// protocol methods. Eventually, the operation executor will return a `Connection` to execute
  168. /// against.
  169. function makeServerTrampoline(connection) {
  170. const server = {
  171. description: connection.description,
  172. clusterTime: connection[kClusterTime],
  173. s: {
  174. bson: connection.bson,
  175. pool: { write: write.bind(connection), isConnected: () => true }
  176. }
  177. };
  178. if (connection[kAutoEncrypter]) {
  179. server.autoEncrypter = connection[kAutoEncrypter];
  180. }
  181. return server;
  182. }
  183. function messageHandler(conn) {
  184. return function messageHandler(message) {
  185. // always emit the message, in case we are streaming
  186. conn.emit('message', message);
  187. if (!conn[kQueue].has(message.responseTo)) {
  188. return;
  189. }
  190. const operationDescription = conn[kQueue].get(message.responseTo);
  191. const callback = operationDescription.cb;
  192. // SERVER-45775: For exhaust responses we should be able to use the same requestId to
  193. // track response, however the server currently synthetically produces remote requests
  194. // making the `responseTo` change on each response
  195. conn[kQueue].delete(message.responseTo);
  196. if (message.moreToCome) {
  197. // requeue the callback for next synthetic request
  198. conn[kQueue].set(message.requestId, operationDescription);
  199. } else if (operationDescription.socketTimeoutOverride) {
  200. conn[kStream].setTimeout(conn.socketTimeout);
  201. }
  202. try {
  203. // Pass in the entire description because it has BSON parsing options
  204. message.parse(operationDescription);
  205. } catch (err) {
  206. callback(new MongoError(err));
  207. return;
  208. }
  209. if (message.documents[0]) {
  210. const document = message.documents[0];
  211. const session = operationDescription.session;
  212. if (session) {
  213. updateSessionFromResponse(session, document);
  214. }
  215. if (document.$clusterTime) {
  216. conn[kClusterTime] = document.$clusterTime;
  217. conn.emit('clusterTimeReceived', document.$clusterTime);
  218. }
  219. if (operationDescription.command) {
  220. if (document.writeConcernError) {
  221. callback(new MongoWriteConcernError(document.writeConcernError, document));
  222. return;
  223. }
  224. if (document.ok === 0 || document.$err || document.errmsg || document.code) {
  225. callback(new MongoError(document));
  226. return;
  227. }
  228. }
  229. }
  230. // NODE-2382: reenable in our glorious non-leaky abstraction future
  231. // callback(null, operationDescription.fullResult ? message : message.documents[0]);
  232. callback(
  233. undefined,
  234. new CommandResult(
  235. operationDescription.fullResult ? message : message.documents[0],
  236. conn,
  237. message
  238. )
  239. );
  240. };
  241. }
  242. function streamIdentifier(stream) {
  243. if (typeof stream.address === 'function') {
  244. return `${stream.remoteAddress}:${stream.remotePort}`;
  245. }
  246. return uuidV4().toString('hex');
  247. }
  248. // Not meant to be called directly, the wire protocol methods call this assuming it is a `Pool` instance
  249. function write(command, options, callback) {
  250. if (typeof options === 'function') {
  251. callback = options;
  252. }
  253. options = options || {};
  254. const operationDescription = {
  255. requestId: command.requestId,
  256. cb: callback,
  257. session: options.session,
  258. fullResult: typeof options.fullResult === 'boolean' ? options.fullResult : false,
  259. noResponse: typeof options.noResponse === 'boolean' ? options.noResponse : false,
  260. documentsReturnedIn: options.documentsReturnedIn,
  261. command: !!options.command,
  262. // for BSON parsing
  263. promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true,
  264. promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true,
  265. promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false,
  266. raw: typeof options.raw === 'boolean' ? options.raw : false
  267. };
  268. if (this[kDescription] && this[kDescription].compressor) {
  269. operationDescription.agreedCompressor = this[kDescription].compressor;
  270. if (this[kDescription].zlibCompressionLevel) {
  271. operationDescription.zlibCompressionLevel = this[kDescription].zlibCompressionLevel;
  272. }
  273. }
  274. if (typeof options.socketTimeout === 'number') {
  275. operationDescription.socketTimeoutOverride = true;
  276. this[kStream].setTimeout(options.socketTimeout);
  277. }
  278. // if command monitoring is enabled we need to modify the callback here
  279. if (this.monitorCommands) {
  280. this.emit('commandStarted', new apm.CommandStartedEvent(this, command));
  281. operationDescription.started = now();
  282. operationDescription.cb = (err, reply) => {
  283. if (err) {
  284. this.emit(
  285. 'commandFailed',
  286. new apm.CommandFailedEvent(this, command, err, operationDescription.started)
  287. );
  288. } else {
  289. if (reply && reply.result && (reply.result.ok === 0 || reply.result.$err)) {
  290. this.emit(
  291. 'commandFailed',
  292. new apm.CommandFailedEvent(this, command, reply.result, operationDescription.started)
  293. );
  294. } else {
  295. this.emit(
  296. 'commandSucceeded',
  297. new apm.CommandSucceededEvent(this, command, reply, operationDescription.started)
  298. );
  299. }
  300. }
  301. if (typeof callback === 'function') {
  302. callback(err, reply);
  303. }
  304. };
  305. }
  306. if (!operationDescription.noResponse) {
  307. this[kQueue].set(operationDescription.requestId, operationDescription);
  308. }
  309. try {
  310. this[kMessageStream].writeCommand(command, operationDescription);
  311. } catch (e) {
  312. if (!operationDescription.noResponse) {
  313. this[kQueue].delete(operationDescription.requestId);
  314. operationDescription.cb(e);
  315. return;
  316. }
  317. }
  318. if (operationDescription.noResponse) {
  319. operationDescription.cb();
  320. }
  321. }
  322. module.exports = {
  323. Connection
  324. };