execute_operation.js 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.executeOperation = void 0;
  4. const error_1 = require("../error");
  5. const read_preference_1 = require("../read_preference");
  6. const server_selection_1 = require("../sdam/server_selection");
  7. const utils_1 = require("../utils");
  8. const operation_1 = require("./operation");
  9. const MMAPv1_RETRY_WRITES_ERROR_CODE = error_1.MONGODB_ERROR_CODES.IllegalOperation;
  10. const MMAPv1_RETRY_WRITES_ERROR_MESSAGE = 'This MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string.';
  11. function executeOperation(client, operation, callback) {
  12. if (!(operation instanceof operation_1.AbstractOperation)) {
  13. // TODO(NODE-3483): Extend MongoRuntimeError
  14. throw new error_1.MongoRuntimeError('This method requires a valid operation instance');
  15. }
  16. return (0, utils_1.maybePromise)(callback, callback => {
  17. const topology = client.topology;
  18. if (topology == null) {
  19. if (client.s.hasBeenClosed) {
  20. return callback(new error_1.MongoNotConnectedError('Client must be connected before running operations'));
  21. }
  22. client.s.options[Symbol.for('@@mdb.skipPingOnConnect')] = true;
  23. return client.connect(error => {
  24. delete client.s.options[Symbol.for('@@mdb.skipPingOnConnect')];
  25. if (error) {
  26. return callback(error);
  27. }
  28. return executeOperation(client, operation, callback);
  29. });
  30. }
  31. if (topology.shouldCheckForSessionSupport()) {
  32. return topology.selectServer(read_preference_1.ReadPreference.primaryPreferred, {}, err => {
  33. if (err)
  34. return callback(err);
  35. executeOperation(client, operation, callback);
  36. });
  37. }
  38. // The driver sessions spec mandates that we implicitly create sessions for operations
  39. // that are not explicitly provided with a session.
  40. let session = operation.session;
  41. let owner;
  42. if (topology.hasSessionSupport()) {
  43. if (session == null) {
  44. owner = Symbol();
  45. session = topology.startSession({ owner, explicit: false });
  46. }
  47. else if (session.hasEnded) {
  48. return callback(new error_1.MongoExpiredSessionError('Use of expired sessions is not permitted'));
  49. }
  50. else if (session.snapshotEnabled && !topology.capabilities.supportsSnapshotReads) {
  51. return callback(new error_1.MongoCompatibilityError('Snapshot reads require MongoDB 5.0 or later'));
  52. }
  53. }
  54. else if (session) {
  55. // If the user passed an explicit session and we are still, after server selection,
  56. // trying to run against a topology that doesn't support sessions we error out.
  57. return callback(new error_1.MongoCompatibilityError('Current topology does not support sessions'));
  58. }
  59. try {
  60. executeWithServerSelection(topology, session, operation, (error, result) => {
  61. if ((session === null || session === void 0 ? void 0 : session.owner) != null && session.owner === owner) {
  62. return session.endSession(endSessionError => callback(endSessionError !== null && endSessionError !== void 0 ? endSessionError : error, result));
  63. }
  64. callback(error, result);
  65. });
  66. }
  67. catch (error) {
  68. if ((session === null || session === void 0 ? void 0 : session.owner) != null && session.owner === owner) {
  69. session.endSession();
  70. }
  71. throw error;
  72. }
  73. });
  74. }
  75. exports.executeOperation = executeOperation;
  76. function executeWithServerSelection(topology, session, operation, callback) {
  77. var _a, _b;
  78. const readPreference = (_a = operation.readPreference) !== null && _a !== void 0 ? _a : read_preference_1.ReadPreference.primary;
  79. const inTransaction = !!(session === null || session === void 0 ? void 0 : session.inTransaction());
  80. if (inTransaction && !readPreference.equals(read_preference_1.ReadPreference.primary)) {
  81. return callback(new error_1.MongoTransactionError(`Read preference in a transaction must be primary, not: ${readPreference.mode}`));
  82. }
  83. if ((session === null || session === void 0 ? void 0 : session.isPinned) && session.transaction.isCommitted && !operation.bypassPinningCheck) {
  84. session.unpin();
  85. }
  86. let selector;
  87. if (operation.hasAspect(operation_1.Aspect.CURSOR_ITERATING)) {
  88. // Get more operations must always select the same server, but run through
  89. // server selection to potentially force monitor checks if the server is
  90. // in an unknown state.
  91. selector = (0, server_selection_1.sameServerSelector)((_b = operation.server) === null || _b === void 0 ? void 0 : _b.description);
  92. }
  93. else if (operation.trySecondaryWrite) {
  94. // If operation should try to write to secondary use the custom server selector
  95. // otherwise provide the read preference.
  96. selector = (0, server_selection_1.secondaryWritableServerSelector)(topology.commonWireVersion, readPreference);
  97. }
  98. else {
  99. selector = readPreference;
  100. }
  101. const serverSelectionOptions = { session };
  102. function retryOperation(originalError) {
  103. const isWriteOperation = operation.hasAspect(operation_1.Aspect.WRITE_OPERATION);
  104. const isReadOperation = operation.hasAspect(operation_1.Aspect.READ_OPERATION);
  105. if (isWriteOperation && originalError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
  106. return callback(new error_1.MongoServerError({
  107. message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
  108. errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
  109. originalError
  110. }));
  111. }
  112. if (isWriteOperation && !(0, error_1.isRetryableWriteError)(originalError)) {
  113. return callback(originalError);
  114. }
  115. if (isReadOperation && !(0, error_1.isRetryableReadError)(originalError)) {
  116. return callback(originalError);
  117. }
  118. if (originalError instanceof error_1.MongoNetworkError &&
  119. (session === null || session === void 0 ? void 0 : session.isPinned) &&
  120. !session.inTransaction() &&
  121. operation.hasAspect(operation_1.Aspect.CURSOR_CREATING)) {
  122. // If we have a cursor and the initial command fails with a network error,
  123. // we can retry it on another connection. So we need to check it back in, clear the
  124. // pool for the service id, and retry again.
  125. session.unpin({ force: true, forceClear: true });
  126. }
  127. // select a new server, and attempt to retry the operation
  128. topology.selectServer(selector, serverSelectionOptions, (error, server) => {
  129. if (!error && isWriteOperation && !(0, utils_1.supportsRetryableWrites)(server)) {
  130. return callback(new error_1.MongoUnexpectedServerResponseError('Selected server does not support retryable writes'));
  131. }
  132. if (error || !server) {
  133. return callback(error !== null && error !== void 0 ? error : new error_1.MongoUnexpectedServerResponseError('Server selection failed without error'));
  134. }
  135. operation.execute(server, session, callback);
  136. });
  137. }
  138. if (readPreference &&
  139. !readPreference.equals(read_preference_1.ReadPreference.primary) &&
  140. (session === null || session === void 0 ? void 0 : session.inTransaction())) {
  141. callback(new error_1.MongoTransactionError(`Read preference in a transaction must be primary, not: ${readPreference.mode}`));
  142. return;
  143. }
  144. // select a server, and execute the operation against it
  145. topology.selectServer(selector, serverSelectionOptions, (error, server) => {
  146. if (error || !server) {
  147. return callback(error);
  148. }
  149. if (session && operation.hasAspect(operation_1.Aspect.RETRYABLE)) {
  150. const willRetryRead = topology.s.options.retryReads && !inTransaction && operation.canRetryRead;
  151. const willRetryWrite = topology.s.options.retryWrites &&
  152. !inTransaction &&
  153. (0, utils_1.supportsRetryableWrites)(server) &&
  154. operation.canRetryWrite;
  155. const hasReadAspect = operation.hasAspect(operation_1.Aspect.READ_OPERATION);
  156. const hasWriteAspect = operation.hasAspect(operation_1.Aspect.WRITE_OPERATION);
  157. if ((hasReadAspect && willRetryRead) || (hasWriteAspect && willRetryWrite)) {
  158. if (hasWriteAspect && willRetryWrite) {
  159. operation.options.willRetryWrite = true;
  160. session.incrementTransactionNumber();
  161. }
  162. return operation.execute(server, session, (error, result) => {
  163. if (error instanceof error_1.MongoError) {
  164. return retryOperation(error);
  165. }
  166. else if (error) {
  167. return callback(error);
  168. }
  169. callback(undefined, result);
  170. });
  171. }
  172. }
  173. return operation.execute(server, session, callback);
  174. });
  175. }
  176. //# sourceMappingURL=execute_operation.js.map