execute_operation.js 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. 'use strict';
  2. const MongoError = require('../core/error').MongoError;
  3. const Aspect = require('./operation').Aspect;
  4. const OperationBase = require('./operation').OperationBase;
  5. const ReadPreference = require('../core/topologies/read_preference');
  6. const isRetryableError = require('../core/error').isRetryableError;
  7. const maxWireVersion = require('../core/utils').maxWireVersion;
  8. const isUnifiedTopology = require('../core/utils').isUnifiedTopology;
  /**
   * Runs an operation against a topology, bridging callback and promise usage.
   *
   * Centralising execution here gives the codebase one place that decides whether a
   * callback or a promise is used, and one place to apply cross-cutting behaviour such
   * as implicit sessions, which the Driver Sessions specification requires whenever the
   * caller did not supply a ClientSession.
   *
   * When the topology reports session support and the operation has no session, a
   * session is started here, attached to the operation, and ended again (and cleared
   * from the operation) once the operation completes or throws synchronously.
   *
   * @param {object} topology The topology to execute this operation on
   * @param {Operation} operation The operation to execute
   * @param {function} [callback] The command result callback; when it is not a function
   *   a promise built with `topology.s.promiseLibrary` is returned instead
   * @returns {Promise|undefined} A promise when no callback was given, otherwise
   *   `undefined` (or whatever `selectServerForSessionSupport` returns when session
   *   support still has to be determined on a unified topology)
   * @throws {TypeError} If `topology` is null/undefined or `operation` is not an OperationBase
   * @throws {MongoError} If the operation carries a session that has already ended
   */
  function executeOperation(topology, operation, callback) {
    if (topology == null) {
      throw new TypeError('This method requires a valid topology instance');
    }

    const isOperation = operation instanceof OperationBase;
    if (!isOperation) {
      throw new TypeError('This method requires a valid operation instance');
    }

    // Session support may still be unknown on a unified topology; that path selects a
    // server first and then re-enters this function.
    if (isUnifiedTopology(topology) && topology.shouldCheckForSessionSupport()) {
      return selectServerForSessionSupport(topology, operation, callback);
    }

    const Promise = topology.s.promiseLibrary;

    // The driver sessions spec mandates that we implicitly create sessions for operations
    // that are not explicitly provided with a session.
    let implicitSession;
    let ownerToken;
    if (topology.hasSessionSupport()) {
      const suppliedSession = operation.session;
      if (suppliedSession == null) {
        ownerToken = Symbol();
        implicitSession = topology.startSession({ owner: ownerToken });
        operation.session = implicitSession;
      } else if (suppliedSession.hasEnded) {
        throw new MongoError('Use of expired sessions is not permitted');
      }
    }

    // Without a callback, hand back a promise and settle it through a synthetic callback.
    let pendingResult;
    if (typeof callback !== 'function') {
      pendingResult = new Promise((resolve, reject) => {
        callback = (error, value) => {
          if (error) return reject(error);
          resolve(value);
        };
      });
    }

    // Ends the session started above (if any) and detaches it from the operation.
    const releaseImplicitSession = () => {
      if (!implicitSession || implicitSession.owner !== ownerToken) {
        return;
      }

      implicitSession.endSession();
      if (operation.session === implicitSession) {
        operation.clearSession();
      }
    };

    const onComplete = (error, value) => {
      releaseImplicitSession();
      callback(error, value);
    };

    try {
      if (operation.hasAspect(Aspect.EXECUTE_WITH_SELECTION)) {
        executeWithServerSelection(topology, operation, onComplete);
      } else {
        operation.execute(onComplete);
      }
    } catch (thrown) {
      // A synchronous failure never reaches onComplete, so clean up here before rethrowing.
      releaseImplicitSession();
      throw thrown;
    }

    return pendingResult;
  }
  /**
   * Reports whether a server is eligible for retryable reads, judged by wire version.
   *
   * @param {object} server The server (description) passed on to `maxWireVersion`
   * @returns {boolean} `true` when `maxWireVersion(server)` is at least 6
   */
  function supportsRetryableReads(server) {
    const minimumWireVersion = 6;
    const serverWireVersion = maxWireVersion(server);
    return serverWireVersion >= minimumWireVersion;
  }
  /**
   * Selects a server for the operation and executes the operation against it,
   * retrying at most once on a retryable error when retryable reads apply.
   *
   * The read preference is taken from the operation, falling back to
   * `ReadPreference.primary`. Inside a transaction only a primary read preference is
   * accepted; anything else is reported through the callback without selecting a server.
   *
   * A retry is attempted only when all of the following hold: `retryReads` is not
   * explicitly `false` in the topology options, the operation has a session, no
   * transaction is active, the selected server supports retryable reads, the operation
   * reports `canRetryRead`, and it has the RETRYABLE aspect.
   *
   * @param {object} topology The topology used for server selection
   * @param {Operation} operation The operation to execute
   * @param {function} callback Called with `(err, result)` once the operation settles
   */
  function executeWithServerSelection(topology, operation, callback) {
    const readPreference = operation.readPreference || ReadPreference.primary;
    const inTransaction = operation.session && operation.session.inTransaction();

    if (inTransaction && !readPreference.equals(ReadPreference.primary)) {
      callback(
        new MongoError(
          `Read preference in a transaction must be primary, not: ${readPreference.mode}`
        )
      );

      return;
    }

    const serverSelectionOptions = {
      readPreference,
      session: operation.session
    };

    function callbackWithRetry(err, result) {
      if (err == null) {
        return callback(null, result);
      }

      if (!isRetryableError(err)) {
        return callback(err);
      }

      // select a new server, and attempt to retry the operation
      topology.selectServer(serverSelectionOptions, (selectionErr, server) => {
        if (selectionErr) {
          callback(selectionErr, null);
          return;
        }

        if (!supportsRetryableReads(server)) {
          // The retry cannot happen on this server, so surface the failure that
          // triggered it. Reporting the (absent) selection error here would make
          // the failed read look like a success with a null result.
          callback(err, null);
          return;
        }

        // Second attempt goes straight to the caller's callback: no further retries.
        operation.execute(server, callback);
      });
    }

    // select a server, and execute the operation against it
    topology.selectServer(serverSelectionOptions, (err, server) => {
      if (err) {
        callback(err, null);
        return;
      }

      const shouldRetryReads =
        topology.s.options.retryReads !== false &&
        operation.session &&
        !inTransaction &&
        supportsRetryableReads(server) &&
        operation.canRetryRead;

      if (operation.hasAspect(Aspect.RETRYABLE) && shouldRetryReads) {
        operation.execute(server, callbackWithRetry);
        return;
      }

      operation.execute(server, callback);
    });
  }
  // TODO: This is only supported for unified topology, it should go away once
  //       we remove support for legacy topology types.
  /**
   * Performs a server selection with `ReadPreference.primaryPreferred` and, once it
   * succeeds, re-enters `executeOperation` with the same arguments.
   *
   * @param {object} topology The topology to select a server on
   * @param {Operation} operation The operation to execute after selection
   * @param {function} [callback] Result callback; when it is not a function a promise
   *   built with `topology.s.promiseLibrary` is returned instead
   * @returns {Promise|undefined} A promise when no callback was given, else `undefined`
   */
  function selectServerForSessionSupport(topology, operation, callback) {
    const Promise = topology.s.promiseLibrary;

    // Without a callback, settle a promise through a synthetic callback.
    let pendingResult;
    const callerGaveCallback = typeof callback === 'function';
    if (!callerGaveCallback) {
      pendingResult = new Promise((resolve, reject) => {
        callback = (error, value) => {
          if (error) {
            reject(error);
            return;
          }

          resolve(value);
        };
      });
    }

    topology.selectServer(ReadPreference.primaryPreferred, selectionError => {
      if (!selectionError) {
        executeOperation(topology, operation, callback);
        return;
      }

      callback(selectionError);
    });

    return pendingResult;
  }
  155. module.exports = executeOperation;