topology_base.js 11 KB


  1. 'use strict';
  2. const EventEmitter = require('events'),
  3. MongoError = require('../core').MongoError,
  4. f = require('util').format,
  5. translateReadPreference = require('../utils').translateReadPreference,
  6. ClientSession = require('../core').Sessions.ClientSession;
  7. // The store of ops
  8. var Store = function(topology, storeOptions) {
  9. var self = this;
  10. var storedOps = [];
  11. storeOptions = storeOptions || { force: false, bufferMaxEntries: -1 };
  12. // Internal state
  13. this.s = {
  14. storedOps: storedOps,
  15. storeOptions: storeOptions,
  16. topology: topology
  17. };
  18. Object.defineProperty(this, 'length', {
  19. enumerable: true,
  20. get: function() {
  21. return self.s.storedOps.length;
  22. }
  23. });
  24. };
  25. Store.prototype.add = function(opType, ns, ops, options, callback) {
  26. if (this.s.storeOptions.force) {
  27. return callback(MongoError.create({ message: 'db closed by application', driver: true }));
  28. }
  29. if (this.s.storeOptions.bufferMaxEntries === 0) {
  30. return callback(
  31. MongoError.create({
  32. message: f(
  33. 'no connection available for operation and number of stored operation > %s',
  34. this.s.storeOptions.bufferMaxEntries
  35. ),
  36. driver: true
  37. })
  38. );
  39. }
  40. if (
  41. this.s.storeOptions.bufferMaxEntries > 0 &&
  42. this.s.storedOps.length > this.s.storeOptions.bufferMaxEntries
  43. ) {
  44. while (this.s.storedOps.length > 0) {
  45. var op = this.s.storedOps.shift();
  46. op.c(
  47. MongoError.create({
  48. message: f(
  49. 'no connection available for operation and number of stored operation > %s',
  50. this.s.storeOptions.bufferMaxEntries
  51. ),
  52. driver: true
  53. })
  54. );
  55. }
  56. return;
  57. }
  58. this.s.storedOps.push({ t: opType, n: ns, o: ops, op: options, c: callback });
  59. };
  60. Store.prototype.addObjectAndMethod = function(opType, object, method, params, callback) {
  61. if (this.s.storeOptions.force) {
  62. return callback(MongoError.create({ message: 'db closed by application', driver: true }));
  63. }
  64. if (this.s.storeOptions.bufferMaxEntries === 0) {
  65. return callback(
  66. MongoError.create({
  67. message: f(
  68. 'no connection available for operation and number of stored operation > %s',
  69. this.s.storeOptions.bufferMaxEntries
  70. ),
  71. driver: true
  72. })
  73. );
  74. }
  75. if (
  76. this.s.storeOptions.bufferMaxEntries > 0 &&
  77. this.s.storedOps.length > this.s.storeOptions.bufferMaxEntries
  78. ) {
  79. while (this.s.storedOps.length > 0) {
  80. var op = this.s.storedOps.shift();
  81. op.c(
  82. MongoError.create({
  83. message: f(
  84. 'no connection available for operation and number of stored operation > %s',
  85. this.s.storeOptions.bufferMaxEntries
  86. ),
  87. driver: true
  88. })
  89. );
  90. }
  91. return;
  92. }
  93. this.s.storedOps.push({ t: opType, m: method, o: object, p: params, c: callback });
  94. };
  95. Store.prototype.flush = function(err) {
  96. while (this.s.storedOps.length > 0) {
  97. this.s.storedOps
  98. .shift()
  99. .c(
  100. err ||
  101. MongoError.create({ message: f('no connection available for operation'), driver: true })
  102. );
  103. }
  104. };
  105. var primaryOptions = ['primary', 'primaryPreferred', 'nearest', 'secondaryPreferred'];
  106. var secondaryOptions = ['secondary', 'secondaryPreferred'];
  107. Store.prototype.execute = function(options) {
  108. options = options || {};
  109. // Get current ops
  110. var ops = this.s.storedOps;
  111. // Reset the ops
  112. this.s.storedOps = [];
  113. // Unpack options
  114. var executePrimary = typeof options.executePrimary === 'boolean' ? options.executePrimary : true;
  115. var executeSecondary =
  116. typeof options.executeSecondary === 'boolean' ? options.executeSecondary : true;
  117. // Execute all the stored ops
  118. while (ops.length > 0) {
  119. var op = ops.shift();
  120. if (op.t === 'cursor') {
  121. if (executePrimary && executeSecondary) {
  122. op.o[op.m].apply(op.o, op.p);
  123. } else if (
  124. executePrimary &&
  125. op.o.options &&
  126. op.o.options.readPreference &&
  127. primaryOptions.indexOf(op.o.options.readPreference.mode) !== -1
  128. ) {
  129. op.o[op.m].apply(op.o, op.p);
  130. } else if (
  131. !executePrimary &&
  132. executeSecondary &&
  133. op.o.options &&
  134. op.o.options.readPreference &&
  135. secondaryOptions.indexOf(op.o.options.readPreference.mode) !== -1
  136. ) {
  137. op.o[op.m].apply(op.o, op.p);
  138. }
  139. } else if (op.t === 'auth') {
  140. this.s.topology[op.t].apply(this.s.topology, op.o);
  141. } else {
  142. if (executePrimary && executeSecondary) {
  143. this.s.topology[op.t](op.n, op.o, op.op, op.c);
  144. } else if (
  145. executePrimary &&
  146. op.op &&
  147. op.op.readPreference &&
  148. primaryOptions.indexOf(op.op.readPreference.mode) !== -1
  149. ) {
  150. this.s.topology[op.t](op.n, op.o, op.op, op.c);
  151. } else if (
  152. !executePrimary &&
  153. executeSecondary &&
  154. op.op &&
  155. op.op.readPreference &&
  156. secondaryOptions.indexOf(op.op.readPreference.mode) !== -1
  157. ) {
  158. this.s.topology[op.t](op.n, op.o, op.op, op.c);
  159. }
  160. }
  161. }
  162. };
  163. Store.prototype.all = function() {
  164. return this.s.storedOps;
  165. };
  166. // Server capabilities
  167. var ServerCapabilities = function(ismaster) {
  168. var setup_get_property = function(object, name, value) {
  169. Object.defineProperty(object, name, {
  170. enumerable: true,
  171. get: function() {
  172. return value;
  173. }
  174. });
  175. };
  176. // Capabilities
  177. var aggregationCursor = false;
  178. var writeCommands = false;
  179. var textSearch = false;
  180. var authCommands = false;
  181. var listCollections = false;
  182. var listIndexes = false;
  183. var maxNumberOfDocsInBatch = ismaster.maxWriteBatchSize || 1000;
  184. var commandsTakeWriteConcern = false;
  185. var commandsTakeCollation = false;
  186. if (ismaster.minWireVersion >= 0) {
  187. textSearch = true;
  188. }
  189. if (ismaster.maxWireVersion >= 1) {
  190. aggregationCursor = true;
  191. authCommands = true;
  192. }
  193. if (ismaster.maxWireVersion >= 2) {
  194. writeCommands = true;
  195. }
  196. if (ismaster.maxWireVersion >= 3) {
  197. listCollections = true;
  198. listIndexes = true;
  199. }
  200. if (ismaster.maxWireVersion >= 5) {
  201. commandsTakeWriteConcern = true;
  202. commandsTakeCollation = true;
  203. }
  204. // If no min or max wire version set to 0
  205. if (ismaster.minWireVersion == null) {
  206. ismaster.minWireVersion = 0;
  207. }
  208. if (ismaster.maxWireVersion == null) {
  209. ismaster.maxWireVersion = 0;
  210. }
  211. // Map up read only parameters
  212. setup_get_property(this, 'hasAggregationCursor', aggregationCursor);
  213. setup_get_property(this, 'hasWriteCommands', writeCommands);
  214. setup_get_property(this, 'hasTextSearch', textSearch);
  215. setup_get_property(this, 'hasAuthCommands', authCommands);
  216. setup_get_property(this, 'hasListCollectionsCommand', listCollections);
  217. setup_get_property(this, 'hasListIndexesCommand', listIndexes);
  218. setup_get_property(this, 'minWireVersion', ismaster.minWireVersion);
  219. setup_get_property(this, 'maxWireVersion', ismaster.maxWireVersion);
  220. setup_get_property(this, 'maxNumberOfDocsInBatch', maxNumberOfDocsInBatch);
  221. setup_get_property(this, 'commandsTakeWriteConcern', commandsTakeWriteConcern);
  222. setup_get_property(this, 'commandsTakeCollation', commandsTakeCollation);
  223. };
  224. class TopologyBase extends EventEmitter {
  225. constructor() {
  226. super();
  227. this.setMaxListeners(Infinity);
  228. }
  229. // Sessions related methods
  230. hasSessionSupport() {
  231. return this.logicalSessionTimeoutMinutes != null;
  232. }
  233. startSession(options, clientOptions) {
  234. const session = new ClientSession(this, this.s.sessionPool, options, clientOptions);
  235. session.once('ended', () => {
  236. this.s.sessions.delete(session);
  237. });
  238. this.s.sessions.add(session);
  239. return session;
  240. }
  241. endSessions(sessions, callback) {
  242. return this.s.coreTopology.endSessions(sessions, callback);
  243. }
  244. get clientMetadata() {
  245. return this.s.coreTopology.s.options.metadata;
  246. }
  247. // Server capabilities
  248. capabilities() {
  249. if (this.s.sCapabilities) return this.s.sCapabilities;
  250. if (this.s.coreTopology.lastIsMaster() == null) return null;
  251. this.s.sCapabilities = new ServerCapabilities(this.s.coreTopology.lastIsMaster());
  252. return this.s.sCapabilities;
  253. }
  254. // Command
  255. command(ns, cmd, options, callback) {
  256. this.s.coreTopology.command(ns.toString(), cmd, translateReadPreference(options), callback);
  257. }
  258. // Insert
  259. insert(ns, ops, options, callback) {
  260. this.s.coreTopology.insert(ns.toString(), ops, options, callback);
  261. }
  262. // Update
  263. update(ns, ops, options, callback) {
  264. this.s.coreTopology.update(ns.toString(), ops, options, callback);
  265. }
  266. // Remove
  267. remove(ns, ops, options, callback) {
  268. this.s.coreTopology.remove(ns.toString(), ops, options, callback);
  269. }
  270. // IsConnected
  271. isConnected(options) {
  272. options = options || {};
  273. options = translateReadPreference(options);
  274. return this.s.coreTopology.isConnected(options);
  275. }
  276. // IsDestroyed
  277. isDestroyed() {
  278. return this.s.coreTopology.isDestroyed();
  279. }
  280. // Cursor
  281. cursor(ns, cmd, options) {
  282. options = options || {};
  283. options = translateReadPreference(options);
  284. options.disconnectHandler = this.s.store;
  285. options.topology = this;
  286. return this.s.coreTopology.cursor(ns, cmd, options);
  287. }
  288. lastIsMaster() {
  289. return this.s.coreTopology.lastIsMaster();
  290. }
  291. selectServer(selector, options, callback) {
  292. return this.s.coreTopology.selectServer(selector, options, callback);
  293. }
  294. /**
  295. * Unref all sockets
  296. * @method
  297. */
  298. unref() {
  299. return this.s.coreTopology.unref();
  300. }
  301. /**
  302. * All raw connections
  303. * @method
  304. * @return {array}
  305. */
  306. connections() {
  307. return this.s.coreTopology.connections();
  308. }
  309. close(forceClosed, callback) {
  310. // If we have sessions, we want to individually move them to the session pool,
  311. // and then send a single endSessions call.
  312. this.s.sessions.forEach(session => session.endSession());
  313. if (this.s.sessionPool) {
  314. this.s.sessionPool.endAllPooledSessions();
  315. }
  316. // We need to wash out all stored processes
  317. if (forceClosed === true) {
  318. this.s.storeOptions.force = forceClosed;
  319. this.s.store.flush();
  320. }
  321. this.s.coreTopology.destroy(
  322. {
  323. force: typeof forceClosed === 'boolean' ? forceClosed : false
  324. },
  325. callback
  326. );
  327. }
  328. }
  329. // Properties
  330. Object.defineProperty(TopologyBase.prototype, 'bson', {
  331. enumerable: true,
  332. get: function() {
  333. return this.s.coreTopology.s.bson;
  334. }
  335. });
  336. Object.defineProperty(TopologyBase.prototype, 'parserType', {
  337. enumerable: true,
  338. get: function() {
  339. return this.s.coreTopology.parserType;
  340. }
  341. });
  342. Object.defineProperty(TopologyBase.prototype, 'logicalSessionTimeoutMinutes', {
  343. enumerable: true,
  344. get: function() {
  345. return this.s.coreTopology.logicalSessionTimeoutMinutes;
  346. }
  347. });
  348. Object.defineProperty(TopologyBase.prototype, 'type', {
  349. enumerable: true,
  350. get: function() {
  351. return this.s.coreTopology.type;
  352. }
  353. });
  354. exports.Store = Store;
  355. exports.ServerCapabilities = ServerCapabilities;
  356. exports.TopologyBase = TopologyBase;