aggregate.js 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. 'use strict';
  2. const CommandOperationV2 = require('./command_v2');
  3. const MongoError = require('../core').MongoError;
  4. const maxWireVersion = require('../core/utils').maxWireVersion;
  5. const ReadPreference = require('../core').ReadPreference;
  6. const Aspect = require('./operation').Aspect;
  7. const defineAspects = require('./operation').defineAspects;
  /**
   * Value used as the `aggregate` command target when the parent's namespace
   * carries no collection name.
   */
  const DB_AGGREGATE_COLLECTION = 1;

  /**
   * Servers reporting a max wire version below this have `readConcern` cleared
   * before a pipeline containing a write stage (`$out` / `$merge`) is sent.
   */
  const MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT = 8;
  /**
   * Operation that assembles and runs an `aggregate` command.
   *
   * The command targets the parent's collection when the parent namespace has
   * one; otherwise `DB_AGGREGATE_COLLECTION` is used as the target.
   */
  class AggregateOperation extends CommandOperationV2 {
    /**
     * @param {object} parent Object whose `s.namespace` is inspected to pick the command target.
     * @param {object[]} pipeline Aggregation pipeline stages.
     * @param {object} options Operation options; `out`, `explain` and `cursor` are read here.
     * @throws {MongoError} When `explain` is combined with a read or write concern,
     *   or when `cursor` is set to something that is not an object.
     */
    constructor(parent, pipeline, options) {
      super(parent, options, { fullResponse: true });

      this.target =
        parent.s.namespace && parent.s.namespace.collection
          ? parent.s.namespace.collection
          : DB_AGGREGATE_COLLECTION;

      this.pipeline = pipeline;

      // Determine whether the pipeline writes; if it does, the read preference
      // is overridden below.
      this.hasWriteStage = false;
      if (typeof options.out === 'string') {
        // `concat` builds a new array, so the caller's pipeline is left untouched here.
        this.pipeline = this.pipeline.concat({ $out: options.out });
        this.hasWriteStage = true;
      } else if (pipeline.length > 0) {
        // Only the final stage is inspected for a write stage.
        const finalStage = pipeline[pipeline.length - 1];
        if (finalStage.$out || finalStage.$merge) {
          this.hasWriteStage = true;
        }
      }

      if (this.hasWriteStage) {
        this.readPreference = ReadPreference.primary;
      }

      // NOTE(review): readConcern / writeConcern are not assigned in this class;
      // presumably the base constructor sets them from `options` — confirm.
      if (options.explain && (this.readConcern || this.writeConcern)) {
        throw new MongoError(
          '"explain" cannot be used on an aggregate call with readConcern/writeConcern'
        );
      }

      if (options.cursor != null && typeof options.cursor !== 'object') {
        throw new MongoError('cursor options must be an object');
      }
    }

    /**
     * Whether this read may be retried: only when the pipeline has no write stage.
     *
     * @returns {boolean}
     */
    get canRetryRead() {
      return !this.hasWriteStage;
    }

    /**
     * Appends a stage to the pipeline in place.
     *
     * NOTE(review): `hasWriteStage` is not re-evaluated here, so appending a
     * `$out` / `$merge` stage after construction does not update it — confirm
     * whether callers rely on that.
     *
     * @param {object} stage Pipeline stage to append.
     */
    addToPipeline(stage) {
      this.pipeline.push(stage);
    }

    /**
     * Builds the `aggregate` command document and hands it to `executeCommand`.
     *
     * @param {object} server Server the command is run against; also passed to `maxWireVersion`.
     * @param {Function} callback Passed through to `executeCommand`.
     */
    execute(server, callback) {
      const options = this.options;
      const serverWireVersion = maxWireVersion(server);
      const command = { aggregate: this.target, pipeline: this.pipeline };

      // Drop readConcern for write-stage pipelines on servers below the minimum wire version.
      if (this.hasWriteStage && serverWireVersion < MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT) {
        this.readConcern = null;
      }

      // writeConcern is only attached for write-stage pipelines on wire version >= 5.
      if (serverWireVersion >= 5) {
        if (this.hasWriteStage && this.writeConcern) {
          Object.assign(command, { writeConcern: this.writeConcern });
        }
      }

      if (options.bypassDocumentValidation === true) {
        command.bypassDocumentValidation = options.bypassDocumentValidation;
      }

      if (typeof options.allowDiskUse === 'boolean') {
        command.allowDiskUse = options.allowDiskUse;
      }

      if (options.hint) {
        command.hint = options.hint;
      }

      if (options.explain) {
        // NOTE(review): this writes to `this.options` itself; presumably it is
        // read further down the command path — confirm before changing.
        options.full = false;
        command.explain = options.explain;
      }

      // Work on a shallow copy so that adding `batchSize` below never writes
      // into the cursor-options object supplied by the caller.
      command.cursor = Object.assign({}, options.cursor);
      if (options.batchSize && !this.hasWriteStage) {
        command.cursor.batchSize = options.batchSize;
      }

      super.executeCommand(server, command, callback);
    }
  }
  // Aspects registered for AggregateOperation via `defineAspects`.
  const aggregateAspects = [Aspect.READ_OPERATION, Aspect.RETRYABLE, Aspect.EXECUTE_WITH_SELECTION];
  defineAspects(AggregateOperation, aggregateAspects);

  module.exports = AggregateOperation;