123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- "use strict";
- Object.defineProperty(exports, "__esModule", { value: true });
- exports.readPreferenceServerSelector = exports.secondaryWritableServerSelector = exports.sameServerSelector = exports.writableServerSelector = exports.MIN_SECONDARY_WRITE_WIRE_VERSION = void 0;
- const error_1 = require("../error");
- const read_preference_1 = require("../read_preference");
- const common_1 = require("./common");
- // Max staleness constants. IDLE_WRITE_PERIOD is added to heartbeatFrequencyMS and the sum is divided by 1000 to get the staleness variance in seconds; SMALLEST_MAX_STALENESS_SECONDS is the lowest maxStalenessSeconds value that maxStalenessReducer accepts.
- const IDLE_WRITE_PERIOD = 10000;
- const SMALLEST_MAX_STALENESS_SECONDS = 90;
- // Minimum wire version to try writes on secondaries; secondaryWritableServerSelector falls back to the primary when the supplied wire version is below this value.
- exports.MIN_SECONDARY_WRITE_WIRE_VERSION = 13;
/**
 * Builds a server selector that keeps only the servers flagged as writable and
 * then hands them to latencyWindowReducer.
 *
 * @returns A selector taking `(topologyDescription, servers)`
 */
function writableServerSelector() {
    return (topologyDescription, servers) => {
        const writableServers = servers.filter(candidate => candidate.isWritable);
        return latencyWindowReducer(topologyDescription, writableServers);
    };
}
- exports.writableServerSelector = writableServerSelector;
/**
 * Builds a selector that re-selects one specific server, but only while that
 * server is in a state other than Unknown.
 *
 * @param description - Description of the server to select again; may be absent
 * @returns A selector yielding the servers whose address equals
 *   `description.address` and whose type is not Unknown, or an empty list when
 *   no description was supplied
 */
function sameServerSelector(description) {
    return (topologyDescription, servers) => {
        // Nothing to match against without a target description.
        if (!description) {
            return [];
        }
        const isKnownTarget = candidate => candidate.address === description.address &&
            candidate.type !== common_1.ServerType.Unknown;
        return servers.filter(isKnownTarget);
    };
}
- exports.sameServerSelector = sameServerSelector;
/**
 * Builds a read-preference based selector for an operation that may be allowed
 * to write on a secondary.
 *
 * - If server version < 5.0 (wire version below MIN_SECONDARY_WRITE_WIRE_VERSION),
 *   or the wire version is missing, the primary read preference is used.
 * - Otherwise the supplied read preference is used, falling back to primary
 *   when none is supplied.
 *
 * @param wireVersion - The wire version compared against the minimum
 * @param readPreference - The requested read preference, if any
 * @returns The selector produced by readPreferenceServerSelector
 */
function secondaryWritableServerSelector(wireVersion, readPreference) {
    const honorsReadPreference = Boolean(readPreference) &&
        Boolean(wireVersion) &&
        !(wireVersion < exports.MIN_SECONDARY_WRITE_WIRE_VERSION);
    const effectivePreference = honorsReadPreference
        ? readPreference
        : read_preference_1.ReadPreference.primary;
    return readPreferenceServerSelector(effectivePreference);
}
- exports.secondaryWritableServerSelector = secondaryWritableServerSelector;
/**
 * Narrows a list of servers according to the "Max Staleness" specification:
 * https://github.com/mongodb/specifications/blob/master/source/max-staleness/max-staleness.rst
 *
 * When `maxStalenessSeconds` is unset or negative the list is returned as-is.
 * Staleness is only evaluated for the ReplicaSetWithPrimary and
 * ReplicaSetNoPrimary topology types; any other type returns the list as-is.
 *
 * @param readPreference - The read preference providing max staleness guidance
 * @param topologyDescription - The topology description
 * @param servers - The list of server descriptions to be reduced
 * @returns The list of servers that satisfy the requirements of max staleness
 * @throws MongoInvalidArgumentError when `maxStalenessSeconds` is smaller than
 *   `(heartbeatFrequencyMS + IDLE_WRITE_PERIOD) / 1000` or smaller than
 *   SMALLEST_MAX_STALENESS_SECONDS
 */
