123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417 |
- 'use strict';
- const EventEmitter = require('events'),
- MongoError = require('../core').MongoError,
- f = require('util').format,
- translateReadPreference = require('../utils').translateReadPreference,
- ClientSession = require('../core').Sessions.ClientSession;
// The store of ops

/**
 * Create the error delivered to callbacks when an operation is refused
 * because of the `bufferMaxEntries` setting.
 *
 * @param {number} bufferMaxEntries The configured limit, interpolated into the message.
 * @return {MongoError} A driver-flagged error.
 */
function storeLimitError(bufferMaxEntries) {
  return MongoError.create({
    message: f(
      'no connection available for operation and number of stored operation > %s',
      bufferMaxEntries
    ),
    driver: true
  });
}

/**
 * Admission logic shared by `Store.prototype.add` and
 * `Store.prototype.addObjectAndMethod`: either buffers `entry` or fails it
 * (and, on overflow, everything already buffered) through its callback.
 *
 * @param {Store} store The store that owns the buffer.
 * @param {object} entry The op record to buffer; `entry.c` is its callback.
 * @return {*} The callback's return value for the `force` and
 *   `bufferMaxEntries === 0` rejections, otherwise `undefined`.
 */
function bufferStoreEntry(store, entry) {
  const storeOptions = store.s.storeOptions;
  const callback = entry.c;

  // The application force-closed the db: nothing may be buffered any more.
  if (storeOptions.force) {
    return callback(MongoError.create({ message: 'db closed by application', driver: true }));
  }

  // A limit of exactly 0 disables buffering altogether.
  if (storeOptions.bufferMaxEntries === 0) {
    return callback(storeLimitError(storeOptions.bufferMaxEntries));
  }

  // A negative limit means "unbounded"; a positive one is enforced here.
  if (
    storeOptions.bufferMaxEntries > 0 &&
    store.s.storedOps.length > storeOptions.bufferMaxEntries
  ) {
    // Fail every op that is already waiting...
    while (store.s.storedOps.length > 0) {
      const pending = store.s.storedOps.shift();
      pending.c(storeLimitError(storeOptions.bufferMaxEntries));
    }
    // ...and the op that triggered the overflow. Previously this op was neither
    // stored nor called back, leaving its caller waiting forever.
    callback(storeLimitError(storeOptions.bufferMaxEntries));
    return;
  }

  store.s.storedOps.push(entry);
}

/**
 * Buffer of operations that could not be dispatched, kept so they can be
 * replayed (`execute`) or failed (`flush`) later.
 *
 * @param {object} topology Object whose methods are invoked by `execute` for non-cursor ops.
 * @param {object} [storeOptions] `{ force, bufferMaxEntries }`; defaults to
 *   `{ force: false, bufferMaxEntries: -1 }`. The object is kept by reference,
 *   so later changes made by its owner are observed here.
 */
function Store(topology, storeOptions) {
  // Internal state
  this.s = {
    storedOps: [],
    storeOptions: storeOptions || { force: false, bufferMaxEntries: -1 },
    topology: topology
  };

  // Read-only view of the number of buffered ops.
  Object.defineProperty(this, 'length', {
    enumerable: true,
    get: () => this.s.storedOps.length
  });
}

/**
 * Buffer a topology-level operation (replayed as `topology[opType](ns, ops, options, callback)`).
 *
 * @param {string} opType Name of the topology method to call on replay.
 * @param {*} ns Namespace argument forwarded on replay.
 * @param {*} ops Operation payload forwarded on replay.
 * @param {object} options Options forwarded on replay; `options.readPreference.mode`
 *   is consulted by `execute` when filtering.
 * @param {function} callback Receives an error if the op is rejected or flushed.
 */
Store.prototype.add = function(opType, ns, ops, options, callback) {
  return bufferStoreEntry(this, { t: opType, n: ns, o: ops, op: options, c: callback });
};

/**
 * Buffer a method call on an arbitrary object (replayed as `object[method].apply(object, params)`).
 *
 * @param {string} opType Op kind; `execute` treats `'cursor'` entries as object/method calls.
 * @param {object} object Receiver of the replayed call.
 * @param {string} method Name of the method to invoke on `object`.
 * @param {Array} params Arguments for the replayed call.
 * @param {function} callback Receives an error if the op is rejected or flushed.
 */
