// aggregate.js
  'use strict';

  // Dependencies used by the aggregate operation below.
  const CommandOperationV2 = require('./command_v2');
  const MongoError = require('../core').MongoError;
  const maxWireVersion = require('../core/utils').maxWireVersion;
  const ReadPreference = require('../core').ReadPreference;
  const Aspect = require('./operation').Aspect;
  const defineAspects = require('./operation').defineAspects;

  // Value sent as the `aggregate` field of the command when the parent has no
  // collection in its namespace.
  const DB_AGGREGATE_COLLECTION = 1;

  // Servers reporting a wire version below this get their read concern cleared
  // when the pipeline contains a write stage.
  const MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT = 8;
  /**
   * Builds and executes an `aggregate` command against a server.
   *
   * The command targets the parent's collection when one is present in
   * `parent.s.namespace`, otherwise `DB_AGGREGATE_COLLECTION`.
   */
  class AggregateOperation extends CommandOperationV2 {
    /**
     * @param {object} parent Object exposing `s.namespace`; its `collection`
     *   (when set) becomes the command target.
     * @param {object[]} pipeline Aggregation stages.
     * @param {object} options Operation options. Read here: `out`, `cursor`.
     * @throws {MongoError} If `explain` and a write concern are both set, or if
     *   `options.cursor` is neither nullish nor an object.
     */
    constructor(parent, pipeline, options) {
      super(parent, options, { fullResponse: true });

      const namespace = parent.s.namespace;
      this.target =
        namespace && namespace.collection ? namespace.collection : DB_AGGREGATE_COLLECTION;

      this.pipeline = pipeline;

      // Determine whether the pipeline writes; a write stage overrides the
      // read preference below.
      this.hasWriteStage = false;
      if (typeof options.out === 'string') {
        // `concat` yields a new array, so the caller's pipeline is untouched here.
        this.pipeline = this.pipeline.concat({ $out: options.out });
        this.hasWriteStage = true;
      } else if (pipeline.length > 0) {
        // Only the last stage is inspected for `$out` / `$merge`.
        const finalStage = pipeline[pipeline.length - 1];
        if (finalStage.$out || finalStage.$merge) {
          this.hasWriteStage = true;
        }
      }

      if (this.hasWriteStage) {
        this.readPreference = ReadPreference.primary;
      }

      if (this.explain && this.writeConcern) {
        throw new MongoError('"explain" cannot be used on an aggregate call with writeConcern');
      }

      if (options.cursor != null && typeof options.cursor !== 'object') {
        throw new MongoError('cursor options must be an object');
      }
    }

    /**
     * Whether this read may be retried: true only when no write stage was detected.
     *
     * @returns {boolean}
     */
    get canRetryRead() {
      return !this.hasWriteStage;
    }

    /**
     * Appends a stage to the stored pipeline in place.
     *
     * NOTE(review): `hasWriteStage` is not re-evaluated here, and when no string
     * `out` option was given `this.pipeline` is the array passed to the
     * constructor, so the push is visible to whoever owns that array.
     *
     * @param {object} stage Stage to append.
     */
    addToPipeline(stage) {
      this.pipeline.push(stage);
    }

    /**
     * Assembles the `aggregate` command from the stored state and options and
     * hands it to `executeCommand` on the parent class.
     *
     * @param {object} server Server to run against; also passed to `maxWireVersion`.
     * @param {Function} callback Forwarded unchanged to `executeCommand`.
     */
    execute(server, callback) {
      const options = this.options;
      const serverWireVersion = maxWireVersion(server);
      const command = { aggregate: this.target, pipeline: this.pipeline };

      // Write-stage pipelines drop the read concern on servers below the
      // minimum supporting wire version.
      if (this.hasWriteStage && serverWireVersion < MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT) {
        this.readConcern = null;
      }

      // A write concern is only attached for write-stage pipelines on servers
      // at wire version 5 or above.
      if (serverWireVersion >= 5 && this.hasWriteStage && this.writeConcern) {
        command.writeConcern = this.writeConcern;
      }

      if (options.bypassDocumentValidation === true) {
        command.bypassDocumentValidation = options.bypassDocumentValidation;
      }

      if (typeof options.allowDiskUse === 'boolean') {
        command.allowDiskUse = options.allowDiskUse;
      }

      if (options.hint) {
        command.hint = options.hint;
      }

      if (this.explain) {
        options.full = false;
      }

      // Shallow-copy the cursor options so that setting `batchSize` below never
      // writes into the object supplied by the caller.
      command.cursor = Object.assign({}, options.cursor);
      if (options.batchSize && !this.hasWriteStage) {
        command.cursor.batchSize = options.batchSize;
      }

      super.executeCommand(server, command, callback);
    }
  }
  // Declare the aspects associated with AggregateOperation.
  defineAspects(AggregateOperation, [
    Aspect.READ_OPERATION,
    Aspect.RETRYABLE,
    Aspect.EXECUTE_WITH_SELECTION,
    Aspect.EXPLAINABLE
  ]);

  module.exports = AggregateOperation;