execute_operation.js 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. 'use strict';
  2. const maybePromise = require('../utils').maybePromise;
  3. const MongoError = require('../core/error').MongoError;
  4. const Aspect = require('./operation').Aspect;
  5. const OperationBase = require('./operation').OperationBase;
  6. const ReadPreference = require('../core/topologies/read_preference');
  7. const isRetryableError = require('../core/error').isRetryableError;
  8. const maxWireVersion = require('../core/utils').maxWireVersion;
  9. const isUnifiedTopology = require('../core/utils').isUnifiedTopology;
  /**
   * Runs an operation against a topology, handing the work to `maybePromise` together with
   * the caller's optional callback.
   *
   * Before the operation runs, this function takes care of session handling:
   *  - on a unified topology that still has to check for session support, a server selection
   *    is performed first and this function is re-entered afterwards;
   *  - when the topology supports sessions and the operation has none, a session is started
   *    implicitly, attached to the operation, and ended again once the operation completes;
   *  - an explicit session that has already ended, or an explicit session on a topology
   *    without session support, is reported as a MongoError through the callback.
   *
   * @param {object} topology The topology to execute this operation on
   * @param {Operation} operation The operation to execute
   * @param {function} cb The command result callback
   * @returns {*} Whatever `maybePromise` returns (presumably a Promise when `cb` is omitted;
   *   confirm against ../utils)
   * @throws {TypeError} If `topology` is null/undefined or `operation` is not an OperationBase
   */
  function executeOperation(topology, operation, cb) {
    if (topology == null) {
      throw new TypeError('This method requires a valid topology instance');
    }

    if (!(operation instanceof OperationBase)) {
      throw new TypeError('This method requires a valid operation instance');
    }

    return maybePromise(topology, cb, done => {
      if (isUnifiedTopology(topology) && topology.shouldCheckForSessionSupport()) {
        // Re-enters executeOperation once a server selection has completed
        return selectServerForSessionSupport(topology, operation, done);
      }

      // Only set when this call creates the session itself; the owner symbol lets the
      // cleanup below recognise that session as its own.
      let implicitSession;
      let implicitOwner;

      const sessionsSupported = topology.hasSessionSupport();
      if (sessionsSupported && operation.session == null) {
        implicitOwner = Symbol();
        implicitSession = topology.startSession({ owner: implicitOwner });
        operation.session = implicitSession;
      } else if (sessionsSupported && operation.session.hasEnded) {
        return done(new MongoError('Use of expired sessions is not permitted'));
      } else if (!sessionsSupported && operation.session) {
        // An explicit session was supplied, but even after server selection the topology
        // does not support sessions.
        return done(new MongoError('Current topology does not support sessions'));
      }

      // Ends the implicitly created session (if any) and detaches it from the operation.
      const releaseImplicitSession = () => {
        if (!implicitSession || implicitSession.owner !== implicitOwner) {
          return;
        }

        implicitSession.endSession();
        if (operation.session === implicitSession) {
          operation.clearSession();
        }
      };

      const finish = (err, result) => {
        releaseImplicitSession();
        done(err, result);
      };

      try {
        if (!operation.hasAspect(Aspect.EXECUTE_WITH_SELECTION)) {
          operation.execute(finish);
        } else {
          executeWithServerSelection(topology, operation, finish);
        }
      } catch (error) {
        // A synchronous throw never reaches `finish`, so clean up here as well
        releaseImplicitSession();
        done(error);
      }
    });
  }
  /**
   * Tells whether the given server's maximum wire version is high enough for retryable reads.
   *
   * @param {object} server The server to inspect (passed straight to `maxWireVersion`)
   * @returns {boolean} true when the wire version is at least 6
   */
  function supportsRetryableReads(server) {
    // NOTE(review): 6 is presumably the wire version introduced by MongoDB 3.6 - confirm
    const minimumWireVersion = 6;
    const serverWireVersion = maxWireVersion(server);
    return serverWireVersion >= minimumWireVersion;
  }
  /**
   * Selects a server for the operation and executes the operation against it.
   *
   * When the operation is retryable and retryable reads apply (not disabled through the
   * topology's `retryReads` option, a session is present, no transaction is in progress, the
   * selected server supports them and the operation reports `canRetryRead`), a first attempt
   * that fails with a retryable error is retried once on a newly selected server.
   *
   * @param {object} topology The topology used for server selection
   * @param {Operation} operation The operation to execute
   * @param {function} callback The command result callback, invoked as `(err, result)`
   */
  function executeWithServerSelection(topology, operation, callback) {
    const readPreference = operation.readPreference || ReadPreference.primary;
    const inTransaction = operation.session && operation.session.inTransaction();

    if (inTransaction && !readPreference.equals(ReadPreference.primary)) {
      callback(
        new MongoError(
          `Read preference in a transaction must be primary, not: ${readPreference.mode}`
        )
      );

      return;
    }

    const serverSelectionOptions = {
      readPreference,
      session: operation.session
    };

    function callbackWithRetry(originalError, result) {
      if (originalError == null) {
        return callback(null, result);
      }

      if (!isRetryableError(originalError)) {
        return callback(originalError);
      }

      // select a new server, and attempt to retry the operation
      topology.selectServer(serverSelectionOptions, (selectionError, server) => {
        if (selectionError) {
          callback(selectionError, null);
          return;
        }

        if (!supportsRetryableReads(server)) {
          // No retry is possible on this server. Report the failure of the first attempt
          // rather than signalling success with an empty result.
          callback(originalError, null);
          return;
        }

        operation.execute(server, callback);
      });
    }

    // select a server, and execute the operation against it
    topology.selectServer(serverSelectionOptions, (selectionError, server) => {
      if (selectionError) {
        callback(selectionError, null);
        return;
      }

      const shouldRetryReads =
        topology.s.options.retryReads !== false &&
        operation.session &&
        !inTransaction &&
        supportsRetryableReads(server) &&
        operation.canRetryRead;

      if (operation.hasAspect(Aspect.RETRYABLE) && shouldRetryReads) {
        operation.execute(server, callbackWithRetry);
        return;
      }

      operation.execute(server, callback);
    });
  }
  /**
   * Performs a server selection and then re-enters `executeOperation`.
   *
   * The Unified Topology runs server selection before executing every operation; session
   * support is determined by the result of a monitoring check triggered by this selection.
   *
   * @param {object} topology The topology to select a server on
   * @param {Operation} operation The operation to execute once selection has completed
   * @param {function} callback Receives the selection error, or is passed on to executeOperation
   */
  function selectServerForSessionSupport(topology, operation, callback) {
    const afterSelection = selectionError => {
      if (selectionError) {
        return callback(selectionError);
      }

      executeOperation(topology, operation, callback);
    };

    topology.selectServer(ReadPreference.primaryPreferred, afterSelection);
  }
  // executeOperation is the only value this module exports; the other functions stay private.
  140. module.exports = executeOperation;