Store.prototype.addObjectAndMethod = function(opType, object, method, params, callback) {
  return bufferStoreEntry(this, { t: opType, m: method, o: object, p: params, c: callback });
};

/**
 * Fail every buffered op and empty the buffer.
 *
 * @param {Error} [err] Error passed to each callback; when omitted a fresh
 *   "no connection available" MongoError is created per op.
 */
Store.prototype.flush = function(err) {
  while (this.s.storedOps.length > 0) {
    const pending = this.s.storedOps.shift();
    pending.c(
      err ||
        MongoError.create({ message: f('no connection available for operation'), driver: true })
    );
  }
};

// Read preference modes replayed when only primary ops / only secondary ops are requested.
const primaryOptions = ['primary', 'primaryPreferred', 'nearest', 'secondaryPreferred'];
const secondaryOptions = ['secondary', 'secondaryPreferred'];

/**
 * Replay the buffered ops and reset the buffer.
 *
 * With both flags true (the default) every op is replayed. With exactly one
 * flag true, only ops whose read preference mode is in the matching list are
 * replayed. `'auth'` ops are always replayed.
 *
 * NOTE(review): ops that do not pass the filter are removed from the buffer
 * without being replayed or called back - this mirrors the existing behaviour.
 *
 * @param {object} [options]
 * @param {boolean} [options.executePrimary=true]
 * @param {boolean} [options.executeSecondary=true]
 */
Store.prototype.execute = function(options) {
  options = options || {};

  // Detach the current buffer first so ops added during replay land in a fresh one.
  const ops = this.s.storedOps;
  this.s.storedOps = [];

  const executePrimary =
    typeof options.executePrimary === 'boolean' ? options.executePrimary : true;
  const executeSecondary =
    typeof options.executeSecondary === 'boolean' ? options.executeSecondary : true;

  const runAll = executePrimary && executeSecondary;
  // Mode whitelist when filtering; null when nothing filtered may run.
  let allowedModes = null;
  if (!runAll) {
    if (executePrimary) {
      allowedModes = primaryOptions;
    } else if (executeSecondary) {
      allowedModes = secondaryOptions;
    }
  }

  while (ops.length > 0) {
    const op = ops.shift();

    if (op.t === 'auth') {
      this.s.topology[op.t].apply(this.s.topology, op.o);
      continue;
    }

    const isCursor = op.t === 'cursor';

    if (!runAll) {
      if (allowedModes === null) continue;
      // Cursor entries carry their options on the stored object, others in `op.op`.
      const opOptions = isCursor ? op.o.options : op.op;
      const eligible =
        opOptions &&
        opOptions.readPreference &&
        allowedModes.indexOf(opOptions.readPreference.mode) !== -1;
      if (!eligible) continue;
    }

    if (isCursor) {
      op.o[op.m].apply(op.o, op.p);
    } else {
      this.s.topology[op.t](op.n, op.o, op.op, op.c);
    }
  }
};

/**
 * @return {Array} The live array of buffered op records (not a copy).
 */
Store.prototype.all = function() {
  return this.s.storedOps;
};
// Server capabilities

/**
 * Read-only snapshot of feature flags derived from an ismaster document.
 *
 * Flags are computed from the wire versions exactly as they arrive; only
 * afterwards are missing (`null`/`undefined`) `minWireVersion` /
 * `maxWireVersion` values replaced by 0 on the passed-in `ismaster` object
 * itself, and those defaulted values are what the instance exposes.
 *
 * @param {object} ismaster ismaster document; reads `maxWriteBatchSize`,
 *   `minWireVersion` and `maxWireVersion`.
 */
