connection.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. 'use strict';
  2. const EventEmitter = require('events');
  3. const MessageStream = require('./message_stream');
  4. const MongoError = require('../core/error').MongoError;
  5. const MongoNetworkError = require('../core/error').MongoNetworkError;
  6. const MongoNetworkTimeoutError = require('../core/error').MongoNetworkTimeoutError;
  7. const MongoWriteConcernError = require('../core/error').MongoWriteConcernError;
  8. const CommandResult = require('../core/connection/command_result');
  9. const StreamDescription = require('./stream_description').StreamDescription;
  10. const wp = require('../core/wireprotocol');
  11. const apm = require('../core/connection/apm');
  12. const updateSessionFromResponse = require('../core/sessions').updateSessionFromResponse;
  13. const uuidV4 = require('../core/utils').uuidV4;
  14. const now = require('../utils').now;
  15. const calculateDurationInMs = require('../utils').calculateDurationInMs;
  16. const kStream = Symbol('stream');
  17. const kQueue = Symbol('queue');
  18. const kMessageStream = Symbol('messageStream');
  19. const kGeneration = Symbol('generation');
  20. const kLastUseTime = Symbol('lastUseTime');
  21. const kClusterTime = Symbol('clusterTime');
  22. const kDescription = Symbol('description');
  23. const kIsMaster = Symbol('ismaster');
  24. const kAutoEncrypter = Symbol('autoEncrypter');
  25. class Connection extends EventEmitter {
  26. constructor(stream, options) {
  27. super(options);
  28. this.id = options.id;
  29. this.address = streamIdentifier(stream);
  30. this.bson = options.bson;
  31. this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 0;
  32. this.host = options.host || 'localhost';
  33. this.port = options.port || 27017;
  34. this.monitorCommands =
  35. typeof options.monitorCommands === 'boolean' ? options.monitorCommands : false;
  36. this.serverApi = options.serverApi;
  37. this.closed = false;
  38. this.destroyed = false;
  39. this[kDescription] = new StreamDescription(this.address, options);
  40. this[kGeneration] = options.generation;
  41. this[kLastUseTime] = now();
  42. // retain a reference to an `AutoEncrypter` if present
  43. if (options.autoEncrypter) {
  44. this[kAutoEncrypter] = options.autoEncrypter;
  45. }
  46. // setup parser stream and message handling
  47. this[kQueue] = new Map();
  48. this[kMessageStream] = new MessageStream(options);
  49. this[kMessageStream].on('message', messageHandler(this));
  50. this[kStream] = stream;
  51. stream.on('error', () => {
  52. /* ignore errors, listen to `close` instead */
  53. });
  54. this[kMessageStream].on('error', error => this.handleIssue({ destroy: error }));
  55. stream.on('close', () => this.handleIssue({ isClose: true }));
  56. stream.on('timeout', () => this.handleIssue({ isTimeout: true, destroy: true }));
  57. // hook the message stream up to the passed in stream
  58. stream.pipe(this[kMessageStream]);
  59. this[kMessageStream].pipe(stream);
  60. }
  61. get description() {
  62. return this[kDescription];
  63. }
  64. get ismaster() {
  65. return this[kIsMaster];
  66. }
  67. // the `connect` method stores the result of the handshake ismaster on the connection
  68. set ismaster(response) {
  69. this[kDescription].receiveResponse(response);
  70. // TODO: remove this, and only use the `StreamDescription` in the future
  71. this[kIsMaster] = response;
  72. }
  73. get generation() {
  74. return this[kGeneration] || 0;
  75. }
  76. get idleTime() {
  77. return calculateDurationInMs(this[kLastUseTime]);
  78. }
  79. get clusterTime() {
  80. return this[kClusterTime];
  81. }
  82. get stream() {
  83. return this[kStream];
  84. }
  85. markAvailable() {
  86. this[kLastUseTime] = now();
  87. }
  88. /**
  89. * @param {{ isTimeout?: boolean; isClose?: boolean; destroy?: boolean | Error }} issue
  90. */
  91. handleIssue(issue) {
  92. if (this.closed) {
  93. return;
  94. }
  95. if (issue.destroy) {
  96. this[kStream].destroy(typeof issue.destroy === 'boolean' ? undefined : issue.destroy);
  97. }
  98. this.closed = true;
  99. for (const idAndOp of this[kQueue]) {
  100. const op = idAndOp[1];
  101. if (issue.isTimeout) {
  102. op.cb(
  103. new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, {
  104. beforeHandshake: this.ismaster == null
  105. })
  106. );
  107. } else if (issue.isClose) {
  108. op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`));
  109. } else {
  110. op.cb(typeof issue.destroy === 'boolean' ? undefined : issue.destroy);
  111. }
  112. }
  113. this[kQueue].clear();
  114. this.emit('close');
  115. }
  116. destroy(options, callback) {
  117. if (typeof options === 'function') {
  118. callback = options;
  119. options = {};
  120. }
  121. options = Object.assign({ force: false }, options);
  122. if (this[kStream] == null || this.destroyed) {
  123. this.destroyed = true;
  124. if (typeof callback === 'function') {
  125. callback();
  126. }
  127. return;
  128. }
  129. if (options.force) {
  130. this[kStream].destroy();
  131. this.destroyed = true;
  132. if (typeof callback === 'function') {
  133. callback();
  134. }
  135. return;
  136. }
  137. this[kStream].end(err => {
  138. this.destroyed = true;
  139. if (typeof callback === 'function') {
  140. callback(err);
  141. }
  142. });
  143. }
  144. applyApiVersion(options) {
  145. if (this.serverApi) {
  146. options.serverApi = this.serverApi;
  147. }
  148. return options;
  149. }
  150. // Wire protocol methods
  151. command(ns, cmd, options, callback) {
  152. if (typeof options === 'function') {
  153. callback = options;
  154. options = {};
  155. }
  156. wp.command(makeServerTrampoline(this), ns, cmd, this.applyApiVersion(options), callback);
  157. }
  158. query(ns, cmd, cursorState, options, callback) {
  159. wp.query(
  160. makeServerTrampoline(this),
  161. ns,
  162. cmd,
  163. cursorState,
  164. this.applyApiVersion(options),
  165. callback
  166. );
  167. }
  168. getMore(ns, cursorState, batchSize, options, callback) {
  169. wp.getMore(
  170. makeServerTrampoline(this),
  171. ns,
  172. cursorState,
  173. batchSize,
  174. this.applyApiVersion(options),
  175. callback
  176. );
  177. }
  178. killCursors(ns, cursorState, callback) {
  179. wp.killCursors(makeServerTrampoline(this), ns, cursorState, this.applyApiVersion({}), callback);
  180. }
  181. insert(ns, ops, options, callback) {
  182. wp.insert(makeServerTrampoline(this), ns, ops, this.applyApiVersion(options), callback);
  183. }
  184. update(ns, ops, options, callback) {
  185. wp.update(makeServerTrampoline(this), ns, ops, this.applyApiVersion(options), callback);
  186. }
  187. remove(ns, ops, options, callback) {
  188. wp.remove(makeServerTrampoline(this), ns, ops, this.applyApiVersion(options), callback);
  189. }
  190. }
  191. /// This lets us emulate a legacy `Server` instance so we can work with the existing wire
  192. /// protocol methods. Eventually, the operation executor will return a `Connection` to execute
  193. /// against.
  194. function makeServerTrampoline(connection) {
  195. const server = {
  196. description: connection.description,
  197. clusterTime: connection[kClusterTime],
  198. s: {
  199. bson: connection.bson,
  200. pool: { write: write.bind(connection), isConnected: () => true }
  201. }
  202. };
  203. if (connection[kAutoEncrypter]) {
  204. server.autoEncrypter = connection[kAutoEncrypter];
  205. }
  206. return server;
  207. }
  208. function messageHandler(conn) {
  209. return function messageHandler(message) {
  210. // always emit the message, in case we are streaming
  211. conn.emit('message', message);
  212. if (!conn[kQueue].has(message.responseTo)) {
  213. return;
  214. }
  215. const operationDescription = conn[kQueue].get(message.responseTo);
  216. const callback = operationDescription.cb;
  217. // SERVER-45775: For exhaust responses we should be able to use the same requestId to
  218. // track response, however the server currently synthetically produces remote requests
  219. // making the `responseTo` change on each response
  220. conn[kQueue].delete(message.responseTo);
  221. if (message.moreToCome) {
  222. // requeue the callback for next synthetic request
  223. conn[kQueue].set(message.requestId, operationDescription);
  224. } else if (operationDescription.socketTimeoutOverride) {
  225. conn[kStream].setTimeout(conn.socketTimeout);
  226. }
  227. try {
  228. // Pass in the entire description because it has BSON parsing options
  229. message.parse(operationDescription);
  230. } catch (err) {
  231. callback(new MongoError(err));
  232. return;
  233. }
  234. if (message.documents[0]) {
  235. const document = message.documents[0];
  236. const session = operationDescription.session;
  237. if (session) {
  238. updateSessionFromResponse(session, document);
  239. }
  240. if (document.$clusterTime) {
  241. conn[kClusterTime] = document.$clusterTime;
  242. conn.emit('clusterTimeReceived', document.$clusterTime);
  243. }
  244. if (operationDescription.command) {
  245. if (document.writeConcernError) {
  246. callback(new MongoWriteConcernError(document.writeConcernError, document));
  247. return;
  248. }
  249. if (document.ok === 0 || document.$err || document.errmsg || document.code) {
  250. callback(new MongoError(document));
  251. return;
  252. }
  253. }
  254. }
  255. // NODE-2382: reenable in our glorious non-leaky abstraction future
  256. // callback(null, operationDescription.fullResult ? message : message.documents[0]);
  257. callback(
  258. undefined,
  259. new CommandResult(
  260. operationDescription.fullResult ? message : message.documents[0],
  261. conn,
  262. message
  263. )
  264. );
  265. };
  266. }
  267. function streamIdentifier(stream) {
  268. if (typeof stream.address === 'function') {
  269. return `${stream.remoteAddress}:${stream.remotePort}`;
  270. }
  271. return uuidV4().toString('hex');
  272. }
  273. // Not meant to be called directly, the wire protocol methods call this assuming it is a `Pool` instance
  274. function write(command, options, callback) {
  275. if (typeof options === 'function') {
  276. callback = options;
  277. }
  278. options = options || {};
  279. const operationDescription = {
  280. requestId: command.requestId,
  281. cb: callback,
  282. session: options.session,
  283. fullResult: typeof options.fullResult === 'boolean' ? options.fullResult : false,
  284. noResponse: typeof options.noResponse === 'boolean' ? options.noResponse : false,
  285. documentsReturnedIn: options.documentsReturnedIn,
  286. command: !!options.command,
  287. // for BSON parsing
  288. promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true,
  289. promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true,
  290. promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false,
  291. bsonRegExp: typeof options.bsonRegExp === 'boolean' ? options.bsonRegExp : false,
  292. raw: typeof options.raw === 'boolean' ? options.raw : false
  293. };
  294. if (this[kDescription] && this[kDescription].compressor) {
  295. operationDescription.agreedCompressor = this[kDescription].compressor;
  296. if (this[kDescription].zlibCompressionLevel) {
  297. operationDescription.zlibCompressionLevel = this[kDescription].zlibCompressionLevel;
  298. }
  299. }
  300. if (typeof options.socketTimeout === 'number') {
  301. operationDescription.socketTimeoutOverride = true;
  302. this[kStream].setTimeout(options.socketTimeout);
  303. }
  304. // if command monitoring is enabled we need to modify the callback here
  305. if (this.monitorCommands) {
  306. this.emit('commandStarted', new apm.CommandStartedEvent(this, command));
  307. operationDescription.started = now();
  308. operationDescription.cb = (err, reply) => {
  309. if (err) {
  310. this.emit(
  311. 'commandFailed',
  312. new apm.CommandFailedEvent(this, command, err, operationDescription.started)
  313. );
  314. } else {
  315. if (reply && reply.result && (reply.result.ok === 0 || reply.result.$err)) {
  316. this.emit(
  317. 'commandFailed',
  318. new apm.CommandFailedEvent(this, command, reply.result, operationDescription.started)
  319. );
  320. } else {
  321. this.emit(
  322. 'commandSucceeded',
  323. new apm.CommandSucceededEvent(this, command, reply, operationDescription.started)
  324. );
  325. }
  326. }
  327. if (typeof callback === 'function') {
  328. callback(err, reply);
  329. }
  330. };
  331. }
  332. if (!operationDescription.noResponse) {
  333. this[kQueue].set(operationDescription.requestId, operationDescription);
  334. }
  335. try {
  336. this[kMessageStream].writeCommand(command, operationDescription);
  337. } catch (e) {
  338. if (!operationDescription.noResponse) {
  339. this[kQueue].delete(operationDescription.requestId);
  340. operationDescription.cb(e);
  341. return;
  342. }
  343. }
  344. if (operationDescription.noResponse) {
  345. operationDescription.cb();
  346. }
  347. }
  348. module.exports = {
  349. Connection
  350. };