aggregate.js 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131
  1. 'use strict';
  2. /*!
  3. * Module dependencies
  4. */
  5. const AggregationCursor = require('./cursor/AggregationCursor');
  6. const Query = require('./query');
  7. const applyGlobalMaxTimeMS = require('./helpers/query/applyGlobalMaxTimeMS');
  8. const getConstructorName = require('./helpers/getConstructorName');
  9. const prepareDiscriminatorPipeline = require('./helpers/aggregate/prepareDiscriminatorPipeline');
  10. const promiseOrCallback = require('./helpers/promiseOrCallback');
  11. const stringifyFunctionOperators = require('./helpers/aggregate/stringifyFunctionOperators');
  12. const utils = require('./utils');
  13. const read = Query.prototype.read;
  14. const readConcern = Query.prototype.readConcern;
  15. /**
  16. * Aggregate constructor used for building aggregation pipelines. Do not
  17. * instantiate this class directly, use [Model.aggregate()](/docs/api.html#model_Model.aggregate) instead.
  18. *
  19. * ####Example:
  20. *
  21. * const aggregate = Model.aggregate([
  22. * { $project: { a: 1, b: 1 } },
  23. * { $skip: 5 }
  24. * ]);
  25. *
  26. * Model.
  27. * aggregate([{ $match: { age: { $gte: 21 }}}]).
  28. * unwind('tags').
  29. * exec(callback);
  30. *
  31. * ####Note:
  32. *
  33. * - The documents returned are plain javascript objects, not mongoose documents (since any shape of document can be returned).
  34. * - Mongoose does **not** cast pipeline stages. The below will **not** work unless `_id` is a string in the database
  35. *
  36. * ```javascript
  37. * new Aggregate([{ $match: { _id: '00000000000000000000000a' } }]);
  38. * // Do this instead to cast to an ObjectId
  39. * new Aggregate([{ $match: { _id: new mongoose.Types.ObjectId('00000000000000000000000a') } }]);
  40. * ```
  41. *
  42. * @see MongoDB http://docs.mongodb.org/manual/applications/aggregation/
  43. * @see driver http://mongodb.github.com/node-mongodb-native/api-generated/collection.html#aggregate
  44. * @param {Array} [pipeline] aggregation pipeline as an array of objects
  45. * @param {Model} [model] the model to use with this aggregate.
  46. * @api public
  47. */
  48. function Aggregate(pipeline, model) {
  49. this._pipeline = [];
  50. this._model = model;
  51. this.options = {};
  52. if (arguments.length === 1 && Array.isArray(pipeline)) {
  53. this.append.apply(this, pipeline);
  54. }
  55. }
  56. /**
  57. * Contains options passed down to the [aggregate command](https://docs.mongodb.com/manual/reference/command/aggregate/).
  58. * Supported options are:
  59. *
  60. * - `readPreference`
  61. * - [`cursor`](./api.html#aggregate_Aggregate-cursor)
  62. * - [`explain`](./api.html#aggregate_Aggregate-explain)
  63. * - [`allowDiskUse`](./api.html#aggregate_Aggregate-allowDiskUse)
  64. * - `maxTimeMS`
  65. * - `bypassDocumentValidation`
  66. * - `raw`
  67. * - `promoteLongs`
  68. * - `promoteValues`
  69. * - `promoteBuffers`
  70. * - [`collation`](./api.html#aggregate_Aggregate-collation)
  71. * - `comment`
  72. * - [`session`](./api.html#aggregate_Aggregate-session)
  73. *
  74. * @property options
  75. * @memberOf Aggregate
  76. * @api public
  77. */
