connection.js 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.hasSessionSupport = exports.CryptoConnection = exports.Connection = void 0;
  4. const timers_1 = require("timers");
  5. const bson_1 = require("../bson");
  6. const constants_1 = require("../constants");
  7. const error_1 = require("../error");
  8. const mongo_types_1 = require("../mongo_types");
  9. const sessions_1 = require("../sessions");
  10. const utils_1 = require("../utils");
  11. const command_monitoring_events_1 = require("./command_monitoring_events");
  12. const commands_1 = require("./commands");
  13. const message_stream_1 = require("./message_stream");
  14. const stream_description_1 = require("./stream_description");
  15. const shared_1 = require("./wire_protocol/shared");
  // Private symbol keys. Using symbols keeps this internal state off the
  // string-keyed surface of Connection instances and option bags.

  /** @internal Key for the underlying duplex stream handed to the constructor. */
  const kStream = Symbol('stream');
  /** @internal Key for the Map of in-flight operations, keyed by request id. */
  const kQueue = Symbol('queue');
  /** @internal Key for the MessageStream piped to and from the underlying stream. */
  const kMessageStream = Symbol('messageStream');
  /** @internal Key for the generation value taken from the constructor options. */
  const kGeneration = Symbol('generation');
  /** @internal Key for the timestamp recorded at construction and by `markAvailable`. */
  const kLastUseTime = Symbol('lastUseTime');
  /** @internal Key for the most recent `$clusterTime` seen in a response. */
  const kClusterTime = Symbol('clusterTime');
  /** @internal Key for the StreamDescription of this connection. */
  const kDescription = Symbol('description');
  /** @internal Key for the stored handshake hello response. */
  const kHello = Symbol('hello');
  /** @internal Key for the auto encrypter stored on a CryptoConnection. */
  const kAutoEncrypter = Symbol('autoEncrypter');
  /** @internal Option key requesting the whole reply message instead of its first document. */
  const kFullResult = Symbol('fullResult');
  /** @internal Key for the pending timer created by `onTimeout`. */
  const kDelayedTimeoutId = Symbol('delayedTimeoutId');
  /**
   * Wraps a duplex stream and a MessageStream, tracks in-flight operations by
   * request id, and dispatches replies to the callbacks registered for them.
   *
   * Emits `Connection.CLOSE` once when the connection is torn down because of a
   * stream close, a message stream error, or a socket timeout.
   *
   * @internal
   */
  class Connection extends mongo_types_1.TypedEventEmitter {
    /**
     * @param stream - duplex stream the connection reads from and writes to
     * @param options - connection options; `id`, `socketTimeoutMS`, `monitorCommands`,
     *   `serverApi` and `generation` are read here, and the whole object is forwarded
     *   to StreamDescription and MessageStream
     */
    constructor(stream, options) {
      super();
      this.id = options.id;
      this.address = streamIdentifier(stream, options);
      const socketTimeoutMS = options.socketTimeoutMS;
      this.socketTimeoutMS = socketTimeoutMS != null ? socketTimeoutMS : 0;
      this.monitorCommands = options.monitorCommands;
      this.serverApi = options.serverApi;
      this.closed = false;
      this.destroyed = false;
      this[kHello] = null;
      this[kClusterTime] = null;
      this[kDescription] = new stream_description_1.StreamDescription(this.address, options);
      this[kGeneration] = options.generation;
      this[kLastUseTime] = (0, utils_1.now)();
      // setup parser stream and message handling
      this[kQueue] = new Map();
      const hello = this.hello;
      this[kMessageStream] = new message_stream_1.MessageStream({
        ...options,
        maxBsonMessageSize: hello == null ? undefined : hello.maxBsonMessageSize
      });
      this[kStream] = stream;
      this[kDelayedTimeoutId] = null;
      this[kMessageStream].on('message', message => this.onMessage(message));
      this[kMessageStream].on('error', error => this.onError(error));
      this[kStream].on('close', () => this.onClose());
      this[kStream].on('timeout', () => this.onTimeout());
      this[kStream].on('error', () => {
        /* ignore errors, listen to `close` instead */
      });
      // hook the message stream up to the passed in stream
      this[kStream].pipe(this[kMessageStream]);
      this[kMessageStream].pipe(this[kStream]);
    }

    /** The StreamDescription for this connection. */
    get description() {
      return this[kDescription];
    }

    /** The stored handshake hello response, or `null` before one has been set. */
    get hello() {
      return this[kHello];
    }

    // the `connect` method stores the result of the handshake hello on the connection
    set hello(response) {
      this[kDescription].receiveResponse(response);
      // The description is frozen once the handshake response has been applied.
      this[kDescription] = Object.freeze(this[kDescription]);
      // TODO: remove this, and only use the `StreamDescription` in the future
      this[kHello] = response;
    }

    // Set whether the message stream is for a monitoring connection.
    set isMonitoringConnection(value) {
      this[kMessageStream].isMonitoringConnection = value;
    }

    get isMonitoringConnection() {
      return this[kMessageStream].isMonitoringConnection;
    }

    /** `serviceId` from the hello response, or `undefined` when there is no hello yet. */
    get serviceId() {
      const hello = this.hello;
      return hello == null ? undefined : hello.serviceId;
    }

    get loadBalanced() {
      return this.description.loadBalanced;
    }

    /** The connection generation; falls back to 0 when none was provided. */
    get generation() {
      return this[kGeneration] || 0;
    }

    set generation(generation) {
      this[kGeneration] = generation;
    }

    /** Milliseconds elapsed since construction or the last `markAvailable` call. */
    get idleTime() {
      return (0, utils_1.calculateDurationInMs)(this[kLastUseTime]);
    }

    get clusterTime() {
      return this[kClusterTime];
    }

    get stream() {
      return this[kStream];
    }

    /** Resets the last-use timestamp that `idleTime` is measured from. */
    markAvailable() {
      this[kLastUseTime] = (0, utils_1.now)();
    }

    /**
     * Handles a message stream error: destroys the stream with the error, fails
     * every queued operation with it and emits `Connection.CLOSE`.
     * Does nothing if the connection is already closed.
     */
    onError(error) {
      if (this.closed) {
        return;
      }
      this[kStream].destroy(error);
      this.closed = true;
      for (const op of this[kQueue].values()) {
        op.cb(error);
      }
      this[kQueue].clear();
      this.emit(Connection.CLOSE);
    }

    /**
     * Handles the stream `close` event: fails every queued operation with a
     * MongoNetworkError and emits `Connection.CLOSE`.
     * Does nothing if the connection is already closed.
     */
    onClose() {
      if (this.closed) {
        return;
      }
      this.closed = true;
      const message = `connection ${this.id} to ${this.address} closed`;
      for (const op of this[kQueue].values()) {
        op.cb(new error_1.MongoNetworkError(message));
      }
      this[kQueue].clear();
      this.emit(Connection.CLOSE);
    }

    /**
     * Handles the stream `timeout` event. Teardown is deferred by 1ms; a message
     * that arrives in that window cancels it (see `onMessage`).
     */
    onTimeout() {
      if (this.closed) {
        return;
      }
      this[kDelayedTimeoutId] = (0, timers_1.setTimeout)(() => {
        // The connection may have been closed by `onClose` / `onError` while this
        // timer was pending. Without this check the stream would be destroyed and
        // `Connection.CLOSE` emitted a second time.
        if (this.closed) {
          return;
        }
        this[kStream].destroy();
        this.closed = true;
        const message = `connection ${this.id} to ${this.address} timed out`;
        const beforeHandshake = this.hello == null;
        for (const op of this[kQueue].values()) {
          op.cb(new error_1.MongoNetworkTimeoutError(message, { beforeHandshake }));
        }
        this[kQueue].clear();
        this.emit(Connection.CLOSE);
      }, 1).unref(); // No need for this timer to hold the event loop open
    }

    /**
     * Handles a reply from the message stream: re-emits it as `'message'`, looks
     * up the operation it responds to, parses it and invokes that operation's
     * callback with either an error or the result.
     */
    onMessage(message) {
      const delayedTimeoutId = this[kDelayedTimeoutId];
      if (delayedTimeoutId != null) {
        // Data arrived after a socket timeout fired; cancel the pending teardown.
        (0, timers_1.clearTimeout)(delayedTimeoutId);
        this[kDelayedTimeoutId] = null;
      }
      // always emit the message, in case we are streaming
      this.emit('message', message);
      const operationDescription = this[kQueue].get(message.responseTo);
      if (!operationDescription) {
        return;
      }
      const callback = operationDescription.cb;
      // SERVER-45775: For exhaust responses we should be able to use the same requestId to
      // track response, however the server currently synthetically produces remote requests
      // making the `responseTo` change on each response
      this[kQueue].delete(message.responseTo);
      if ('moreToCome' in message && message.moreToCome) {
        // requeue the callback for next synthetic request
        this[kQueue].set(message.requestId, operationDescription);
      }
      else if (operationDescription.socketTimeoutOverride) {
        // The operation used its own socket timeout; restore the connection default.
        this[kStream].setTimeout(this.socketTimeoutMS);
      }
      try {
        // Pass in the entire description because it has BSON parsing options
        message.parse(operationDescription);
      }
      catch (err) {
        // If this error is generated by our own code, it will already have the correct class applied
        // if it is not, then it is coming from a catastrophic data parse failure or the BSON library
        // in either case, it should not be wrapped
        callback(err);
        return;
      }
      if (message.documents[0]) {
        const document = message.documents[0];
        const session = operationDescription.session;
        if (session) {
          (0, sessions_1.updateSessionFromResponse)(session, document);
        }
        if (document.$clusterTime) {
          this[kClusterTime] = document.$clusterTime;
          this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime);
        }
        if (operationDescription.command) {
          if (document.writeConcernError) {
            callback(new error_1.MongoWriteConcernError(document.writeConcernError, document));
            return;
          }
          if (document.ok === 0 || document.$err || document.errmsg || document.code) {
            callback(new error_1.MongoServerError(document));
            return;
          }
        }
        else {
          // Pre 3.2 support
          if (document.ok === 0 || document.$err || document.errmsg) {
            callback(new error_1.MongoServerError(document));
            return;
          }
        }
      }
      callback(undefined, operationDescription.fullResult ? message : message.documents[0]);
    }

    /**
     * Tears down the connection's stream.
     *
     * @param options - `{ force }`; when `force` is true the stream is destroyed
     *   immediately, otherwise it is ended. May be omitted, in which case the
     *   first argument is treated as the callback.
     * @param callback - optional; invoked once the connection is marked destroyed
     */
    destroy(options, callback) {
      if (typeof options === 'function') {
        callback = options;
        options = { force: false };
      }
      this.removeAllListeners(Connection.PINNED);
      this.removeAllListeners(Connection.UNPINNED);
      options = Object.assign({ force: false }, options);
      if (this[kStream] == null || this.destroyed) {
        this.destroyed = true;
        if (typeof callback === 'function') {
          callback();
        }
        return;
      }
      if (options.force) {
        this[kStream].destroy();
        this.destroyed = true;
        if (typeof callback === 'function') {
          callback();
        }
        return;
      }
      this[kStream].end(() => {
        this.destroyed = true;
        if (typeof callback === 'function') {
          callback();
        }
      });
    }

    /**
     * Builds a command message and writes it to the connection.
     *
     * @param ns - a MongoDBNamespace; the command is sent to `<ns.db>.$cmd`
     * @param cmd - command document; it is copied, not mutated
     * @param options - command options, forwarded to the message and to `write`
     * @param callback - receives `(error, result)`
     * @throws MongoRuntimeError synchronously when `ns` is not a MongoDBNamespace
     */
    command(ns, cmd, options, callback) {
      if (!(ns instanceof utils_1.MongoDBNamespace)) {
        // TODO(NODE-3483): Replace this with a MongoCommandError
        throw new error_1.MongoRuntimeError('Must provide a MongoDBNamespace instance');
      }
      const readPreference = (0, shared_1.getReadPreference)(cmd, options);
      const shouldUseOpMsg = supportsOpMsg(this);
      const session = options == null ? undefined : options.session;
      let clusterTime = this.clusterTime;
      let finalCmd = Object.assign({}, cmd);
      if (this.serverApi) {
        const { version, strict, deprecationErrors } = this.serverApi;
        finalCmd.apiVersion = version;
        if (strict != null) {
          finalCmd.apiStrict = strict;
        }
        if (deprecationErrors != null) {
          finalCmd.apiDeprecationErrors = deprecationErrors;
        }
      }
      if (hasSessionSupport(this) && session) {
        // Gossip whichever cluster time is newer: the connection's or the session's.
        if (session.clusterTime &&
          clusterTime &&
          session.clusterTime.clusterTime.greaterThan(clusterTime.clusterTime)) {
          clusterTime = session.clusterTime;
        }
        const err = (0, sessions_1.applySession)(session, finalCmd, options);
        if (err) {
          return callback(err);
        }
      }
      // if we have a known cluster time, gossip it
      if (clusterTime) {
        finalCmd.$clusterTime = clusterTime;
      }
      if ((0, shared_1.isSharded)(this) && !shouldUseOpMsg && readPreference && readPreference.mode !== 'primary') {
        finalCmd = {
          $query: finalCmd,
          $readPreference: readPreference.toJSON()
        };
      }
      const commandOptions = Object.assign({
        command: true,
        numberToSkip: 0,
        numberToReturn: -1,
        checkKeys: false,
        // This value is not overridable
        secondaryOk: readPreference.secondaryOk()
      }, options);
      const cmdNs = `${ns.db}.$cmd`;
      const message = shouldUseOpMsg
        ? new commands_1.Msg(cmdNs, finalCmd, commandOptions)
        : new commands_1.Query(cmdNs, finalCmd, commandOptions);
      try {
        write(this, message, commandOptions, callback);
      }
      catch (err) {
        callback(err);
      }
    }

    /**
     * Fetches the next batch for a cursor. Uses the legacy GetMore message when
     * the max wire version is below 4, otherwise the `getMore` command.
     *
     * @param ns - namespace of the cursor's collection
     * @param cursorId - id of the cursor; a falsy id is reported through `callback`
     * @param options - `batchSize`, `maxAwaitTimeMS` and `comment` are read here
     * @param callback - receives `(error, result)`
     */
    getMore(ns, cursorId, options, callback) {
      const fullResult = !!options[kFullResult];
      const wireVersion = (0, utils_1.maxWireVersion)(this);
      if (!cursorId) {
        // TODO(NODE-3483): Replace this with a MongoCommandError
        callback(new error_1.MongoRuntimeError('Invalid internal cursor state, no known cursor id'));
        return;
      }
      if (wireVersion < 4) {
        const getMoreOp = new commands_1.GetMore(ns.toString(), cursorId, { numberToReturn: options.batchSize });
        const queryOptions = (0, shared_1.applyCommonQueryOptions)({}, Object.assign(options, { ...(0, bson_1.pluckBSONSerializeOptions)(options) }));
        queryOptions[kFullResult] = true;
        queryOptions.command = true;
        write(this, getMoreOp, queryOptions, (err, response) => {
          if (fullResult) {
            return callback(err, response);
          }
          if (err) {
            return callback(err);
          }
          // Shape the legacy reply like a `getMore` command response.
          callback(undefined, { cursor: { id: response.cursorId, nextBatch: response.documents } });
        });
        return;
      }
      const getMoreCmd = {
        getMore: cursorId,
        collection: ns.collection
      };
      if (typeof options.batchSize === 'number') {
        getMoreCmd.batchSize = Math.abs(options.batchSize);
      }
      if (typeof options.maxAwaitTimeMS === 'number') {
        getMoreCmd.maxTimeMS = options.maxAwaitTimeMS;
      }
      // we check for undefined specifically here to allow falsy values
      // eslint-disable-next-line no-restricted-syntax
      if (options.comment !== undefined) {
        getMoreCmd.comment = options.comment;
      }
      const commandOptions = Object.assign({
        returnFieldSelector: null,
        documentsReturnedIn: 'nextBatch'
      }, options);
      this.command(ns, getMoreCmd, commandOptions, callback);
    }

    /**
     * Kills the given cursors. Uses the legacy KillCursor message (no response
     * expected) when the max wire version is below 4, otherwise the
     * `killCursors` command.
     *
     * @param ns - namespace of the cursors' collection
     * @param cursorIds - array of cursor ids
     * @param options - forwarded to the underlying write / command
     * @param callback - receives `(error, result)`
     * @throws MongoRuntimeError synchronously when `cursorIds` is not an array
     */
    killCursors(ns, cursorIds, options, callback) {
      if (!cursorIds || !Array.isArray(cursorIds)) {
        // TODO(NODE-3483): Replace this with a MongoCommandError
        throw new error_1.MongoRuntimeError(`Invalid list of cursor ids provided: ${cursorIds}`);
      }
      if ((0, utils_1.maxWireVersion)(this) < 4) {
        try {
          write(this, new commands_1.KillCursor(ns.toString(), cursorIds), { noResponse: true, ...options }, callback);
        }
        catch (err) {
          callback(err);
        }
        return;
      }
      this.command(ns, { killCursors: ns.collection, cursors: cursorIds }, { [kFullResult]: true, ...options }, (err, response) => {
        if (err || !response) {
          return callback(err);
        }
        if (response.cursorNotFound) {
          return callback(new error_1.MongoNetworkError('cursor killed or timed out'), null);
        }
        if (!Array.isArray(response.documents) || response.documents.length === 0) {
          return callback(
          // TODO(NODE-3483)
          new error_1.MongoRuntimeError(`invalid killCursors result returned for cursor id ${cursorIds[0]}`));
        }
        callback(undefined, response.documents[0]);
      });
    }
  }
  exports.Connection = Connection;
  // Event names exposed as static properties of Connection, taken from the
  // shared constants module.
  Object.assign(Connection, {
    /** @event */
    COMMAND_STARTED: constants_1.COMMAND_STARTED,
    /** @event */
    COMMAND_SUCCEEDED: constants_1.COMMAND_SUCCEEDED,
    /** @event */
    COMMAND_FAILED: constants_1.COMMAND_FAILED,
    /** @event */
    CLUSTER_TIME_RECEIVED: constants_1.CLUSTER_TIME_RECEIVED,
    /** @event */
    CLOSE: constants_1.CLOSE,
    /** @event */
    MESSAGE: constants_1.MESSAGE,
    /** @event */
    PINNED: constants_1.PINNED,
    /** @event */
    UNPINNED: constants_1.UNPINNED
  });
  /**
   * A Connection whose `command` encrypts the outgoing command and decrypts the
   * reply through the auto encrypter supplied in the constructor options.
   *
   * @internal
   */
  class CryptoConnection extends Connection {
    constructor(stream, options) {
      super(stream, options);
      this[kAutoEncrypter] = options.autoEncrypter;
    }

    /**
     * Encrypts `cmd`, runs it through `Connection#command`, then decrypts the
     * response before handing it to `callback`.
     *
     * @internal @override
     */
    command(ns, cmd, options, callback) {
      const autoEncrypter = this[kAutoEncrypter];
      if (!autoEncrypter) {
        return callback(new error_1.MongoMissingDependencyError('No AutoEncrypter available for encryption'));
      }

      const serverWireVersion = (0, utils_1.maxWireVersion)(this);
      if (serverWireVersion === 0) {
        // This means the initial handshake hasn't happened yet
        return super.command(ns, cmd, options, callback);
      }
      if (serverWireVersion < 8) {
        callback(new error_1.MongoCompatibilityError('Auto-encryption requires a minimum MongoDB version of 4.2'));
        return;
      }

      // Last step: decrypt the server reply, unless the command failed or
      // produced no response.
      const onResponse = (commandError, response) => {
        if (commandError || response == null) {
          callback(commandError, response);
          return;
        }
        autoEncrypter.decrypt(response, options, callback);
      };

      // Middle step: send the encrypted command, unless encryption failed or
      // produced nothing.
      const onEncrypted = (encryptError, encrypted) => {
        if (encryptError || encrypted == null) {
          callback(encryptError, null);
          return;
        }
        super.command(ns, encrypted, options, onResponse);
      };

      autoEncrypter.encrypt(ns.toString(), cmd, options, onEncrypted);
    }
  }
  exports.CryptoConnection = CryptoConnection;
  /**
   * Reports whether sessions can be used on a connection: true when its
   * description has a non-null `logicalSessionTimeoutMinutes`, or when the
   * description's `loadBalanced` flag is truthy.
   *
   * @internal
   */
  function hasSessionSupport(conn) {
    const streamDescription = conn.description;
    if (streamDescription.logicalSessionTimeoutMinutes != null) {
      return true;
    }
    return Boolean(streamDescription.loadBalanced);
  }
  // Publish the session-support check on the module's exports.
  exports.hasSessionSupport = hasSessionSupport;
  /**
   * Decides whether commands on this connection are sent as `Msg` rather than
   * `Query`: requires a description, a max wire version of at least 6, and a
   * description that is not flagged `__nodejs_mock_server__`.
   */
  function supportsOpMsg(conn) {
    const { description } = conn;
    if (description == null) {
      return false;
    }
    const wireVersion = (0, utils_1.maxWireVersion)(conn);
    return wireVersion >= 6 && !description.__nodejs_mock_server__;
  }
  /**
   * Produces the address string used to identify a connection.
   *
   * - With proxy options, the configured host address is used.
   * - For a stream exposing an `address()` function, `remoteAddress:remotePort`.
   * - Otherwise a freshly generated UUID rendered as hex.
   */
  function streamIdentifier(stream, options) {
    if (options.proxyHost) {
      // If proxy options are specified, the properties of `stream` itself
      // will not accurately reflect what endpoint this is connected to.
      return options.hostAddress.toString();
    }
    const hasAddressFunction = typeof stream.address === 'function';
    return hasAddressFunction
      ? `${stream.remoteAddress}:${stream.remotePort}`
      : (0, utils_1.uuidV4)().toString('hex');
  }
  /**
   * Registers an operation on the connection and writes its message to the
   * message stream.
   *
   * @param conn - the Connection to write on
   * @param command - the message to send; its `requestId` keys the operation
   * @param options - per-operation options (session, BSON parsing flags,
   *   `noResponse`, `socketTimeoutMS`, ...); may be the callback itself
   * @param callback - receives `(error, reply)`
   *
   * When `noResponse` is set the operation is not queued and the callback is
   * invoked with no arguments right after the write attempt; an error thrown
   * by that write is not reported.
   */
  function write(conn, command, options, callback) {
    if (typeof options === 'function') {
      callback = options;
    }
    if (options == null) {
      options = {};
    }

    // Use the option only when it is an actual boolean, otherwise the default.
    const booleanOr = (value, fallback) => (typeof value === 'boolean' ? value : fallback);

    const operationDescription = {
      requestId: command.requestId,
      cb: callback,
      session: options.session,
      fullResult: Boolean(options[kFullResult]),
      noResponse: booleanOr(options.noResponse, false),
      documentsReturnedIn: options.documentsReturnedIn,
      command: Boolean(options.command),
      // for BSON parsing
      promoteLongs: booleanOr(options.promoteLongs, true),
      promoteValues: booleanOr(options.promoteValues, true),
      promoteBuffers: booleanOr(options.promoteBuffers, false),
      bsonRegExp: booleanOr(options.bsonRegExp, false),
      enableUtf8Validation: booleanOr(options.enableUtf8Validation, true),
      raw: booleanOr(options.raw, false),
      started: 0
    };

    const streamDescription = conn[kDescription];
    if (streamDescription && streamDescription.compressor) {
      operationDescription.agreedCompressor = streamDescription.compressor;
      if (streamDescription.zlibCompressionLevel) {
        operationDescription.zlibCompressionLevel = streamDescription.zlibCompressionLevel;
      }
    }

    if (typeof options.socketTimeoutMS === 'number') {
      // Flag the override so `onMessage` restores the connection's own timeout.
      operationDescription.socketTimeoutOverride = true;
      conn[kStream].setTimeout(options.socketTimeoutMS);
    }

    // if command monitoring is enabled we need to modify the callback here
    if (conn.monitorCommands) {
      conn.emit(Connection.COMMAND_STARTED, new command_monitoring_events_1.CommandStartedEvent(conn, command));
      operationDescription.started = (0, utils_1.now)();
      operationDescription.cb = (err, reply) => {
        if (err) {
          conn.emit(Connection.COMMAND_FAILED, new command_monitoring_events_1.CommandFailedEvent(conn, command, err, operationDescription.started));
        }
        else if (reply && (reply.ok === 0 || reply.$err)) {
          conn.emit(Connection.COMMAND_FAILED, new command_monitoring_events_1.CommandFailedEvent(conn, command, reply, operationDescription.started));
        }
        else {
          conn.emit(Connection.COMMAND_SUCCEEDED, new command_monitoring_events_1.CommandSucceededEvent(conn, command, reply, operationDescription.started));
        }
        if (typeof callback === 'function') {
          callback(err, reply);
        }
      };
    }

    // Only operations that expect a reply are tracked in the queue.
    if (!operationDescription.noResponse) {
      conn[kQueue].set(operationDescription.requestId, operationDescription);
    }

    try {
      conn[kMessageStream].writeCommand(command, operationDescription);
    }
    catch (e) {
      if (!operationDescription.noResponse) {
        conn[kQueue].delete(operationDescription.requestId);
        operationDescription.cb(e);
        return;
      }
    }

    if (operationDescription.noResponse) {
      operationDescription.cb();
    }
  }
  532. //# sourceMappingURL=connection.js.map