execute_operation.js 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.executeOperation = void 0;
  4. const error_1 = require("../error");
  5. const read_preference_1 = require("../read_preference");
  6. const server_selection_1 = require("../sdam/server_selection");
  7. const utils_1 = require("../utils");
  8. const operation_1 = require("./operation");
  // Server error code (IllegalOperation) that executeWithServerSelection compares against a
  // failed retryable write; combined with an errmsg mentioning "Transaction numbers" it is
  // reported as the deployment not supporting retryable writes.
  9. const MMAPv1_RETRY_WRITES_ERROR_CODE = error_1.MONGODB_ERROR_CODES.IllegalOperation;
  // Message (and errmsg) of the MongoServerError that replaces the server's original error in
  // that case; the original error is kept on the new error as `originalError`.
  10. const MMAPv1_RETRY_WRITES_ERROR_MESSAGE = 'This MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string.';
  /**
   * Runs `operation` against `topology`, supplying an implicit session when the
   * operation carries none and the topology reports session support.
   *
   * An implicitly created session is tagged with a private owner symbol and is
   * ended before the outcome is handed to the caller. Explicit sessions are
   * never ended here.
   *
   * @param topology - object providing `shouldCheckForSessionSupport`, `selectServer`,
   *   `hasSessionSupport`, `startSession` and `capabilities`
   * @param operation - must be an instance of `AbstractOperation`
   * @param callback - optional `(err, result)` callback, forwarded to `maybePromise`
   * @returns whatever `maybePromise` returns for the given callback
   * @throws MongoRuntimeError synchronously when `operation` is not an `AbstractOperation`
   */
  function executeOperation(topology, operation, callback) {
    const isValidOperation = operation instanceof operation_1.AbstractOperation;
    if (!isValidOperation) {
      // TODO(NODE-3483)
      throw new error_1.MongoRuntimeError('This method requires a valid operation instance');
    }

    const runOperation = done => {
      if (topology.shouldCheckForSessionSupport()) {
        // Session support is not known yet: select a server first, then start over.
        const restartAfterSelection = selectionError => {
          if (selectionError) {
            return done(selectionError);
          }
          executeOperation(topology, operation, done);
        };
        return topology.selectServer(read_preference_1.ReadPreference.primaryPreferred, restartAfterSelection);
      }

      // The driver sessions spec mandates that we implicitly create sessions for operations
      // that are not explicitly provided with a session.
      let session = operation.session;
      let owner;
      if (!topology.hasSessionSupport()) {
        if (session) {
          // If the user passed an explicit session and we are still, after server selection,
          // trying to run against a topology that doesn't support sessions we error out.
          return done(new error_1.MongoCompatibilityError('Current topology does not support sessions'));
        }
      }
      else if (session == null) {
        owner = Symbol();
        session = topology.startSession({ owner, explicit: false });
      }
      else if (session.hasEnded) {
        return done(new error_1.MongoExpiredSessionError('Use of expired sessions is not permitted'));
      }
      else if (session.snapshotEnabled && !topology.capabilities.supportsSnapshotReads) {
        return done(new error_1.MongoCompatibilityError('Snapshot reads require MongoDB 5.0 or later'));
      }

      // True only for the session created above: its owner is our private symbol.
      const ownsSession = () => session && session.owner && session.owner === owner;

      const finish = (err, result) => {
        if (ownsSession()) {
          // An error from ending the session takes precedence over the operation's error.
          return session.endSession(endError => done(endError || err, result));
        }
        done(err, result);
      };

      try {
        executeWithServerSelection(topology, session, operation, finish);
      }
      catch (thrown) {
        if (ownsSession()) {
          session.endSession();
        }
        throw thrown;
      }
    };

    return (0, utils_1.maybePromise)(callback, runOperation);
  }
  61. exports.executeOperation = executeOperation;
  /**
   * Reports whether `server` can be used for a retryable read, i.e. whether
   * `maxWireVersion(server)` is at least 6.
   *
   * @param server - value accepted by `utils.maxWireVersion`
   * @returns {boolean} true when the wire version is 6 or higher
   */
  function supportsRetryableReads(server) {
    const wireVersion = (0, utils_1.maxWireVersion)(server);
    return wireVersion >= 6;
  }
  /**
   * Selects a server for `operation` and executes the operation on it. When the
   * operation is started as retryable and the first attempt fails with a
   * retryable error, a server is selected again and the operation is executed
   * one more time.
   *
   * @param topology - object providing `selectServer`, `commonWireVersion` and `s.options`
   * @param session - session handed to server selection and `operation.execute`; may be undefined
   * @param operation - operation exposing `hasAspect`, `execute`, `readPreference`, `options`,
   *   `canRetryRead` / `canRetryWrite`, and optionally `server`, `trySecondaryWrite`,
   *   `bypassPinningCheck`
   * @param callback - `(err, result)`; receives the final outcome
   */
  function executeWithServerSelection(topology, session, operation, callback) {
    const readPreference = operation.readPreference || read_preference_1.ReadPreference.primary;
    const inTransaction = session && session.inTransaction();
    if (inTransaction && !readPreference.equals(read_preference_1.ReadPreference.primary)) {
      callback(new error_1.MongoTransactionError(`Read preference in a transaction must be primary, not: ${readPreference.mode}`));
      return;
    }
    if (session &&
      session.isPinned &&
      session.transaction.isCommitted &&
      !operation.bypassPinningCheck) {
      session.unpin();
    }
    let selector;
    if (operation.hasAspect(operation_1.Aspect.CURSOR_ITERATING)) {
      // Get more operations must always select the same server, but run through
      // server selection to potentially force monitor checks if the server is
      // in an unknown state.
      const cursorServer = operation.server;
      selector = (0, server_selection_1.sameServerSelector)(cursorServer == null ? undefined : cursorServer.description);
    }
    else if (operation.trySecondaryWrite) {
      // If operation should try to write to secondary use the custom server selector
      // otherwise provide the read preference.
      selector = (0, server_selection_1.secondaryWritableServerSelector)(topology.commonWireVersion, readPreference);
    }
    else {
      selector = readPreference;
    }
    const serverSelectionOptions = { session };

    // Callback for the first attempt of a retryable operation: decides whether
    // the failure qualifies for a retry and, if so, performs it.
    function callbackWithRetry(err, result) {
      if (err == null) {
        return callback(undefined, result);
      }
      const hasReadAspect = operation.hasAspect(operation_1.Aspect.READ_OPERATION);
      const hasWriteAspect = operation.hasAspect(operation_1.Aspect.WRITE_OPERATION);
      const itShouldRetryWrite = shouldRetryWrite(err);
      if ((hasReadAspect && !(0, error_1.isRetryableError)(err)) || (hasWriteAspect && !itShouldRetryWrite)) {
        return callback(err);
      }
      // Past the check above a write operation always has itShouldRetryWrite === true.
      // errmsg is verified to be a string first so that an error without one
      // cannot raise a TypeError here.
      if (hasWriteAspect &&
        err.code === MMAPv1_RETRY_WRITES_ERROR_CODE &&
        typeof err.errmsg === 'string' &&
        /Transaction numbers/.test(err.errmsg)) {
        callback(new error_1.MongoServerError({
          message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
          errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
          originalError: err
        }));
        return;
      }
      // select a new server, and attempt to retry the operation
      topology.selectServer(selector, serverSelectionOptions, (e, server) => {
        if (e) {
          callback(e);
          return;
        }
        const retryUnsupported = (hasReadAspect && !supportsRetryableReads(server)) ||
          (hasWriteAspect && !(0, utils_1.supportsRetryableWrites)(server));
        if (retryUnsupported) {
          // Selection itself succeeded, so there is no selection error to report:
          // surface the failure of the first attempt instead of signalling success.
          callback(err);
          return;
        }
        // If we have a cursor and the initial command fails with a network error,
        // we can retry it on another connection. So we need to check it back in, clear the
        // pool for the service id, and retry again.
        if (err instanceof error_1.MongoNetworkError &&
          server.loadBalanced &&
          session &&
          session.isPinned &&
          !session.inTransaction() &&
          operation.hasAspect(operation_1.Aspect.CURSOR_CREATING)) {
          session.unpin({ force: true, forceClear: true });
        }
        operation.execute(server, session, callback);
      });
    }

    // select a server, and execute the operation against it
    topology.selectServer(selector, serverSelectionOptions, (err, server) => {
      if (err) {
        callback(err);
        return;
      }
      if (session && operation.hasAspect(operation_1.Aspect.RETRYABLE)) {
        const willRetryRead = topology.s.options.retryReads !== false &&
          !inTransaction &&
          supportsRetryableReads(server) &&
          operation.canRetryRead;
        const willRetryWrite = topology.s.options.retryWrites === true &&
          !inTransaction &&
          (0, utils_1.supportsRetryableWrites)(server) &&
          operation.canRetryWrite;
        const hasReadAspect = operation.hasAspect(operation_1.Aspect.READ_OPERATION);
        const hasWriteAspect = operation.hasAspect(operation_1.Aspect.WRITE_OPERATION);
        if ((hasReadAspect && willRetryRead) || (hasWriteAspect && willRetryWrite)) {
          if (hasWriteAspect && willRetryWrite) {
            operation.options.willRetryWrite = true;
            session.incrementTransactionNumber();
          }
          operation.execute(server, session, callbackWithRetry);
          return;
        }
      }
      operation.execute(server, session, callback);
    });
  }
  /**
   * Tells whether a failed write may be retried: the error must be a
   * `MongoError` that carries the `RetryableWriteError` label.
   *
   * @param err - the error produced by the first write attempt
   * @returns false for anything that is not a `MongoError`, otherwise the
   *   result of `err.hasErrorLabel('RetryableWriteError')`
   */
  function shouldRetryWrite(err) {
    if (!(err instanceof error_1.MongoError)) {
      return false;
    }
    return err.hasErrorLabel('RetryableWriteError');
  }
  178. //# sourceMappingURL=execute_operation.js.map