function maxStalenessReducer(readPreference, topologyDescription, servers) {
    const maxStaleness = readPreference.maxStalenessSeconds;
    if (maxStaleness == null || maxStaleness < 0) {
        return servers;
    }
    const heartbeatFrequencyMS = topologyDescription.heartbeatFrequencyMS;
    const maxStalenessVariance = (heartbeatFrequencyMS + IDLE_WRITE_PERIOD) / 1000;
    if (maxStaleness < maxStalenessVariance) {
        throw new error_1.MongoInvalidArgumentError(`Option "maxStalenessSeconds" must be at least ${maxStalenessVariance} seconds`);
    }
    if (maxStaleness < SMALLEST_MAX_STALENESS_SECONDS) {
        throw new error_1.MongoInvalidArgumentError(`Option "maxStalenessSeconds" must be at least ${SMALLEST_MAX_STALENESS_SECONDS} seconds`);
    }
    // Staleness estimates are computed in milliseconds; the limit is in seconds.
    const isFreshEnough = stalenessMS => stalenessMS / 1000 <= maxStaleness;
    switch (topologyDescription.type) {
        case common_1.TopologyType.ReplicaSetWithPrimary: {
            const primary = Array.from(topologyDescription.servers.values()).find(primaryFilter);
            return servers.filter(server => {
                // Compare how far each server's last write trails its last
                // update against the same gap measured on the primary.
                const serverLagMS = server.lastUpdateTime - server.lastWriteDate;
                const primaryLagMS = primary.lastUpdateTime - primary.lastWriteDate;
                return isFreshEnough(serverLagMS - primaryLagMS + heartbeatFrequencyMS);
            });
        }
        case common_1.TopologyType.ReplicaSetNoPrimary: {
            if (servers.length === 0) {
                return servers;
            }
            // Without a primary, measure against the candidate with the most recent write.
            let freshest = servers[0];
            for (const candidate of servers) {
                if (candidate.lastWriteDate > freshest.lastWriteDate) {
                    freshest = candidate;
                }
            }
            return servers.filter(server => isFreshEnough(freshest.lastWriteDate - server.lastWriteDate + heartbeatFrequencyMS));
        }
        default:
            return servers;
    }
}
/**
 * Checks whether a server's tags satisfy a requested tag set: every key in the
 * requested set must be an own enumerable key of the server's tags and carry a
 * strictly equal value. An empty requested set matches any server tags.
 *
 * @param tagSet - The requested tag set to match
 * @param serverTags - The server's tags
 * @returns `true` when every requested tag is present with the same value
 */
function tagSetMatch(tagSet, serverTags) {
    const requestedKeys = Object.keys(tagSet);
    const availableKeys = Object.keys(serverTags);
    return requestedKeys.every(key => availableKeys.includes(key) && serverTags[key] === tagSet[key]);
}
/**
 * Narrows a list of server descriptions using the tag sets requested by a read
 * preference. Tag sets are tried in order and the servers matching the first
 * tag set with at least one match are returned.
 *
 * @param readPreference - The read preference providing the requested tags
 * @param servers - The list of server descriptions to reduce
 * @returns The original list when no tags are requested, otherwise the servers
 *   matching the first satisfiable tag set, or an empty list when none match
 */
function tagSetReducer(readPreference, servers) {
    const requestedTagSets = readPreference.tags;
    if (requestedTagSets == null) {
        return servers;
    }
    if (Array.isArray(requestedTagSets) && requestedTagSets.length === 0) {
        return servers;
    }
    for (let index = 0; index < requestedTagSets.length; ++index) {
        const candidateTagSet = requestedTagSets[index];
        const matchingServers = servers.filter(server => tagSetMatch(candidateTagSet, server.tags));
        if (matchingServers.length) {
            return matchingServers;
        }
    }
    return [];
}
/**
 * Narrows a list of servers to those inside the acceptable latency window, as
 * described by the "Server Selection" specification:
 * https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst
 *
 * The window starts at the lowest round trip time found and extends by
 * `topologyDescription.localThresholdMS`; both ends are inclusive.
 *
 * @param topologyDescription - The topology description
 * @param servers - The list of servers to reduce
 * @returns The servers which fall within an acceptable latency window
 */
function latencyWindowReducer(topologyDescription, servers) {
    // -1 marks "no minimum recorded yet"; the next server's round trip time then
    // replaces it outright instead of being compared against it.
    let fastest = -1;
    for (const server of servers) {
        fastest = fastest === -1 ? server.roundTripTime : Math.min(server.roundTripTime, fastest);
    }
    const slowestAllowed = fastest + topologyDescription.localThresholdMS;
    return servers.filter(server => server.roundTripTime >= fastest && server.roundTripTime <= slowestAllowed);
}
// filters
/**
 * Predicate: is this server description a replica set primary?
 *
 * @param server - The server description to inspect
 * @returns `true` when `server.type` is ServerType.RSPrimary
 */