function ServerCapabilities(ismaster) {
  const batchLimit = ismaster.maxWriteBatchSize || 1000;

  // Evaluate against the raw values, before any defaulting below.
  const textSearch = ismaster.minWireVersion >= 0;
  const rawMaxWire = ismaster.maxWireVersion;
  const atLeast = version => rawMaxWire >= version;

  // If no min or max wire version set to 0
  if (ismaster.minWireVersion == null) {
    ismaster.minWireVersion = 0;
  }
  if (ismaster.maxWireVersion == null) {
    ismaster.maxWireVersion = 0;
  }

  // Exposed name -> frozen value, in the order the properties are defined.
  const exposed = [
    ['hasAggregationCursor', atLeast(1)],
    ['hasWriteCommands', atLeast(2)],
    ['hasTextSearch', textSearch],
    ['hasAuthCommands', atLeast(1)],
    ['hasListCollectionsCommand', atLeast(3)],
    ['hasListIndexesCommand', atLeast(3)],
    ['minWireVersion', ismaster.minWireVersion],
    ['maxWireVersion', ismaster.maxWireVersion],
    ['maxNumberOfDocsInBatch', batchLimit],
    ['commandsTakeWriteConcern', atLeast(5)],
    ['commandsTakeCollation', atLeast(5)]
  ];

  // Map up read only parameters: getter-only, enumerable properties.
  exposed.forEach(([name, value]) => {
    Object.defineProperty(this, name, {
      enumerable: true,
      get: function() {
        return value;
      }
    });
  });
}
/**
 * Common base for topology wrappers. Nearly every method forwards to the
 * wrapped core topology held in `this.s.coreTopology`; `this.s` itself is
 * expected to be populated by the code that builds on this class.
 */
class TopologyBase extends EventEmitter {
  constructor() {
    super();
    // Lift the listener cap entirely.
    this.setMaxListeners(Infinity);
  }

  // ---- Sessions ----

  /**
   * @return {boolean} True when `logicalSessionTimeoutMinutes` is neither null nor undefined.
   */
  hasSessionSupport() {
    const timeout = this.logicalSessionTimeoutMinutes;
    return timeout != null;
  }

  /**
   * Create a ClientSession and track it until it emits `ended`.
   *
   * @param {object} [options] Forwarded to the ClientSession constructor.
   * @param {object} [clientOptions] Forwarded to the ClientSession constructor.
   * @return {ClientSession} The newly created, tracked session.
   */
  startSession(options, clientOptions) {
    const created = new ClientSession(this, this.s.sessionPool, options, clientOptions);

    // Stop tracking as soon as the session reports it has ended.
    created.once('ended', () => {
      this.s.sessions.delete(created);
    });
    this.s.sessions.add(created);

    return created;
  }

  /**
   * @param {*} sessions Forwarded to the core topology's `endSessions`.
   * @param {function} [callback] Forwarded to the core topology's `endSessions`.
   * @return {*} Whatever the core topology returns.
   */
  endSessions(sessions, callback) {
    const core = this.s.coreTopology;
    return core.endSessions(sessions, callback);
  }

  /**
   * The `metadata` entry of the core topology's options.
   */
  get clientMetadata() {
    const coreOptions = this.s.coreTopology.s.options;
    return coreOptions.metadata;
  }

  // ---- Server capabilities ----

  /**
   * Lazily build, and then cache in `this.s.sCapabilities`, the capabilities
   * derived from the last ismaster.
   *
   * @return {ServerCapabilities|null} null while no ismaster is available.
   */
  capabilities() {
    const cached = this.s.sCapabilities;
    if (cached) {
      return cached;
    }
    if (this.s.coreTopology.lastIsMaster() == null) {
      return null;
    }
    this.s.sCapabilities = new ServerCapabilities(this.s.coreTopology.lastIsMaster());
    return this.s.sCapabilities;
  }

  // ---- Operations (namespace is stringified before forwarding) ----

  /**
   * Run a command; options go through `translateReadPreference` first.
   */
  command(ns, cmd, options, callback) {
    const core = this.s.coreTopology;
    core.command(ns.toString(), cmd, translateReadPreference(options), callback);
  }

  /**
   * Forward an insert to the core topology.
   */
  insert(ns, ops, options, callback) {
    const core = this.s.coreTopology;
    core.insert(ns.toString(), ops, options, callback);
  }

