monitor.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.RTTPinger = exports.Monitor = void 0;
  4. const timers_1 = require("timers");
  5. const bson_1 = require("../bson");
  6. const connect_1 = require("../cmap/connect");
  7. const connection_1 = require("../cmap/connection");
  8. const constants_1 = require("../constants");
  9. const error_1 = require("../error");
  10. const mongo_types_1 = require("../mongo_types");
  11. const utils_1 = require("../utils");
  12. const common_1 = require("./common");
  13. const events_1 = require("./events");
  14. const server_1 = require("./server");
  // Symbol keys under which Monitor and RTTPinger keep their per-instance state.
  /** @internal */
  const kServer = Symbol('server');
  /** @internal */
  const kMonitorId = Symbol('monitorId');
  /** @internal */
  const kConnection = Symbol('connection');
  /** @internal */
  const kCancellationToken = Symbol('cancellationToken');
  /** @internal */
  const kRTTPinger = Symbol('rttPinger');
  /** @internal */
  const kRoundTripTime = Symbol('roundTripTime');

  // Monitor-specific states; the closing/closed states come from ./common.
  const STATE_IDLE = 'idle';
  const STATE_MONITORING = 'monitoring';

  // Maps each state to the list of states that may follow it.
  const stateTransition = (() => {
    const { STATE_CLOSED, STATE_CLOSING } = common_1;
    const { makeStateMachine } = utils_1;
    return makeStateMachine({
      [STATE_CLOSING]: [STATE_CLOSING, STATE_IDLE, STATE_CLOSED],
      [STATE_CLOSED]: [STATE_CLOSED, STATE_MONITORING],
      [STATE_IDLE]: [STATE_IDLE, STATE_MONITORING, STATE_CLOSING],
      [STATE_MONITORING]: [STATE_MONITORING, STATE_IDLE, STATE_CLOSING]
    });
  })();

  // States in which Monitor#requestCheck returns without waking the interval.
  const INVALID_REQUEST_CHECK_STATES = new Set()
    .add(common_1.STATE_CLOSING)
    .add(common_1.STATE_CLOSED)
    .add(STATE_MONITORING);
  /**
   * Reports whether a monitor is closed or in the middle of closing.
   *
   * @param monitor - object whose `s.state` holds the current state
   * @returns `true` when the state is STATE_CLOSED or STATE_CLOSING
   */
  function isInCloseState(monitor) {
    if (monitor.s.state === common_1.STATE_CLOSED) {
      return true;
    }
    return monitor.s.state === common_1.STATE_CLOSING;
  }
  /**
   * Runs repeated checks against a single server and re-emits the outcome as
   * heartbeat events. The monitor starts in STATE_CLOSED and is started with
   * `connect()`.
   *
   * @internal
   */
  class Monitor extends mongo_types_1.TypedEventEmitter {
    /**
     * @param server - the monitored server; its `description.address`,
     *   `description.hostAddress` and `s.pool.generation` are read here
     * @param options - monitor and connection options; missing timing values
     *   fall back to the defaults applied below
     */
    constructor(server, options) {
      super();
      this[kServer] = server;
      this[kConnection] = undefined;

      const cancellationToken = new mongo_types_1.CancellationToken();
      this[kCancellationToken] = cancellationToken;
      cancellationToken.setMaxListeners(Infinity);

      this[kMonitorId] = undefined;
      this.s = { state: common_1.STATE_CLOSED };
      this.address = server.description.address;

      const { connectTimeoutMS, heartbeatFrequencyMS, minHeartbeatFrequencyMS } = options;
      this.options = Object.freeze({
        connectTimeoutMS: connectTimeoutMS != null ? connectTimeoutMS : 10000,
        heartbeatFrequencyMS: heartbeatFrequencyMS != null ? heartbeatFrequencyMS : 10000,
        minHeartbeatFrequencyMS: minHeartbeatFrequencyMS != null ? minHeartbeatFrequencyMS : 500
      });

      // TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration
      const monitorDefaults = {
        id: '<monitor>',
        generation: server.s.pool.generation,
        connectionType: connection_1.Connection,
        cancellationToken,
        hostAddress: server.description.hostAddress
      };
      // Applied last so that caller-supplied BSON serialization options cannot win.
      const forcedBsonOptions = {
        raw: false,
        promoteLongs: true,
        promoteValues: true,
        promoteBuffers: true
      };
      const connectOptions = Object.assign(monitorDefaults, options, forcedBsonOptions);

      // Monitoring connections must not authenticate.
      delete connectOptions.credentials;
      if (connectOptions.autoEncrypter) {
        delete connectOptions.autoEncrypter;
      }
      this.connectOptions = Object.freeze(connectOptions);
    }

    /** The connection currently used for checks, or `undefined` when there is none. */
    get connection() {
      return this[kConnection];
    }

    /**
     * Starts the check interval with an immediate first run. Does nothing
     * unless the monitor is in STATE_CLOSED.
     */
    connect() {
      if (this.s.state !== common_1.STATE_CLOSED) {
        return;
      }
      const { heartbeatFrequencyMS: interval, minHeartbeatFrequencyMS: minInterval } = this.options;
      this[kMonitorId] = (0, utils_1.makeInterruptibleAsyncInterval)(monitorServer(this), {
        interval,
        minInterval,
        immediate: true
      });
    }

    /**
     * Wakes the check interval, if one exists. Ignored while the monitor is
     * closing, closed, or already running a check.
     */
    requestCheck() {
      if (INVALID_REQUEST_CHECK_STATES.has(this.s.state)) {
        return;
      }
      const interval = this[kMonitorId];
      if (interval != null) {
        interval.wake();
      }
    }

    /**
     * Tears down the current monitoring state and starts a fresh interval
     * (without an immediate run). Skipped when the monitor is closing/closed or
     * when the server description has no topologyVersion.
     */
    reset() {
      const { topologyVersion } = this[kServer].description;
      if (isInCloseState(this) || topologyVersion == null) {
        return;
      }
      stateTransition(this, common_1.STATE_CLOSING);
      resetMonitorState(this);
      // back to idle before the replacement interval is created
      stateTransition(this, STATE_IDLE);

      const { heartbeatFrequencyMS: interval, minHeartbeatFrequencyMS: minInterval } = this.options;
      this[kMonitorId] = (0, utils_1.makeInterruptibleAsyncInterval)(monitorServer(this), {
        interval,
        minInterval
      });
    }

    /**
     * Stops monitoring, emits 'close' and moves to STATE_CLOSED. Does nothing
     * when the monitor is already closing or closed.
     */
    close() {
      if (isInCloseState(this)) {
        return;
      }
      stateTransition(this, common_1.STATE_CLOSING);
      resetMonitorState(this);
      this.emit('close');
      stateTransition(this, common_1.STATE_CLOSED);
    }
  }
  132. exports.Monitor = Monitor;
  /**
   * Releases everything a monitor holds for its current run: stops the check
   * interval, closes the RTT pinger, emits 'cancel' on the cancellation token
   * and force-destroys the monitoring connection. Each slot is cleared after
   * its resource is released.
   *
   * @param monitor - the Monitor to reset
   */
  function resetMonitorState(monitor) {
    const interval = monitor[kMonitorId];
    if (interval != null) {
      interval.stop();
    }
    monitor[kMonitorId] = undefined;

    const pinger = monitor[kRTTPinger];
    if (pinger != null) {
      pinger.close();
    }
    monitor[kRTTPinger] = undefined;

    monitor[kCancellationToken].emit('cancel');

    const activeConnection = monitor[kConnection];
    if (activeConnection != null) {
      activeConnection.destroy({ force: true });
    }
    monitor[kConnection] = undefined;
  }
  /**
   * Runs one check against the monitored server.
   *
   * Emits SERVER_HEARTBEAT_STARTED, then either sends a hello (or legacy hello)
   * command over the monitor's open connection, or establishes a new connection
   * and uses the hello that the connection carries.
   *
   * `callback` is not invoked in two cases: while a streaming check keeps
   * receiving responses that carry a topologyVersion, and when the monitor was
   * closed while a new connection was being established.
   *
   * @param monitor - the Monitor performing the check
   * @param callback - called as `callback(err)` on failure, or
   *   `callback(undefined, hello)` when the check completes
   */
  function checkServer(monitor, callback) {
    // Reassigned after each streamed response so durations are measured per heartbeat.
    let start = (0, utils_1.now)();
    monitor.emit(server_1.Server.SERVER_HEARTBEAT_STARTED, new events_1.ServerHeartbeatStartedEvent(monitor.address));

    // Single failure path: drop the connection, publish the failure, then ask
    // listeners to reset the server and the connection pool exactly once.
    function failureHandler(err) {
      const failedConnection = monitor[kConnection];
      if (failedConnection != null) {
        failedConnection.destroy({ force: true });
      }
      monitor[kConnection] = undefined;
      monitor.emit(server_1.Server.SERVER_HEARTBEAT_FAILED, new events_1.ServerHeartbeatFailedEvent(monitor.address, (0, utils_1.calculateDurationInMs)(start), err));
      monitor.emit('resetServer', err);
      monitor.emit('resetConnectionPool');
      callback(err);
    }

    const connection = monitor[kConnection];
    if (connection && !connection.closed) {
      const { serverApi, helloOk } = connection;
      const connectTimeoutMS = monitor.options.connectTimeoutMS;
      const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS;
      const topologyVersion = monitor[kServer].description.topologyVersion;
      // A known topologyVersion switches the check to the awaitable (streaming) form.
      const isAwaitable = topologyVersion != null;
      const useHello = (serverApi != null && serverApi.version) || helloOk;
      const cmd = {
        [useHello ? 'hello' : constants_1.LEGACY_HELLO_COMMAND]: true,
        ...(isAwaitable && topologyVersion
          ? { maxAwaitTimeMS, topologyVersion: makeTopologyVersion(topologyVersion) }
          : {})
      };
      // An awaitable check may be held open for up to maxAwaitTimeMS, so the
      // socket timeout is extended by that amount (0 when no connect timeout is set).
      const options = isAwaitable
        ? {
            socketTimeoutMS: connectTimeoutMS ? connectTimeoutMS + maxAwaitTimeMS : 0,
            exhaustAllowed: true
          }
        : { socketTimeoutMS: connectTimeoutMS };

      if (isAwaitable && monitor[kRTTPinger] == null) {
        monitor[kRTTPinger] = new RTTPinger(monitor[kCancellationToken], Object.assign({ heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS }, monitor.connectOptions));
      }

      connection.command((0, utils_1.ns)('admin.$cmd'), cmd, options, (err, hello) => {
        if (err) {
          return failureHandler(err);
        }
        if (!('isWritablePrimary' in hello)) {
          // Provide hello-style response document.
          hello.isWritablePrimary = hello[constants_1.LEGACY_HELLO_COMMAND];
        }
        // For awaitable checks the duration comes from the RTT pinger rather
        // than from the time spent waiting on this command.
        const rttPinger = monitor[kRTTPinger];
        const duration = isAwaitable && rttPinger ? rttPinger.roundTripTime : (0, utils_1.calculateDurationInMs)(start);
        monitor.emit(server_1.Server.SERVER_HEARTBEAT_SUCCEEDED, new events_1.ServerHeartbeatSucceededEvent(monitor.address, duration, hello));

        // if we are using the streaming protocol then we immediately issue another `started`
        // event, otherwise the "check" is complete and return to the main monitor loop
        if (isAwaitable && hello.topologyVersion) {
          monitor.emit(server_1.Server.SERVER_HEARTBEAT_STARTED, new events_1.ServerHeartbeatStartedEvent(monitor.address));
          start = (0, utils_1.now)();
        }
        else {
          const activePinger = monitor[kRTTPinger];
          if (activePinger != null) {
            activePinger.close();
          }
          monitor[kRTTPinger] = undefined;
          callback(undefined, hello);
        }
      });
      return;
    }

    // connecting does an implicit `hello`
    (0, connect_1.connect)(monitor.connectOptions, (err, conn) => {
      if (err) {
        // failureHandler clears the connection slot and emits
        // 'resetConnectionPool' for every kind of error, so nothing is emitted
        // here; emitting here as well would reset the pool twice.
        failureHandler(err);
        return;
      }
      if (conn) {
        // Tell the connection that we are using the streaming protocol so that the
        // connection's message stream will only read the last hello on the buffer.
        conn.isMonitoringConnection = true;
        if (isInCloseState(monitor)) {
          // The monitor was closed while connecting; discard the new connection.
          conn.destroy({ force: true });
          return;
        }
        monitor[kConnection] = conn;
        monitor.emit(server_1.Server.SERVER_HEARTBEAT_SUCCEEDED, new events_1.ServerHeartbeatSucceededEvent(monitor.address, (0, utils_1.calculateDurationInMs)(start), conn.hello));
        callback(undefined, conn.hello);
      }
    });
  }
  /**
   * Builds the function that the monitor's check interval runs.
   *
   * Each run moves the monitor to STATE_MONITORING, performs one `checkServer`
   * and, once that reports back, returns to STATE_IDLE (unless the monitor is
   * closing or closed) before signalling completion.
   *
   * @param monitor - the Monitor to run checks for
   * @returns a function taking the interval's completion callback
   */
  function monitorServer(monitor) {
    return (callback) => {
      stateTransition(monitor, STATE_MONITORING);

      const finish = () => {
        if (!isInCloseState(monitor)) {
          stateTransition(monitor, STATE_IDLE);
        }
        callback();
      };

      // Wakes the interval unless the monitor has been closed in the meantime.
      const wakeIfOpen = () => {
        if (isInCloseState(monitor)) {
          return;
        }
        const interval = monitor[kMonitorId];
        if (interval != null) {
          interval.wake();
        }
      };

      checkServer(monitor, (err, hello) => {
        // an error while the server type is still Unknown: reset the server and bail
        if (err && monitor[kServer].description.type === common_1.ServerType.Unknown) {
          monitor.emit('resetServer', err);
          finish();
          return;
        }
        // if the check indicates streaming is supported, immediately reschedule monitoring
        if (hello && hello.topologyVersion) {
          (0, timers_1.setTimeout)(wakeIfOpen, 0);
        }
        finish();
      });
    };
  }
  /**
   * Copies a topologyVersion, making sure its counter is a BSON Long.
   *
   * @param tv - object with `processId` and `counter`
   * @returns a new `{ processId, counter }` object
   */
  function makeTopologyVersion(tv) {
    const { Long } = bson_1;
    const processId = tv.processId;
    // tests mock counter as just number, but in a real situation counter should always be a Long
    if (Long.isLong(tv.counter)) {
      return { processId, counter: tv.counter };
    }
    return { processId, counter: Long.fromNumber(tv.counter) };
  }
  /**
   * Measures round trip time on its own connection by scheduling
   * `measureRoundTripTime` on a timer.
   *
   * @internal
   */
  class RTTPinger {
    /**
     * @param cancellationToken - token stored on the pinger for later use
     * @param options - must provide `heartbeatFrequencyMS`, the delay before
     *   the first measurement; the same object is handed to every measurement
     */
    constructor(cancellationToken, options) {
      this[kConnection] = undefined;
      this[kCancellationToken] = cancellationToken;
      this[kRoundTripTime] = 0;
      this.closed = false;

      const { heartbeatFrequencyMS } = options;
      const runFirstMeasurement = () => measureRoundTripTime(this, options);
      this[kMonitorId] = (0, timers_1.setTimeout)(runFirstMeasurement, heartbeatFrequencyMS);
    }

    /** The most recently recorded round trip time; starts at 0. */
    get roundTripTime() {
      return this[kRoundTripTime];
    }

    /**
     * Marks the pinger closed, cancels the pending timer and force-destroys
     * the connection if there is one.
     */
    close() {
      this.closed = true;
      clearTimeout(this[kMonitorId]);

      const openConnection = this[kConnection];
      if (openConnection != null) {
        openConnection.destroy({ force: true });
      }
      this[kConnection] = undefined;
    }
  }
  286. exports.RTTPinger = RTTPinger;
  /**
   * Takes one round trip time measurement for an RTTPinger and schedules the
   * next one.
   *
   * With no connection yet, a new one is established and the time to connect
   * is recorded. Otherwise a legacy hello command is sent on the existing
   * connection and the time until its reply is recorded. On any failure the
   * connection slot is cleared, the round trip time is set to 0 and no further
   * measurement is scheduled.
   *
   * @param rttPinger - the RTTPinger to update
   * @param options - connection options; `heartbeatFrequencyMS` is the delay
   *   before the next measurement. This object is never modified.
   */
  function measureRoundTripTime(rttPinger, options) {
    const start = (0, utils_1.now)();
    if (rttPinger.closed) {
      return;
    }
    const heartbeatFrequencyMS = options.heartbeatFrequencyMS;

    // Records the elapsed time and queues the next measurement. `conn` is only
    // passed when a connection has just been established.
    function measureAndReschedule(conn) {
      if (rttPinger.closed) {
        // The pinger was closed while the measurement was in flight.
        if (conn != null) {
          conn.destroy({ force: true });
        }
        return;
      }
      if (rttPinger[kConnection] == null) {
        rttPinger[kConnection] = conn;
      }
      rttPinger[kRoundTripTime] = (0, utils_1.calculateDurationInMs)(start);
      rttPinger[kMonitorId] = (0, timers_1.setTimeout)(() => measureRoundTripTime(rttPinger, options), heartbeatFrequencyMS);
    }

    const connection = rttPinger[kConnection];
    if (connection == null) {
      // Work on a copy so the caller's options object, which may be frozen, is
      // left untouched.
      const connectOptions = Object.assign({}, options, {
        cancellationToken: rttPinger[kCancellationToken]
      });
      (0, connect_1.connect)(connectOptions, (err, conn) => {
        if (err) {
          rttPinger[kConnection] = undefined;
          rttPinger[kRoundTripTime] = 0;
          return;
        }
        measureAndReschedule(conn);
      });
      return;
    }

    connection.command((0, utils_1.ns)('admin.$cmd'), { [constants_1.LEGACY_HELLO_COMMAND]: 1 }, undefined, err => {
      if (err) {
        // Destroy the failed connection before dropping the reference;
        // otherwise nothing would ever close it.
        connection.destroy({ force: true });
        rttPinger[kConnection] = undefined;
        rttPinger[kRoundTripTime] = 0;
        return;
      }
      measureAndReschedule();
    });
  }
  326. //# sourceMappingURL=monitor.js.map