monitor.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.RTTPinger = exports.Monitor = void 0;
  4. const bson_1 = require("../bson");
  5. const connect_1 = require("../cmap/connect");
  6. const connection_1 = require("../cmap/connection");
  7. const constants_1 = require("../constants");
  8. const error_1 = require("../error");
  9. const mongo_types_1 = require("../mongo_types");
  10. const utils_1 = require("../utils");
  11. const common_1 = require("./common");
  12. const events_1 = require("./events");
  13. const server_1 = require("./server");
  /** @internal Property key under which a Monitor keeps the server it watches. */
  const kServer = Symbol('server');
  /** @internal Property key for the scheduling handle (monitor interval or RTT timer). */
  const kMonitorId = Symbol('monitorId');
  /** @internal Property key for the connection currently held by a Monitor or RTTPinger. */
  const kConnection = Symbol('connection');
  /** @internal Property key for the cancellation token shared with connection attempts. */
  const kCancellationToken = Symbol('cancellationToken');
  /** @internal Property key under which a Monitor keeps its RTTPinger, if any. */
  const kRTTPinger = Symbol('rttPinger');
  /** @internal Property key for the most recent round trip time stored on an RTTPinger. */
  const kRoundTripTime = Symbol('roundTripTime');

  const STATE_IDLE = 'idle';
  const STATE_MONITORING = 'monitoring';

  // Table handed to makeStateMachine; presumably each key lists the states that
  // may be entered from it (confirm against utils.makeStateMachine).
  const MONITOR_STATE_TABLE = {
    [common_1.STATE_CLOSING]: [common_1.STATE_CLOSING, STATE_IDLE, common_1.STATE_CLOSED],
    [common_1.STATE_CLOSED]: [common_1.STATE_CLOSED, STATE_MONITORING],
    [STATE_IDLE]: [STATE_IDLE, STATE_MONITORING, common_1.STATE_CLOSING],
    [STATE_MONITORING]: [STATE_MONITORING, STATE_IDLE, common_1.STATE_CLOSING]
  };
  const stateTransition = (0, utils_1.makeStateMachine)(MONITOR_STATE_TABLE);

  // While the monitor is in any of these states, Monitor#requestCheck does nothing.
  const INVALID_REQUEST_CHECK_STATES = new Set([
    common_1.STATE_CLOSING,
    common_1.STATE_CLOSED,
    STATE_MONITORING
  ]);
  /**
   * Tells whether a monitor is shut down or in the process of shutting down.
   *
   * @param monitor - object exposing its current state as `monitor.s.state`
   * @returns `true` when the state is STATE_CLOSED or STATE_CLOSING
   */
  function isInCloseState(monitor) {
    const { state } = monitor.s;
    return state === common_1.STATE_CLOSED || state === common_1.STATE_CLOSING;
  }
  /**
   * Schedules repeated checks of a single server. Heartbeat, `resetServer`,
   * `resetConnectionPool` and `close` events are emitted on the instance.
   *
   * @internal
   */
  class Monitor extends mongo_types_1.TypedEventEmitter {
    /**
     * @param server - the server to watch; its description and pool generation are read here
     * @param options - monitoring and connection options; timing values fall back to defaults
     */
    constructor(server, options) {
      super();
      this[kServer] = server;
      this[kConnection] = undefined;
      const cancellationToken = new mongo_types_1.CancellationToken();
      this[kCancellationToken] = cancellationToken;
      cancellationToken.setMaxListeners(Infinity);
      this[kMonitorId] = undefined;
      this.s = { state: common_1.STATE_CLOSED };
      this.address = server.description.address;

      // Only null/undefined fall back to the defaults; 0 is kept as given.
      const { connectTimeoutMS, heartbeatFrequencyMS, minHeartbeatFrequencyMS } = options;
      this.options = Object.freeze({
        connectTimeoutMS: connectTimeoutMS != null ? connectTimeoutMS : 10000,
        heartbeatFrequencyMS: heartbeatFrequencyMS != null ? heartbeatFrequencyMS : 10000,
        minHeartbeatFrequencyMS: minHeartbeatFrequencyMS != null ? minHeartbeatFrequencyMS : 500
      });

      // TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration
      const monitoringDefaults = {
        id: '<monitor>',
        generation: server.s.pool.generation,
        connectionType: connection_1.Connection,
        cancellationToken,
        hostAddress: server.description.hostAddress
      };
      // These BSON serialization settings always win over whatever the caller supplied.
      const forcedBsonOptions = {
        raw: false,
        promoteLongs: true,
        promoteValues: true,
        promoteBuffers: true
      };
      const connectOptions = Object.assign(monitoringDefaults, options, forcedBsonOptions);

      // ensure no authentication is used for monitoring
      delete connectOptions.credentials;
      if (connectOptions.autoEncrypter) {
        delete connectOptions.autoEncrypter;
      }
      this.connectOptions = Object.freeze(connectOptions);
    }

    /** Starts the monitoring loop with an immediate first run; ignored unless the monitor is closed. */
    connect() {
      if (this.s.state !== common_1.STATE_CLOSED) {
        return;
      }
      const { heartbeatFrequencyMS, minHeartbeatFrequencyMS } = this.options;
      this[kMonitorId] = (0, utils_1.makeInterruptibleAsyncInterval)(monitorServer(this), {
        interval: heartbeatFrequencyMS,
        minInterval: minHeartbeatFrequencyMS,
        immediate: true
      });
    }

    /** Wakes the monitoring loop early, unless the monitor is closing, closed or already checking. */
    requestCheck() {
      if (INVALID_REQUEST_CHECK_STATES.has(this.s.state)) {
        return;
      }
      const scheduler = this[kMonitorId];
      if (scheduler != null) {
        scheduler.wake();
      }
    }

    /**
     * Tears down the current monitoring state and starts a fresh loop. Does nothing
     * when the monitor is closing/closed or the server has no topologyVersion.
     */
    reset() {
      const { topologyVersion } = this[kServer].description;
      if (isInCloseState(this) || topologyVersion == null) {
        return;
      }
      stateTransition(this, common_1.STATE_CLOSING);
      resetMonitorState(this);

      // back to idle, then schedule a new loop (no `immediate` flag here, unlike connect())
      stateTransition(this, STATE_IDLE);
      const { heartbeatFrequencyMS, minHeartbeatFrequencyMS } = this.options;
      this[kMonitorId] = (0, utils_1.makeInterruptibleAsyncInterval)(monitorServer(this), {
        interval: heartbeatFrequencyMS,
        minInterval: minHeartbeatFrequencyMS
      });
    }

    /** Stops monitoring, releases held resources and emits `close`; ignored if already closing/closed. */
    close() {
      if (isInCloseState(this)) {
        return;
      }
      stateTransition(this, common_1.STATE_CLOSING);
      resetMonitorState(this);
      // listeners are notified while the state is still STATE_CLOSING
      this.emit('close');
      stateTransition(this, common_1.STATE_CLOSED);
    }
  }
  128. exports.Monitor = Monitor;
  /**
   * Releases everything a monitor holds: stops the scheduled loop, closes the
   * RTT pinger, emits `cancel` on the cancellation token and force-destroys the
   * monitoring connection. Each slot is cleared after it is released.
   *
   * @param monitor - the Monitor whose internal state is torn down
   */
  function resetMonitorState(monitor) {
    const scheduler = monitor[kMonitorId];
    if (scheduler != null) {
      scheduler.stop();
    }
    monitor[kMonitorId] = undefined;

    const pinger = monitor[kRTTPinger];
    if (pinger != null) {
      pinger.close();
    }
    monitor[kRTTPinger] = undefined;

    // signal any in-flight work that listens on the token
    monitor[kCancellationToken].emit('cancel');

    const heldConnection = monitor[kConnection];
    if (heldConnection != null) {
      heldConnection.destroy({ force: true });
    }
    monitor[kConnection] = undefined;
  }
  /**
   * Performs one server check and reports the outcome through heartbeat events
   * on the monitor and through `callback`.
   *
   * With no usable connection, a new one is opened and the hello obtained while
   * connecting is used as the result. Otherwise a hello (or legacy hello) command
   * is sent on the held connection; when the server description carries a
   * topologyVersion the command is sent in awaitable form and an RTTPinger is
   * created to supply the round trip time.
   *
   * @param monitor - the Monitor performing the check
   * @param callback - called as `callback(err)` on failure or `callback(undefined, hello)`
   *   on success; it is not called for a streamed response that carries a topologyVersion,
   *   nor when the monitor was closed while a new connection was being established
   */
  function checkServer(monitor, callback) {
    let heartbeatStartedAt = (0, utils_1.now)();
    monitor.emit(server_1.Server.SERVER_HEARTBEAT_STARTED, new events_1.ServerHeartbeatStartedEvent(monitor.address));

    // Shared failure path: drop the connection, publish the failure, ask for resets.
    const reportFailure = (err) => {
      const brokenConnection = monitor[kConnection];
      if (brokenConnection != null) {
        brokenConnection.destroy({ force: true });
      }
      monitor[kConnection] = undefined;
      monitor.emit(server_1.Server.SERVER_HEARTBEAT_FAILED, new events_1.ServerHeartbeatFailedEvent(monitor.address, (0, utils_1.calculateDurationInMs)(heartbeatStartedAt), err));
      monitor.emit('resetServer', err);
      monitor.emit('resetConnectionPool');
      callback(err);
    };

    const connection = monitor[kConnection];
    if (!connection || connection.closed) {
      // connecting does an implicit `hello`
      (0, connect_1.connect)(monitor.connectOptions, (err, conn) => {
        if (err) {
          monitor[kConnection] = undefined;
          // we already reset the connection pool on network errors in all cases
          // NOTE(review): reportFailure emits 'resetConnectionPool' as well, so
          // non-network errors produce the event twice.
          if (!(err instanceof error_1.MongoNetworkError)) {
            monitor.emit('resetConnectionPool');
          }
          reportFailure(err);
          return;
        }
        if (!conn) {
          return;
        }
        if (isInCloseState(monitor)) {
          // the monitor went away while connecting; discard the new connection
          conn.destroy({ force: true });
          return;
        }
        monitor[kConnection] = conn;
        monitor.emit(server_1.Server.SERVER_HEARTBEAT_SUCCEEDED, new events_1.ServerHeartbeatSucceededEvent(monitor.address, (0, utils_1.calculateDurationInMs)(heartbeatStartedAt), conn.hello));
        callback(undefined, conn.hello);
      });
      return;
    }

    const { serverApi, helloOk } = connection;
    const connectTimeoutMS = monitor.options.connectTimeoutMS;
    const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS;
    const topologyVersion = monitor[kServer].description.topologyVersion;
    const isAwaitable = topologyVersion != null;

    // `hello` is used when a server API version is set or the server advertised helloOk.
    const declaredApiVersion = serverApi == null ? undefined : serverApi.version;
    const commandName = declaredApiVersion || helloOk ? 'hello' : constants_1.LEGACY_HELLO_COMMAND;
    const cmd = { [commandName]: true };
    if (isAwaitable && topologyVersion) {
      cmd.maxAwaitTimeMS = maxAwaitTimeMS;
      cmd.topologyVersion = makeTopologyVersion(topologyVersion);
    }

    let options;
    if (isAwaitable) {
      // the socket timeout is widened by maxAwaitTimeMS; a falsy connect timeout yields 0
      options = {
        socketTimeoutMS: connectTimeoutMS ? connectTimeoutMS + maxAwaitTimeMS : 0,
        exhaustAllowed: true
      };
    }
    else {
      options = { socketTimeoutMS: connectTimeoutMS };
    }

    if (isAwaitable && monitor[kRTTPinger] == null) {
      monitor[kRTTPinger] = new RTTPinger(monitor[kCancellationToken], Object.assign({ heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS }, monitor.connectOptions));
    }

    connection.command((0, utils_1.ns)('admin.$cmd'), cmd, options, (err, hello) => {
      if (err) {
        reportFailure(err);
        return;
      }
      if (!('isWritablePrimary' in hello)) {
        // Provide hello-style response document.
        hello.isWritablePrimary = hello[constants_1.LEGACY_HELLO_COMMAND];
      }
      const rttPinger = monitor[kRTTPinger];
      const duration = isAwaitable && rttPinger
        ? rttPinger.roundTripTime
        : (0, utils_1.calculateDurationInMs)(heartbeatStartedAt);
      monitor.emit(server_1.Server.SERVER_HEARTBEAT_SUCCEEDED, new events_1.ServerHeartbeatSucceededEvent(monitor.address, duration, hello));

      const keepsStreaming = isAwaitable && hello.topologyVersion;
      if (!keepsStreaming) {
        // the "check" is complete: retire the pinger and return to the main monitor loop
        const activePinger = monitor[kRTTPinger];
        if (activePinger != null) {
          activePinger.close();
        }
        monitor[kRTTPinger] = undefined;
        callback(undefined, hello);
        return;
      }
      // streaming protocol: immediately issue another `started` event and restart the clock
      monitor.emit(server_1.Server.SERVER_HEARTBEAT_STARTED, new events_1.ServerHeartbeatStartedEvent(monitor.address));
      heartbeatStartedAt = (0, utils_1.now)();
    });
  }
  /**
   * Builds the function that the monitoring interval runs on every tick.
   *
   * Each run moves the monitor to STATE_MONITORING, performs a server check,
   * then moves it back to STATE_IDLE (unless it was closed meanwhile) and
   * invokes the tick's completion callback.
   *
   * @param monitor - the Monitor to run checks for
   * @returns a function accepting the completion callback for one run
   */
  function monitorServer(monitor) {
    return (callback) => {
      stateTransition(monitor, STATE_MONITORING);

      const finishRun = () => {
        if (!isInCloseState(monitor)) {
          stateTransition(monitor, STATE_IDLE);
        }
        callback();
      };

      checkServer(monitor, (err, hello) => {
        // an error while the server is still Unknown (initial discovery): reset and bail
        const failedDuringDiscovery = err && monitor[kServer].description.type === common_1.ServerType.Unknown;
        if (failedDuringDiscovery) {
          monitor.emit('resetServer', err);
          finishRun();
          return;
        }

        // if the check indicates streaming is supported, immediately reschedule monitoring
        if (hello && hello.topologyVersion) {
          setTimeout(() => {
            if (isInCloseState(monitor)) {
              return;
            }
            const scheduler = monitor[kMonitorId];
            if (scheduler != null) {
              scheduler.wake();
            }
          }, 0);
        }
        finishRun();
      });
    };
  }
  /**
   * Copies a topology version into a fresh `{ processId, counter }` object,
   * converting `counter` to a Long when it is not one already.
   *
   * @param tv - topology version with `processId` and `counter`
   * @returns a new object with the same `processId` and a Long `counter`
   */
  function makeTopologyVersion(tv) {
    const { processId, counter } = tv;
    // tests mock counter as just number, but in a real situation counter should always be a Long
    const longCounter = bson_1.Long.isLong(counter) ? counter : bson_1.Long.fromNumber(counter);
    return { processId, counter: longCounter };
  }
  /**
   * Measures round trip time on a timer, separately from the monitoring loop.
   * The first measurement is scheduled `heartbeatFrequencyMS` after construction.
   *
   * @internal
   */
  class RTTPinger {
    /**
     * @param cancellationToken - token stored for later connection attempts
     * @param options - connection options; `heartbeatFrequencyMS` sets the timer delay
     */
    constructor(cancellationToken, options) {
      this[kConnection] = undefined;
      this[kCancellationToken] = cancellationToken;
      this[kRoundTripTime] = 0;
      this.closed = false;

      const { heartbeatFrequencyMS } = options;
      const runMeasurement = () => measureRoundTripTime(this, options);
      this[kMonitorId] = setTimeout(runMeasurement, heartbeatFrequencyMS);
    }

    /** The most recently recorded round trip time; 0 until one has been recorded. */
    get roundTripTime() {
      return this[kRoundTripTime];
    }

    /** Marks the pinger closed, cancels the pending timer and force-destroys its connection. */
    close() {
      this.closed = true;
      clearTimeout(this[kMonitorId]);

      const heldConnection = this[kConnection];
      if (heldConnection != null) {
        heldConnection.destroy({ force: true });
      }
      this[kConnection] = undefined;
    }
  }
  280. exports.RTTPinger = RTTPinger;
  /**
   * Takes one round trip time sample for an RTTPinger and, on success, schedules
   * the next sample `options.heartbeatFrequencyMS` later.
   *
   * With no connection held, one is opened and the time taken to connect is the
   * sample. Otherwise a legacy hello command is timed on the held connection.
   * On any failure the recorded round trip time is reset to 0 and no further
   * sample is scheduled.
   *
   * Changes from the previous implementation:
   *  - a connection whose command failed is force-destroyed instead of merely
   *    being forgotten, so its socket is not leaked;
   *  - `options` is no longer mutated: the cancellation token is merged into a
   *    copy used only for connecting, so frozen option objects are accepted;
   *  - nothing is done at all once the pinger is closed.
   *
   * @param rttPinger - the RTTPinger whose connection and round trip time are updated
   * @param options - connection options; `heartbeatFrequencyMS` sets the delay between samples
   */
  function measureRoundTripTime(rttPinger, options) {
    if (rttPinger.closed) {
      return;
    }
    const start = (0, utils_1.now)();
    const heartbeatFrequencyMS = options.heartbeatFrequencyMS;

    // Records the sample and arms the next timer; `conn` is only passed after a fresh connect.
    function measureAndReschedule(conn) {
      if (rttPinger.closed) {
        // closed while the request was in flight: discard a freshly opened connection
        if (conn != null) {
          conn.destroy({ force: true });
        }
        return;
      }
      if (rttPinger[kConnection] == null) {
        rttPinger[kConnection] = conn;
      }
      rttPinger[kRoundTripTime] = (0, utils_1.calculateDurationInMs)(start);
      rttPinger[kMonitorId] = setTimeout(() => measureRoundTripTime(rttPinger, options), heartbeatFrequencyMS);
    }

    // Failure path: release whatever connection is still held and clear the sample.
    function abandonMeasurement() {
      const failedConnection = rttPinger[kConnection];
      if (failedConnection != null) {
        failedConnection.destroy({ force: true });
      }
      rttPinger[kConnection] = undefined;
      rttPinger[kRoundTripTime] = 0;
    }

    const connection = rttPinger[kConnection];
    if (connection == null) {
      const connectOptions = Object.assign({}, options, {
        cancellationToken: rttPinger[kCancellationToken]
      });
      (0, connect_1.connect)(connectOptions, (err, conn) => {
        if (err) {
          abandonMeasurement();
          return;
        }
        measureAndReschedule(conn);
      });
      return;
    }

    connection.command((0, utils_1.ns)('admin.$cmd'), { [constants_1.LEGACY_HELLO_COMMAND]: 1 }, undefined, err => {
      if (err) {
        abandonMeasurement();
        return;
      }
      measureAndReschedule();
    });
  }
  320. //# sourceMappingURL=monitor.js.map