Aggregate.prototype.options; // bare property read with no runtime effect; instances get their own `options` object in the constructor
  79. /**
  80. * Get/set the model that this aggregation will execute on.
  81. *
  82. * ####Example:
  83. * const aggregate = MyModel.aggregate([{ $match: { answer: 42 } }]);
  84. * aggregate.model() === MyModel; // true
  85. *
  86. * // Change the model. There's rarely any reason to do this.
  87. * aggregate.model(SomeOtherModel);
  88. * aggregate.model() === SomeOtherModel; // true
  89. *
  90. * @param {Model} [model] set the model associated with this aggregate.
  91. * @return {Model}
  92. * @api public
  93. */
  94. Aggregate.prototype.model = function(model) {
  95. if (arguments.length === 0) {
  96. return this._model;
  97. }
  98. this._model = model;
  99. if (model.schema != null) {
  100. if (this.options.readPreference == null &&
  101. model.schema.options.read != null) {
  102. this.options.readPreference = model.schema.options.read;
  103. }
  104. if (this.options.collation == null &&
  105. model.schema.options.collation != null) {
  106. this.options.collation = model.schema.options.collation;
  107. }
  108. }
  109. return model;
  110. };
  111. /**
  112. * Appends new operators to this aggregate pipeline
  113. *
  114. * ####Examples:
  115. *
  116. * aggregate.append({ $project: { field: 1 }}, { $limit: 2 });
  117. *
  118. * // or pass an array
  119. * const pipeline = [{ $match: { daw: 'Logic Audio X' }} ];
  120. * aggregate.append(pipeline);
  121. *
  122. * @param {Object} ops operator(s) to append
  123. * @return {Aggregate}
  124. * @api public
  125. */
  126. Aggregate.prototype.append = function() {
  127. const args = (arguments.length === 1 && Array.isArray(arguments[0]))
  128. ? arguments[0]
  129. : [...arguments];
  130. if (!args.every(isOperator)) {
  131. throw new Error('Arguments must be aggregate pipeline operators');
  132. }
  133. this._pipeline = this._pipeline.concat(args);
  134. return this;
  135. };
  136. /**
  137. * Appends a new $addFields operator to this aggregate pipeline.
  138. * Requires MongoDB v3.4+ to work
  139. *
  140. * ####Examples:
  141. *
  142. * // adding new fields based on existing fields
  143. * aggregate.addFields({
  144. * newField: '$b.nested'
  145. * , plusTen: { $add: ['$val', 10]}
  146. * , sub: {
  147. * name: '$a'
  148. * }
  149. * })
  150. *
  151. * // etc
  152. * aggregate.addFields({ salary_k: { $divide: [ "$salary", 1000 ] } });
  153. *
  154. * @param {Object} arg field specification
  155. * @see $addFields https://docs.mongodb.com/manual/reference/operator/aggregation/addFields/
  156. * @return {Aggregate}
  157. * @api public
  158. */
  159. Aggregate.prototype.addFields = function(arg) {
  160. const fields = {};
  161. if (typeof arg === 'object' && !Array.isArray(arg)) {
  162. Object.keys(arg).forEach(function(field) {
  163. fields[field] = arg[field];
  164. });
  165. } else {
  166. throw new Error('Invalid addFields() argument. Must be an object');
  167. }
  168. return this.append({ $addFields: fields });
  169. };
  170. /**
  171. * Appends a new $project operator to this aggregate pipeline.
  172. *
  173. * Mongoose query [selection syntax](#query_Query-select) is also supported.
  174. *
  175. * ####Examples:
  176. *
  177. * // include a, include b, exclude _id
  178. * aggregate.project("a b -_id");
  179. *
  180. * // or you may use object notation, useful when
  181. * // you have keys already prefixed with a "-"
  182. * aggregate.project({a: 1, b: 1, _id: 0});
  183. *
  184. * // reshaping documents
  185. * aggregate.project({
  186. * newField: '$b.nested'
  187. * , plusTen: { $add: ['$val', 10]}
  188. * , sub: {
  189. * name: '$a'
  190. * }
  191. * })
  192. *
  193. * // etc
  194. * aggregate.project({ salary_k: { $divide: [ "$salary", 1000 ] } });
  195. *
  196. * @param {Object|String} arg field specification
  197. * @see projection http://docs.mongodb.org/manual/reference/aggregation/project/
  198. * @return {Aggregate}
  199. * @api public
  200. */
  201. Aggregate.prototype.project = function(arg) {
  202. const fields = {};
  203. if (typeof arg === 'object' && !Array.isArray(arg)) {
  204. Object.keys(arg).forEach(function(field) {
  205. fields[field] = arg[field];
  206. });
  207. } else if (arguments.length === 1 && typeof arg === 'string') {
  208. arg.split(/\s+/).forEach(function(field) {
  209. if (!field) {
  210. return;
  211. }
  212. const include = field[0] === '-' ? 0 : 1;
  213. if (include === 0) {
  214. field = field.substring(1);
  215. }
  216. fields[field] = include;
  217. });
  218. } else {
  219. throw new Error('Invalid project() argument. Must be string or object');
  220. }
  221. return this.append({ $project: fields });
  222. };
  223. /**
  224. * Appends a new custom $group operator to this aggregate pipeline.
  225. *
  226. * ####Examples:
  227. *
  228. * aggregate.group({ _id: "$department" });
  229. *
  230. * @see $group http://docs.mongodb.org/manual/reference/aggregation/group/
  231. * @method group
  232. * @memberOf Aggregate
  233. * @instance
  234. * @param {Object} arg $group operator contents
  235. * @return {Aggregate}
  236. * @api public
  237. */
  238. /**
  239. * Appends a new custom $match operator to this aggregate pipeline.
  240. *
  241. * ####Examples:
  242. *
  243. * aggregate.match({ department: { $in: [ "sales", "engineering" ] } });
  244. *
  245. * @see $match http://docs.mongodb.org/manual/reference/aggregation/match/
  246. * @method match
  247. * @memberOf Aggregate
  248. * @instance
  249. * @param {Object} arg $match operator contents
  250. * @return {Aggregate}
  251. * @api public
  252. */
  253. /**
  254. * Appends a new $skip operator to this aggregate pipeline.
  255. *
  256. * ####Examples:
  257. *
  258. * aggregate.skip(10);
  259. *
  260. * @see $skip http://docs.mongodb.org/manual/reference/aggregation/skip/
  261. * @method skip
  262. * @memberOf Aggregate
  263. * @instance
  264. * @param {Number} num number of records to skip before next stage
  265. * @return {Aggregate}
  266. * @api public
  267. */
  268. /**
  269. * Appends a new $limit operator to this aggregate pipeline.
  270. *
  271. * ####Examples:
  272. *
  273. * aggregate.limit(10);
  274. *
  275. * @see $limit http://docs.mongodb.org/manual/reference/aggregation/limit/
  276. * @method limit
  277. * @memberOf Aggregate
  278. * @instance
  279. * @param {Number} num maximum number of records to pass to the next stage
  280. * @return {Aggregate}
  281. * @api public
  282. */
  283. /**
  284. * Appends a new $geoNear operator to this aggregate pipeline.
  285. *
  286. * ####NOTE:
  287. *
  288. * **MUST** be used as the first operator in the pipeline.
  289. *
  290. * ####Examples:
  291. *
  292. * aggregate.near({
  293. * near: [40.724, -73.997],
  294. * distanceField: "dist.calculated", // required
  295. * maxDistance: 0.008,
  296. * query: { type: "public" },
  297. * includeLocs: "dist.location",
  298. * uniqueDocs: true,
  299. * num: 5
  300. * });
  301. *
  302. * @see $geoNear http://docs.mongodb.org/manual/reference/aggregation/geoNear/
  303. * @method near
  304. * @memberOf Aggregate
  305. * @instance
  306. * @param {Object} arg
  307. * @return {Aggregate}
  308. * @api public
  309. */
  310. Aggregate.prototype.near = function(arg) {
  311. const op = {};
  312. op.$geoNear = arg;
  313. return this.append(op);
  314. };
  315. /*!
  316. * define methods
  317. */
  318. 'group match skip limit out'.split(' ').forEach(function($operator) {
  319. Aggregate.prototype[$operator] = function(arg) {
  320. const op = {};
  321. op['$' + $operator] = arg;
  322. return this.append(op);
  323. };
  324. });
  325. /**
  326. * Appends new custom $unwind operator(s) to this aggregate pipeline.
  327. *
  328. * Note that the `$unwind` operator requires the path name to start with '$'.
  329. * Mongoose will prepend '$' if the specified field doesn't start '$'.
  330. *
  331. * ####Examples:
  332. *
  333. * aggregate.unwind("tags");
  334. * aggregate.unwind("a", "b", "c");
  335. * aggregate.unwind({ path: '$tags', preserveNullAndEmptyArrays: true });
  336. *
  337. * @see $unwind http://docs.mongodb.org/manual/reference/aggregation/unwind/
  338. * @param {String|Object} fields the field(s) to unwind, either as field names or as [objects with options](https://docs.mongodb.com/manual/reference/operator/aggregation/unwind/#document-operand-with-options). If passing a string, prefixing the field name with '$' is optional. If passing an object, `path` must start with '$'.
  339. * @return {Aggregate}
  340. * @api public
  341. */
  342. Aggregate.prototype.unwind = function() {
  343. const args = [...arguments];
  344. const res = [];
  345. for (const arg of args) {
  346. if (arg && typeof arg === 'object') {
  347. res.push({ $unwind: arg });
  348. } else if (typeof arg === 'string') {
  349. res.push({
  350. $unwind: (arg && arg.startsWith('$')) ? arg : '$' + arg
  351. });
  352. } else {
  353. throw new Error('Invalid arg "' + arg + '" to unwind(), ' +
  354. 'must be string or object');
  355. }
  356. }
  357. return this.append.apply(this, res);
  358. };
  359. /**
  360. * Appends a new $replaceRoot operator to this aggregate pipeline.
  361. *
  362. * Note that the `$replaceRoot` operator requires field strings to start with '$'.
  363. * If you are passing in a string Mongoose will prepend '$' if the specified field doesn't start '$'.
  364. * If you are passing in an object the strings in your expression will not be altered.
  365. *
  366. * ####Examples:
  367. *
  368. * aggregate.replaceRoot("user");
  369. *
  370. * aggregate.replaceRoot({ x: { $concat: ['$this', '$that'] } });
  371. *
  372. * @see $replaceRoot https://docs.mongodb.org/manual/reference/operator/aggregation/replaceRoot
 * @param {String|Object} newRoot the field or document which will become the new root document
  374. * @return {Aggregate}
  375. * @api public
  376. */
  377. Aggregate.prototype.replaceRoot = function(newRoot) {
  378. let ret;
  379. if (typeof newRoot === 'string') {
  380. ret = newRoot.startsWith('$') ? newRoot : '$' + newRoot;
  381. } else {
  382. ret = newRoot;
  383. }
  384. return this.append({
  385. $replaceRoot: {
  386. newRoot: ret
  387. }
  388. });
  389. };
  390. /**
  391. * Appends a new $count operator to this aggregate pipeline.
  392. *
  393. * ####Examples:
  394. *
  395. * aggregate.count("userCount");
  396. *
  397. * @see $count https://docs.mongodb.org/manual/reference/operator/aggregation/count
 * @param {String} countName the name of the count field
  399. * @return {Aggregate}
  400. * @api public
  401. */
  402. Aggregate.prototype.count = function(countName) {
  403. return this.append({ $count: countName });
  404. };
  405. /**
  406. * Appends a new $sortByCount operator to this aggregate pipeline. Accepts either a string field name
  407. * or a pipeline object.
  408. *
  409. * Note that the `$sortByCount` operator requires the new root to start with '$'.
  410. * Mongoose will prepend '$' if the specified field name doesn't start with '$'.
  411. *
  412. * ####Examples:
  413. *
  414. * aggregate.sortByCount('users');
  415. * aggregate.sortByCount({ $mergeObjects: [ "$employee", "$business" ] })
  416. *
  417. * @see $sortByCount https://docs.mongodb.com/manual/reference/operator/aggregation/sortByCount/
  418. * @param {Object|String} arg
  419. * @return {Aggregate} this
  420. * @api public
  421. */
  422. Aggregate.prototype.sortByCount = function(arg) {
  423. if (arg && typeof arg === 'object') {
  424. return this.append({ $sortByCount: arg });
  425. } else if (typeof arg === 'string') {
  426. return this.append({
  427. $sortByCount: (arg && arg.startsWith('$')) ? arg : '$' + arg
  428. });
  429. } else {
  430. throw new TypeError('Invalid arg "' + arg + '" to sortByCount(), ' +
  431. 'must be string or object');
  432. }
  433. };
  434. /**
  435. * Appends new custom $lookup operator to this aggregate pipeline.
  436. *
  437. * ####Examples:
  438. *
  439. * aggregate.lookup({ from: 'users', localField: 'userId', foreignField: '_id', as: 'users' });
  440. *
  441. * @see $lookup https://docs.mongodb.org/manual/reference/operator/aggregation/lookup/#pipe._S_lookup
  442. * @param {Object} options to $lookup as described in the above link
 * @return {Aggregate}
 * @api public
  444. */
  445. Aggregate.prototype.lookup = function(options) {
  446. return this.append({ $lookup: options });
  447. };
  448. /**
  449. * Appends new custom $graphLookup operator(s) to this aggregate pipeline, performing a recursive search on a collection.
  450. *
  451. * Note that graphLookup can only consume at most 100MB of memory, and does not allow disk use even if `{ allowDiskUse: true }` is specified.
  452. *
  453. * #### Examples:
  454. * // Suppose we have a collection of courses, where a document might look like `{ _id: 0, name: 'Calculus', prerequisite: 'Trigonometry'}` and `{ _id: 0, name: 'Trigonometry', prerequisite: 'Algebra' }`
  455. * aggregate.graphLookup({ from: 'courses', startWith: '$prerequisite', connectFromField: 'prerequisite', connectToField: 'name', as: 'prerequisites', maxDepth: 3 }) // this will recursively search the 'courses' collection up to 3 prerequisites
  456. *
  457. * @see $graphLookup https://docs.mongodb.com/manual/reference/operator/aggregation/graphLookup/#pipe._S_graphLookup
  458. * @param {Object} options to $graphLookup as described in the above link
  459. * @return {Aggregate}
  460. * @api public
  461. */
  462. Aggregate.prototype.graphLookup = function(options) {
  463. const cloneOptions = {};
  464. if (options) {
  465. if (!utils.isObject(options)) {
  466. throw new TypeError('Invalid graphLookup() argument. Must be an object.');
  467. }
  468. utils.mergeClone(cloneOptions, options);
  469. const startWith = cloneOptions.startWith;
  470. if (startWith && typeof startWith === 'string') {
  471. cloneOptions.startWith = cloneOptions.startWith.startsWith('$') ?
  472. cloneOptions.startWith :
  473. '$' + cloneOptions.startWith;
  474. }
  475. }
  476. return this.append({ $graphLookup: cloneOptions });
  477. };
  478. /**
  479. * Appends new custom $sample operator to this aggregate pipeline.
  480. *
  481. * ####Examples:
  482. *
  483. * aggregate.sample(3); // Add a pipeline that picks 3 random documents
  484. *
  485. * @see $sample https://docs.mongodb.org/manual/reference/operator/aggregation/sample/#pipe._S_sample
  486. * @param {Number} size number of random documents to pick
  487. * @return {Aggregate}
  488. * @api public
  489. */
  490. Aggregate.prototype.sample = function(size) {
  491. return this.append({ $sample: { size: size } });
  492. };
  493. /**
  494. * Appends a new $sort operator to this aggregate pipeline.
  495. *
  496. * If an object is passed, values allowed are `asc`, `desc`, `ascending`, `descending`, `1`, and `-1`.
  497. *
  498. * If a string is passed, it must be a space delimited list of path names. The sort order of each path is ascending unless the path name is prefixed with `-` which will be treated as descending.
  499. *
  500. * ####Examples:
  501. *
  502. * // these are equivalent
  503. * aggregate.sort({ field: 'asc', test: -1 });
  504. * aggregate.sort('field -test');
  505. *
  506. * @see $sort http://docs.mongodb.org/manual/reference/aggregation/sort/
  507. * @param {Object|String} arg
  508. * @return {Aggregate} this
  509. * @api public
  510. */
  511. Aggregate.prototype.sort = function(arg) {
  512. // TODO refactor to reuse the query builder logic
  513. const sort = {};
  514. if (getConstructorName(arg) === 'Object') {
  515. const desc = ['desc', 'descending', -1];
  516. Object.keys(arg).forEach(function(field) {
  517. // If sorting by text score, skip coercing into 1/-1
  518. if (arg[field] instanceof Object && arg[field].$meta) {
  519. sort[field] = arg[field];
  520. return;
  521. }
  522. sort[field] = desc.indexOf(arg[field]) === -1 ? 1 : -1;
  523. });
  524. } else if (arguments.length === 1 && typeof arg === 'string') {
  525. arg.split(/\s+/).forEach(function(field) {
  526. if (!field) {
  527. return;
  528. }
  529. const ascend = field[0] === '-' ? -1 : 1;
  530. if (ascend === -1) {
  531. field = field.substring(1);
  532. }
  533. sort[field] = ascend;
  534. });
  535. } else {
  536. throw new TypeError('Invalid sort() argument. Must be a string or object.');
  537. }
  538. return this.append({ $sort: sort });
  539. };
  540. /**
  541. * Appends new $unionWith operator to this aggregate pipeline.
  542. *
  543. * ####Examples:
  544. *
  545. * aggregate.unionWith({ coll: 'users', pipeline: [ { $match: { _id: 1 } } ] });
  546. *
  547. * @see $unionWith https://docs.mongodb.com/manual/reference/operator/aggregation/unionWith
  548. * @param {Object} options to $unionWith query as described in the above link
 * @return {Aggregate}
 * @api public
  550. */
  551. Aggregate.prototype.unionWith = function(options) {
  552. return this.append({ $unionWith: options });
  553. };
  554. /**
  555. * Sets the readPreference option for the aggregation query.
  556. *
  557. * ####Example:
  558. *
  559. * await Model.aggregate(pipeline).read('primaryPreferred');
  560. *
  561. * @param {String} pref one of the listed preference options or their aliases
  562. * @param {Array} [tags] optional tags for this query
  563. * @return {Aggregate} this
  564. * @api public
  565. * @see mongodb http://docs.mongodb.org/manual/applications/replication/#read-preference
  566. * @see driver http://mongodb.github.com/node-mongodb-native/driver-articles/anintroductionto1_1and2_2.html#read-preferences
  567. */
  568. Aggregate.prototype.read = function(pref, tags) {
  569. if (!this.options) {
  570. this.options = {};
  571. }
  572. read.call(this, pref, tags);
  573. return this;
  574. };
  575. /**
  576. * Sets the readConcern level for the aggregation query.
  577. *
  578. * ####Example:
  579. *
  580. * await Model.aggregate(pipeline).readConcern('majority');
  581. *
  582. * @param {String} level one of the listed read concern level or their aliases
  583. * @see mongodb https://docs.mongodb.com/manual/reference/read-concern/
  584. * @return {Aggregate} this
  585. * @api public
  586. */
  587. Aggregate.prototype.readConcern = function(level) {
  588. if (!this.options) {
  589. this.options = {};
  590. }
  591. readConcern.call(this, level);
  592. return this;
  593. };
  594. /**
  595. * Appends a new $redact operator to this aggregate pipeline.
  596. *
  597. * If 3 arguments are supplied, Mongoose will wrap them with if-then-else of $cond operator respectively
  598. * If `thenExpr` or `elseExpr` is string, make sure it starts with $$, like `$$DESCEND`, `$$PRUNE` or `$$KEEP`.
  599. *
  600. * ####Example:
  601. *
  602. * await Model.aggregate(pipeline).redact({
  603. * $cond: {
  604. * if: { $eq: [ '$level', 5 ] },
  605. * then: '$$PRUNE',
  606. * else: '$$DESCEND'
  607. * }
  608. * });
  609. *
  610. * // $redact often comes with $cond operator, you can also use the following syntax provided by mongoose
  611. * await Model.aggregate(pipeline).redact({ $eq: [ '$level', 5 ] }, '$$PRUNE', '$$DESCEND');
  612. *
  613. * @param {Object} expression redact options or conditional expression
  614. * @param {String|Object} [thenExpr] true case for the condition
  615. * @param {String|Object} [elseExpr] false case for the condition
  616. * @return {Aggregate} this
  617. * @see $redact https://docs.mongodb.com/manual/reference/operator/aggregation/redact/
  618. * @api public
  619. */
  620. Aggregate.prototype.redact = function(expression, thenExpr, elseExpr) {
  621. if (arguments.length === 3) {
  622. if ((typeof thenExpr === 'string' && !thenExpr.startsWith('$$')) ||
  623. (typeof elseExpr === 'string' && !elseExpr.startsWith('$$'))) {
  624. throw new Error('If thenExpr or elseExpr is string, it must start with $$. e.g. $$DESCEND, $$PRUNE, $$KEEP');
  625. }
  626. expression = {
  627. $cond: {
  628. if: expression,
  629. then: thenExpr,
  630. else: elseExpr
  631. }
  632. };
  633. } else if (arguments.length !== 1) {
  634. throw new TypeError('Invalid arguments');
  635. }
  636. return this.append({ $redact: expression });
  637. };
  638. /**
  639. * Execute the aggregation with explain
  640. *
  641. * ####Example:
  642. *
  643. * Model.aggregate(..).explain(callback)
  644. *
 * @param {String} [verbosity] explain verbosity; may be omitted, in which case the callback can be passed as the first argument
  646. * @param {Function} callback
  647. * @return {Promise}
  648. */
  649. Aggregate.prototype.explain = function(verbosity, callback) {
  650. const model = this._model;
  651. if (typeof verbosity === 'function') {
  652. callback = verbosity;
  653. verbosity = null;
  654. }
  655. return promiseOrCallback(callback, cb => {
  656. if (!this._pipeline.length) {
  657. const err = new Error('Aggregate has empty pipeline');
  658. return cb(err);
  659. }
  660. prepareDiscriminatorPipeline(this._pipeline, this._model.schema);
  661. model.hooks.execPre('aggregate', this, error => {
  662. if (error) {
  663. const _opts = { error: error };
  664. return model.hooks.execPost('aggregate', this, [null], _opts, error => {
  665. cb(error);
  666. });
  667. }
  668. model.collection.aggregate(this._pipeline, this.options, (error, cursor) => {
  669. if (error != null) {
  670. const _opts = { error: error };
  671. return model.hooks.execPost('aggregate', this, [null], _opts, error => {
  672. cb(error);
  673. });
  674. }
  675. if (verbosity != null) {
  676. cursor.explain(verbosity, (error, result) => {
  677. const _opts = { error: error };
  678. return model.hooks.execPost('aggregate', this, [result], _opts, error => {
  679. if (error) {
  680. return cb(error);
  681. }
  682. return cb(null, result);
  683. });
  684. });
  685. } else {
  686. cursor.explain((error, result) => {
  687. const _opts = { error: error };
  688. return model.hooks.execPost('aggregate', this, [result], _opts, error => {
  689. if (error) {
  690. return cb(error);
  691. }
  692. return cb(null, result);
  693. });
  694. });
  695. }
  696. });
  697. });
  698. }, model.events);
  699. };
  700. /**
  701. * Sets the allowDiskUse option for the aggregation query (ignored for < 2.6.0)
  702. *
  703. * ####Example:
  704. *
  705. * await Model.aggregate([{ $match: { foo: 'bar' } }]).allowDiskUse(true);
  706. *
  707. * @param {Boolean} value Should tell server it can use hard drive to store data during aggregation.
 * @return {Aggregate} this
  709. * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
  710. */
  711. Aggregate.prototype.allowDiskUse = function(value) {
  712. this.options.allowDiskUse = value;
  713. return this;
  714. };
  715. /**
  716. * Sets the hint option for the aggregation query (ignored for < 3.6.0)
  717. *
  718. * ####Example:
  719. *
  720. * Model.aggregate(..).hint({ qty: 1, category: 1 }).exec(callback)
  721. *
  722. * @param {Object|String} value a hint object or the index name
  723. * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
  724. */
  725. Aggregate.prototype.hint = function(value) {
  726. this.options.hint = value;
  727. return this;
  728. };
  729. /**
  730. * Sets the session for this aggregation. Useful for [transactions](/docs/transactions.html).
  731. *
  732. * ####Example:
  733. *
  734. * const session = await Model.startSession();
  735. * await Model.aggregate(..).session(session);
  736. *
  737. * @param {ClientSession} session
  738. * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
  739. */
  740. Aggregate.prototype.session = function(session) {
  741. if (session == null) {
  742. delete this.options.session;
  743. } else {
  744. this.options.session = session;
  745. }
  746. return this;
  747. };
  748. /**
  749. * Lets you set arbitrary options, for middleware or plugins.
  750. *
  751. * ####Example:
  752. *
  753. * const agg = Model.aggregate(..).option({ allowDiskUse: true }); // Set the `allowDiskUse` option
  754. * agg.options; // `{ allowDiskUse: true }`
  755. *
  756. * @param {Object} options keys to merge into current options
  757. * @param [options.maxTimeMS] number limits the time this aggregation will run, see [MongoDB docs on `maxTimeMS`](https://docs.mongodb.com/manual/reference/operator/meta/maxTimeMS/)
  758. * @param [options.allowDiskUse] boolean if true, the MongoDB server will use the hard drive to store data during this aggregation
  759. * @param [options.collation] object see [`Aggregate.prototype.collation()`](./docs/api.html#aggregate_Aggregate-collation)
  760. * @param [options.session] ClientSession see [`Aggregate.prototype.session()`](./docs/api.html#aggregate_Aggregate-session)
  761. * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
  762. * @return {Aggregate} this
  763. * @api public
  764. */
  765. Aggregate.prototype.option = function(value) {
  766. for (const key in value) {
  767. this.options[key] = value[key];
  768. }
  769. return this;
  770. };
  /**
   * Stores the given cursor options under `options.cursor` and returns a new
   * `AggregationCursor` built from this aggregate. Cursors are useful when the
   * aggregation result is too big to fit into memory and should be processed
   * one document at a time.
   *
   * ####Example:
   *
   *     const cursor = Model.aggregate(..).cursor({ batchSize: 1000 });
   *     cursor.eachAsync(function(doc, i) {
   *       // use doc
   *     });
   *
   * @param {Object} [options] cursor options; an empty object is stored when omitted
   * @param {Number} [options.batchSize] set the cursor batch size
   * @param {Boolean} [options.useMongooseAggCursor] use experimental mongoose-specific aggregation cursor (for `eachAsync()` and other query cursor semantics)
   * @return {AggregationCursor} cursor representing this aggregation
   * @api public
   * @see mongodb http://mongodb.github.io/node-mongodb-native/2.0/api/AggregationCursor.html
   */
  Aggregate.prototype.cursor = function(options) {
    const cursorOptions = options || {};
    if (!this.options) {
      this.options = {};
    }
    this.options.cursor = cursorOptions;
    return new AggregationCursor(this);
  };
  /**
   * Sets the `collation` option of this aggregation, creating the options
   * object first if there is none yet.
   *
   * ####Example:
   *
   *     const res = await Model.aggregate(pipeline).collation({ locale: 'en_US', strength: 1 });
   *
   * @param {Object} collation options
   * @return {Aggregate} this
   * @api public
   * @see mongodb http://mongodb.github.io/node-mongodb-native/2.2/api/Collection.html#aggregate
   */
  Aggregate.prototype.collation = function(collation) {
    const current = this.options;
    if (current) {
      current.collation = collation;
      return this;
    }
    // No options yet: start a fresh bag that holds only the collation.
    this.options = { collation: collation };
    return this;
  };
  /**
   * Combines multiple aggregation pipelines by appending a `$facet` stage
   * whose value is the given object.
   *
   * ####Example:
   *
   *     const res = await Model.aggregate().facet({
   *       books: [{ $sortByCount: '$author' }],
   *       price: [{ $bucketAuto: { groupBy: '$price', buckets: 2 } }]
   *     });
   *
   *     // Output: { books: [...], price: [{...}, {...}] }
   *
   * @param {Object} options facet options, keyed by output field name
   * @return {Aggregate} this
   * @see $facet https://docs.mongodb.com/v3.4/reference/operator/aggregation/facet/
   * @api public
   */
  Aggregate.prototype.facet = function(options) {
    const stage = { $facet: options };
    return this.append(stage);
  };
  /**
   * Helper for [Atlas Text Search](https://docs.atlas.mongodb.com/reference/atlas-search/tutorial/)'s
   * `$search` stage: appends `{ $search: options }` to the pipeline.
   *
   * ####Example:
   *
   *     const res = await Model.aggregate().
   *       search({
   *         text: {
   *           query: 'baseball',
   *           path: 'plot'
   *         }
   *       });
   *
   *     // Output: [{ plot: '...', title: '...' }]
   *
   * @param {Object} options `$search` options
   * @return {Aggregate} this
   * @see $search https://docs.atlas.mongodb.com/reference/atlas-search/tutorial/
   * @api public
   */
  Aggregate.prototype.search = function(options) {
    const stage = { $search: options };
    return this.append(stage);
  };
  /**
   * Returns the current pipeline. The array handed back is the aggregate's own
   * internal array, not a copy.
   *
   * ####Example:
   *
   *     MyModel.aggregate().match({ test: 1 }).pipeline(); // [{ $match: { test: 1 } }]
   *
   * @return {Array}
   * @api public
   */
  Aggregate.prototype.pipeline = function() {
    const { _pipeline: stages } = this;
    return stages;
  };
  /**
   * Executes the aggregate pipeline on the currently bound Model.
   *
   * If the `cursor` option is set, an `AggregationCursor` is returned instead
   * of a promise. Otherwise the 'aggregate' pre hooks run first, then the
   * pipeline is sent to the model's collection, the resulting cursor is drained
   * with `toArray()`, and the outcome is passed through the 'aggregate' post
   * hooks before it reaches the caller.
   *
   * ####Example:
   *
   *     aggregate.exec(callback);
   *
   *     // Because a promise is returned, the `callback` is optional.
   *     const promise = aggregate.exec();
   *     promise.then(..);
   *
   * @see Promise #promise_Promise
   * @param {Function} [callback]
   * @return {Promise}
   * @throws {Error} synchronously, if this aggregate is not bound to a model
   * @api public
   */
  Aggregate.prototype.exec = function(callback) {
    if (!this._model) {
      throw new Error('Aggregate not bound to any Model');
    }

    const model = this._model;
    const collection = this._model.collection;

    applyGlobalMaxTimeMS(this.options, model);

    // With a `cursor` option the caller gets a cursor, not a promise.
    if (this.options && this.options.cursor) {
      return new AggregationCursor(this);
    }

    const run = done => {
      prepareDiscriminatorPipeline(this._pipeline, this._model.schema);
      stringifyFunctionOperators(this._pipeline);

      // Last step: feed the `toArray()` outcome through the post hooks and
      // report whatever they yield.
      const onDocuments = (toArrayError, docs) => {
        const postOptions = { error: toArrayError };
        model.hooks.execPost('aggregate', this, [docs], postOptions, (postError, postResult) => {
          if (postError) {
            return done(postError);
          }
          done(null, postResult);
        });
      };

      // Driver-level failures are reported directly, without post hooks.
      const onDriverCursor = (aggregateError, driverCursor) => {
        if (aggregateError != null) {
          return done(aggregateError);
        }
        driverCursor.toArray(onDocuments);
      };

      const onPreHooksDone = preError => {
        if (preError) {
          // A failing pre hook still runs the post hooks, with a `null` result.
          const postOptions = { error: preError };
          return model.hooks.execPost('aggregate', this, [null], postOptions, postError => {
            done(postError);
          });
        }
        if (!this._pipeline.length) {
          return done(new Error('Aggregate has empty pipeline'));
        }
        // The collection receives a clone of the options, not the original object.
        const driverOptions = utils.clone(this.options || {});
        collection.aggregate(this._pipeline, driverOptions, onDriverCursor);
      };

      model.hooks.execPre('aggregate', this, onPreHooksDone);
    };

    return promiseOrCallback(callback, run, model.events);
  };
  /**
   * Provides promise for aggregate: runs `exec()` and chains the given
   * handlers onto its result.
   *
   * ####Example:
   *
   *     Model.aggregate(..).then(successCallback, errorCallback);
   *
   * @see Promise #promise_Promise
   * @param {Function} [resolve] successCallback
   * @param {Function} [reject] errorCallback
   * @return {Promise}
   * @api public
   */
  Aggregate.prototype.then = function(resolve, reject) {
    const execution = this.exec();
    return execution.then(resolve, reject);
  };
  /**
   * Executes the aggregation via `exec()` and attaches only a rejection
   * handler to the result.
   * Like [`.then()`](#query_Query-then), but only takes a rejection handler.
   *
   * @param {Function} [reject]
   * @return {Promise}
   * @api public
   */
  Aggregate.prototype.catch = function(reject) {
    const execution = this.exec();
    return execution.then(null, reject);
  };
  /**
   * Returns an asyncIterator for use with [`for/await/of` loops](https://thecodebarbarian.com/getting-started-with-async-iterators-in-node-js).
   * You do not need to call this function explicitly, the JavaScript runtime
   * will call it for you.
   *
   * ####Example
   *
   *     const agg = Model.aggregate([{ $match: { age: { $gte: 25 } } }]);
   *     for await (const doc of agg) {
   *       console.log(doc.name);
   *     }
   *
   * Node.js 10.x supports async iterators natively without any flags. You can
   * enable async iterators in Node.js 8.x using the [`--harmony_async_iteration` flag](https://github.com/tc39/proposal-async-iteration/issues/117#issuecomment-346695187).
   *
   * **Note:** This function is not set if `Symbol.asyncIterator` is undefined. If
   * `Symbol.asyncIterator` is undefined, that means your Node.js version does not
   * support async iterators.
   *
   * @method Symbol.asyncIterator
   * @memberOf Aggregate
   * @instance
   * @api public
   */
  if (Symbol.asyncIterator != null) {
    Aggregate.prototype[Symbol.asyncIterator] = function() {
      // Open a mongoose aggregation cursor, then apply its two transforms in
      // the same order: `transformNull()` first, the iterator adapter second.
      const aggCursor = this.cursor({ useMongooseAggCursor: true });
      const afterTransformNull = aggCursor.transformNull();
      return afterTransformNull._transformForAsyncIterator();
    };
  }
  988. /*!
  989. * Helpers
  990. */
  /**
   * Checks whether an object is likely a pipeline operator, i.e. a non-null
   * object with exactly one own enumerable key, and that key starts with `$`.
   *
   * @param {Object} obj object to check
   * @return {Boolean} `false` for `null`, non-objects, and objects that do not match
   * @api private
   */
  function isOperator(obj) {
    // `typeof null` is also 'object', and `Object.keys(null)` throws, so rule
    // `null` out explicitly instead of letting it crash the check.
    if (obj === null || typeof obj !== 'object') {
      return false;
    }
    const keys = Object.keys(obj);
    return keys.length === 1 && keys[0][0] === '$';
  }
  1005. /*!
  1006. * Adds the appropriate `$match` pipeline step to the top of an aggregate's
  1007. * pipeline, should its model be a non-root discriminator type. This is
  1008. * analogous to the `prepareDiscriminatorCriteria` function in `lib/query.js`.
  1009. *
  1010. * @param {Aggregate} aggregate Aggregate to prepare
  1011. */
  1012. Aggregate._prepareDiscriminatorPipeline = prepareDiscriminatorPipeline;
  1013. /*!
  1014. * Exports
  1015. */
  1016. module.exports = Aggregate;