// server_selection.js
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.readPreferenceServerSelector = exports.secondaryWritableServerSelector = exports.sameServerSelector = exports.writableServerSelector = exports.MIN_SECONDARY_WRITE_WIRE_VERSION = void 0;
  4. const error_1 = require("../error");
  5. const read_preference_1 = require("../read_preference");
  6. const common_1 = require("./common");
  7. // max staleness constants
  8. const IDLE_WRITE_PERIOD = 10000;
  9. const SMALLEST_MAX_STALENESS_SECONDS = 90;
  10. // Minimum version to try writes on secondaries.
  11. exports.MIN_SECONDARY_WRITE_WIRE_VERSION = 13;
  12. /**
  13. * Returns a server selector that selects for writable servers
  14. */
  15. function writableServerSelector() {
  16. return (topologyDescription, servers) => latencyWindowReducer(topologyDescription, servers.filter((s) => s.isWritable));
  17. }
  18. exports.writableServerSelector = writableServerSelector;
  19. /**
  20. * The purpose of this selector is to select the same server, only
  21. * if it is in a state that it can have commands sent to it.
  22. */
  23. function sameServerSelector(description) {
  24. return (topologyDescription, servers) => {
  25. if (!description)
  26. return [];
  27. // Filter the servers to match the provided description only if
  28. // the type is not unknown.
  29. return servers.filter(sd => {
  30. return sd.address === description.address && sd.type !== common_1.ServerType.Unknown;
  31. });
  32. };
  33. }
  34. exports.sameServerSelector = sameServerSelector;
  35. /**
  36. * Returns a server selector that uses a read preference to select a
  37. * server potentially for a write on a secondary.
  38. */
  39. function secondaryWritableServerSelector(wireVersion, readPreference) {
  40. // If server version < 5.0, read preference always primary.
  41. // If server version >= 5.0...
  42. // - If read preference is supplied, use that.
  43. // - If no read preference is supplied, use primary.
  44. if (!readPreference ||
  45. !wireVersion ||
  46. (wireVersion && wireVersion < exports.MIN_SECONDARY_WRITE_WIRE_VERSION)) {
  47. return readPreferenceServerSelector(read_preference_1.ReadPreference.primary);
  48. }
  49. return readPreferenceServerSelector(readPreference);
  50. }
  51. exports.secondaryWritableServerSelector = secondaryWritableServerSelector;
  52. /**
  53. * Reduces the passed in array of servers by the rules of the "Max Staleness" specification
  54. * found here: https://github.com/mongodb/specifications/blob/master/source/max-staleness/max-staleness.rst
  55. *
  56. * @param readPreference - The read preference providing max staleness guidance
  57. * @param topologyDescription - The topology description
  58. * @param servers - The list of server descriptions to be reduced
  59. * @returns The list of servers that satisfy the requirements of max staleness
  60. */
  61. function maxStalenessReducer(readPreference, topologyDescription, servers) {
  62. if (readPreference.maxStalenessSeconds == null || readPreference.maxStalenessSeconds < 0) {
  63. return servers;
  64. }
  65. const maxStaleness = readPreference.maxStalenessSeconds;
  66. const maxStalenessVariance = (topologyDescription.heartbeatFrequencyMS + IDLE_WRITE_PERIOD) / 1000;
  67. if (maxStaleness < maxStalenessVariance) {
  68. throw new error_1.MongoInvalidArgumentError(`Option "maxStalenessSeconds" must be at least ${maxStalenessVariance} seconds`);
  69. }
  70. if (maxStaleness < SMALLEST_MAX_STALENESS_SECONDS) {
  71. throw new error_1.MongoInvalidArgumentError(`Option "maxStalenessSeconds" must be at least ${SMALLEST_MAX_STALENESS_SECONDS} seconds`);
  72. }
  73. if (topologyDescription.type === common_1.TopologyType.ReplicaSetWithPrimary) {
  74. const primary = Array.from(topologyDescription.servers.values()).filter(primaryFilter)[0];
  75. return servers.reduce((result, server) => {
  76. var _a;
  77. const stalenessMS = server.lastUpdateTime -
  78. server.lastWriteDate -
  79. (primary.lastUpdateTime - primary.lastWriteDate) +
  80. topologyDescription.heartbeatFrequencyMS;
  81. const staleness = stalenessMS / 1000;
  82. const maxStalenessSeconds = (_a = readPreference.maxStalenessSeconds) !== null && _a !== void 0 ? _a : 0;
  83. if (staleness <= maxStalenessSeconds) {
  84. result.push(server);
  85. }
  86. return result;
  87. }, []);
  88. }
  89. if (topologyDescription.type === common_1.TopologyType.ReplicaSetNoPrimary) {
  90. if (servers.length === 0) {
  91. return servers;
  92. }
  93. const sMax = servers.reduce((max, s) => s.lastWriteDate > max.lastWriteDate ? s : max);
  94. return servers.reduce((result, server) => {
  95. var _a;
  96. const stalenessMS = sMax.lastWriteDate - server.lastWriteDate + topologyDescription.heartbeatFrequencyMS;
  97. const staleness = stalenessMS / 1000;
  98. const maxStalenessSeconds = (_a = readPreference.maxStalenessSeconds) !== null && _a !== void 0 ? _a : 0;
  99. if (staleness <= maxStalenessSeconds) {
  100. result.push(server);
  101. }
  102. return result;
  103. }, []);
  104. }
  105. return servers;
  106. }
  107. /**
  108. * Determines whether a server's tags match a given set of tags
  109. *
  110. * @param tagSet - The requested tag set to match
  111. * @param serverTags - The server's tags
  112. */
  113. function tagSetMatch(tagSet, serverTags) {
  114. const keys = Object.keys(tagSet);
  115. const serverTagKeys = Object.keys(serverTags);
  116. for (let i = 0; i < keys.length; ++i) {
  117. const key = keys[i];
  118. if (serverTagKeys.indexOf(key) === -1 || serverTags[key] !== tagSet[key]) {
  119. return false;
  120. }
  121. }
  122. return true;
  123. }
  124. /**
  125. * Reduces a set of server descriptions based on tags requested by the read preference
  126. *
  127. * @param readPreference - The read preference providing the requested tags
  128. * @param servers - The list of server descriptions to reduce
  129. * @returns The list of servers matching the requested tags
  130. */
  131. function tagSetReducer(readPreference, servers) {
  132. if (readPreference.tags == null ||
  133. (Array.isArray(readPreference.tags) && readPreference.tags.length === 0)) {
  134. return servers;
  135. }
  136. for (let i = 0; i < readPreference.tags.length; ++i) {
  137. const tagSet = readPreference.tags[i];
  138. const serversMatchingTagset = servers.reduce((matched, server) => {
  139. if (tagSetMatch(tagSet, server.tags))
  140. matched.push(server);
  141. return matched;
  142. }, []);
  143. if (serversMatchingTagset.length) {
  144. return serversMatchingTagset;
  145. }
  146. }
  147. return [];
  148. }
  149. /**
  150. * Reduces a list of servers to ensure they fall within an acceptable latency window. This is
  151. * further specified in the "Server Selection" specification, found here:
  152. * https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst
  153. *
  154. * @param topologyDescription - The topology description
  155. * @param servers - The list of servers to reduce
  156. * @returns The servers which fall within an acceptable latency window
  157. */
  158. function latencyWindowReducer(topologyDescription, servers) {
  159. const low = servers.reduce((min, server) => min === -1 ? server.roundTripTime : Math.min(server.roundTripTime, min), -1);
  160. const high = low + topologyDescription.localThresholdMS;
  161. return servers.reduce((result, server) => {
  162. if (server.roundTripTime <= high && server.roundTripTime >= low)
  163. result.push(server);
  164. return result;
  165. }, []);
  166. }
  167. // filters
  168. function primaryFilter(server) {
  169. return server.type === common_1.ServerType.RSPrimary;
  170. }
  171. function secondaryFilter(server) {
  172. return server.type === common_1.ServerType.RSSecondary;
  173. }
  174. function nearestFilter(server) {
  175. return server.type === common_1.ServerType.RSSecondary || server.type === common_1.ServerType.RSPrimary;
  176. }
  177. function knownFilter(server) {
  178. return server.type !== common_1.ServerType.Unknown;
  179. }
  180. function loadBalancerFilter(server) {
  181. return server.type === common_1.ServerType.LoadBalancer;
  182. }
  183. /**
  184. * Returns a function which selects servers based on a provided read preference
  185. *
  186. * @param readPreference - The read preference to select with
  187. */
  188. function readPreferenceServerSelector(readPreference) {
  189. if (!readPreference.isValid()) {
  190. throw new error_1.MongoInvalidArgumentError('Invalid read preference specified');
  191. }
  192. return (topologyDescription, servers) => {
  193. const commonWireVersion = topologyDescription.commonWireVersion;
  194. if (commonWireVersion &&
  195. readPreference.minWireVersion &&
  196. readPreference.minWireVersion > commonWireVersion) {
  197. throw new error_1.MongoCompatibilityError(`Minimum wire version '${readPreference.minWireVersion}' required, but found '${commonWireVersion}'`);
  198. }
  199. if (topologyDescription.type === common_1.TopologyType.LoadBalanced) {
  200. return servers.filter(loadBalancerFilter);
  201. }
  202. if (topologyDescription.type === common_1.TopologyType.Unknown) {
  203. return [];
  204. }
  205. if (topologyDescription.type === common_1.TopologyType.Single ||
  206. topologyDescription.type === common_1.TopologyType.Sharded) {
  207. return latencyWindowReducer(topologyDescription, servers.filter(knownFilter));
  208. }
  209. const mode = readPreference.mode;
  210. if (mode === read_preference_1.ReadPreference.PRIMARY) {
  211. return servers.filter(primaryFilter);
  212. }
  213. if (mode === read_preference_1.ReadPreference.PRIMARY_PREFERRED) {
  214. const result = servers.filter(primaryFilter);
  215. if (result.length) {
  216. return result;
  217. }
  218. }
  219. const filter = mode === read_preference_1.ReadPreference.NEAREST ? nearestFilter : secondaryFilter;
  220. const selectedServers = latencyWindowReducer(topologyDescription, tagSetReducer(readPreference, maxStalenessReducer(readPreference, topologyDescription, servers.filter(filter))));
  221. if (mode === read_preference_1.ReadPreference.SECONDARY_PREFERRED && selectedServers.length === 0) {
  222. return servers.filter(primaryFilter);
  223. }
  224. return selectedServers;
  225. };
  226. }
  227. exports.readPreferenceServerSelector = readPreferenceServerSelector;
  228. //# sourceMappingURL=server_selection.js.map