function primaryFilter(server) {
    const { type } = server;
    return type === common_1.ServerType.RSPrimary;
}
/**
 * Predicate: is this server description a replica set secondary?
 *
 * @param server - The server description to inspect
 * @returns `true` when `server.type` is ServerType.RSSecondary
 */
function secondaryFilter(server) {
    const { type } = server;
    return type === common_1.ServerType.RSSecondary;
}
/**
 * Predicate: is this server description a replica set data-bearing member,
 * i.e. either a secondary or the primary?
 *
 * @param server - The server description to inspect
 * @returns `true` when `server.type` is ServerType.RSSecondary or ServerType.RSPrimary
 */
function nearestFilter(server) {
    const { type } = server;
    if (type === common_1.ServerType.RSSecondary) {
        return true;
    }
    return type === common_1.ServerType.RSPrimary;
}
/**
 * Predicate: is this server description in any state other than Unknown?
 *
 * @param server - The server description to inspect
 * @returns `true` when `server.type` is not ServerType.Unknown
 */
function knownFilter(server) {
    const isUnknown = server.type === common_1.ServerType.Unknown;
    return !isUnknown;
}
/**
 * Predicate: is this server description a load balancer?
 *
 * @param server - The server description to inspect
 * @returns `true` when `server.type` is ServerType.LoadBalancer
 */
function loadBalancerFilter(server) {
    const { type } = server;
    return type === common_1.ServerType.LoadBalancer;
}
/**
 * Builds a selector function that picks servers according to a read preference.
 *
 * The returned selector resolves in this order:
 * - LoadBalanced topology: the load balancer servers.
 * - Unknown topology: no servers.
 * - Single or Sharded topology: every non-Unknown server, narrowed by the latency window.
 * - Otherwise by read preference mode: `primary` yields the primaries;
 *   `primaryPreferred` yields the primaries when there are any; remaining cases
 *   take secondaries (or secondaries plus primary for `nearest`) and pass them
 *   through the max staleness, tag set and latency window reducers in that
 *   order. `secondaryPreferred` falls back to the primaries when that yields nothing.
 *
 * @param readPreference - The read preference to select with
 * @returns A selector taking `(topologyDescription, servers)`
 * @throws MongoInvalidArgumentError when `readPreference.isValid()` is false
 * @throws MongoCompatibilityError (from the returned selector) when the read
 *   preference requires a higher wire version than the topology's common wire version
 */
function readPreferenceServerSelector(readPreference) {
    if (!readPreference.isValid()) {
        throw new error_1.MongoInvalidArgumentError('Invalid read preference specified');
    }
    return (topologyDescription, servers) => {
        const { commonWireVersion } = topologyDescription;
        // The check is skipped when either wire version is unknown (falsy).
        if (commonWireVersion &&
            readPreference.minWireVersion &&
            readPreference.minWireVersion > commonWireVersion) {
            throw new error_1.MongoCompatibilityError(`Minimum wire version '${readPreference.minWireVersion}' required, but found '${commonWireVersion}'`);
        }
        switch (topologyDescription.type) {
            case common_1.TopologyType.LoadBalanced:
                return servers.filter(loadBalancerFilter);
            case common_1.TopologyType.Unknown:
                return [];
            case common_1.TopologyType.Single:
            case common_1.TopologyType.Sharded:
                return latencyWindowReducer(topologyDescription, servers.filter(knownFilter));
            default:
                break;
        }
        const { mode } = readPreference;
        const selectPrimaries = () => servers.filter(primaryFilter);
        if (mode === read_preference_1.ReadPreference.PRIMARY) {
            return selectPrimaries();
        }
        if (mode === read_preference_1.ReadPreference.PRIMARY_PREFERRED) {
            const primaries = selectPrimaries();
            if (primaries.length) {
                return primaries;
            }
        }
        const candidateFilter = mode === read_preference_1.ReadPreference.NEAREST ? nearestFilter : secondaryFilter;
        // Reduction pipeline: type filter -> max staleness -> tag sets -> latency window.
        const candidates = servers.filter(candidateFilter);
        const freshEnough = maxStalenessReducer(readPreference, topologyDescription, candidates);
        const tagMatched = tagSetReducer(readPreference, freshEnough);
        const selectedServers = latencyWindowReducer(topologyDescription, tagMatched);
        if (mode === read_preference_1.ReadPreference.SECONDARY_PREFERRED && selectedServers.length === 0) {
            return selectPrimaries();
        }
        return selectedServers;
    };
}
- exports.readPreferenceServerSelector = readPreferenceServerSelector;
- //# sourceMappingURL=server_selection.js.map
|