  /**
   * Forward an update to the core topology.
   */
  update(ns, ops, options, callback) {
    const core = this.s.coreTopology;
    core.update(ns.toString(), ops, options, callback);
  }

  /**
   * Forward a remove to the core topology.
   */
  remove(ns, ops, options, callback) {
    const core = this.s.coreTopology;
    core.remove(ns.toString(), ops, options, callback);
  }

  // ---- State ----

  /**
   * @param {object} [options] Defaults to `{}`; passed through `translateReadPreference`.
   * @return {*} Result of the core topology's `isConnected`.
   */
  isConnected(options) {
    const translated = translateReadPreference(options || {});
    return this.s.coreTopology.isConnected(translated);
  }

  /**
   * @return {*} Result of the core topology's `isDestroyed`.
   */
  isDestroyed() {
    return this.s.coreTopology.isDestroyed();
  }

  /**
   * Create a cursor through the core topology.
   *
   * The (translated) options object is tagged with this topology and with
   * `this.s.store` as its `disconnectHandler` before being forwarded.
   *
   * @param {*} ns Forwarded as-is (not stringified here).
   * @param {*} cmd Forwarded as-is.
   * @param {object} [options] Defaults to `{}`.
   * @return {*} Result of the core topology's `cursor`.
   */
  cursor(ns, cmd, options) {
    const cursorOptions = translateReadPreference(options || {});
    cursorOptions.disconnectHandler = this.s.store;
    cursorOptions.topology = this;
    return this.s.coreTopology.cursor(ns, cmd, cursorOptions);
  }

  /**
   * @return {*} The core topology's last ismaster result.
   */
  lastIsMaster() {
    return this.s.coreTopology.lastIsMaster();
  }

  /**
   * @return {*} Result of the core topology's `selectServer`.
   */
  selectServer(selector, options, callback) {
    const core = this.s.coreTopology;
    return core.selectServer(selector, options, callback);
  }

  /**
   * Unref all sockets
   * @method
   */
  unref() {
    return this.s.coreTopology.unref();
  }

  /**
   * All raw connections
   * @method
   * @return {array}
   */
  connections() {
    return this.s.coreTopology.connections();
  }

  /**
   * End all sessions, optionally fail buffered ops, then destroy the core topology.
   *
   * @param {boolean} [forceClosed] Only a strict `true` marks the store as
   *   force-closed and flushes it; any non-boolean is sent to `destroy` as `force: false`.
   * @param {function} [callback] Forwarded to the core topology's `destroy`.
   */
  close(forceClosed, callback) {
    // End each tracked session individually so they are handed back to the
    // session pool, then let the pool end everything it holds in one go.
    this.s.sessions.forEach(session => session.endSession());
    if (this.s.sessionPool) {
      this.s.sessionPool.endAllPooledSessions();
    }

    // A forced close must also wash out every stored operation.
    if (forceClosed === true) {
      this.s.storeOptions.force = forceClosed;
      this.s.store.flush();
    }

    const force = typeof forceClosed === 'boolean' && forceClosed;
    this.s.coreTopology.destroy({ force: force }, callback);
  }
}
// Properties

// Enumerable, getter-only accessors on the prototype. Each one reads straight
// through to the wrapped core topology at access time, so nothing is cached here.
Object.defineProperties(TopologyBase.prototype, {
  // BSON instance kept in the core topology's internal state.
  bson: {
    enumerable: true,
    get() {
      return this.s.coreTopology.s.bson;
    }
  },
  // `parserType` as reported by the core topology.
  parserType: {
    enumerable: true,
    get() {
      return this.s.coreTopology.parserType;
    }
  },
  // Read by `hasSessionSupport()` to decide whether sessions are available.
  logicalSessionTimeoutMinutes: {
    enumerable: true,
    get() {
      return this.s.coreTopology.logicalSessionTimeoutMinutes;
    }
  },
  // `type` as reported by the core topology.
  type: {
    enumerable: true,
    get() {
      return this.s.coreTopology.type;
    }
  }
});
- exports.Store = Store;
- exports.ServerCapabilities = ServerCapabilities;
- exports.TopologyBase = TopologyBase;
|