aggregate.js 32 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129
  1. 'use strict';
  2. /*!
  3. * Module dependencies
  4. */
  5. const AggregationCursor = require('./cursor/AggregationCursor');
  6. const Query = require('./query');
  7. const applyGlobalMaxTimeMS = require('./helpers/query/applyGlobalMaxTimeMS');
  8. const promiseOrCallback = require('./helpers/promiseOrCallback');
  9. const util = require('util');
  10. const utils = require('./utils');
  11. const read = Query.prototype.read;
  12. const readConcern = Query.prototype.readConcern;
  13. /**
  14. * Aggregate constructor used for building aggregation pipelines. Do not
  15. * instantiate this class directly, use [Model.aggregate()](/docs/api.html#model_Model.aggregate) instead.
  16. *
  17. * ####Example:
  18. *
  19. * const aggregate = Model.aggregate([
  20. * { $project: { a: 1, b: 1 } },
  21. * { $skip: 5 }
  22. * ]);
  23. *
  24. * Model.
  25. * aggregate([{ $match: { age: { $gte: 21 }}}]).
  26. * unwind('tags').
  27. * exec(callback);
  28. *
  29. * ####Note:
  30. *
  31. * - The documents returned are plain javascript objects, not mongoose documents (since any shape of document can be returned).
  32. * - Mongoose does **not** cast pipeline stages. The below will **not** work unless `_id` is a string in the database
  33. *
  34. * ```javascript
  35. * new Aggregate([{ $match: { _id: '00000000000000000000000a' } }]);
  36. * // Do this instead to cast to an ObjectId
  37. * new Aggregate([{ $match: { _id: mongoose.Types.ObjectId('00000000000000000000000a') } }]);
  38. * ```
  39. *
  40. * @see MongoDB http://docs.mongodb.org/manual/applications/aggregation/
  41. * @see driver http://mongodb.github.com/node-mongodb-native/api-generated/collection.html#aggregate
  42. * @param {Array} [pipeline] aggregation pipeline as an array of objects
  43. * @api public
  44. */
  45. function Aggregate(pipeline) {
  46. this._pipeline = [];
  47. this._model = undefined;
  48. this.options = {};
  49. if (arguments.length === 1 && util.isArray(pipeline)) {
  50. this.append.apply(this, pipeline);
  51. }
  52. }
  53. /**
  54. * Contains options passed down to the [aggregate command](https://docs.mongodb.com/manual/reference/command/aggregate/).
  55. * Supported options are:
  56. *
  57. * - `readPreference`
  58. * - [`cursor`](./api.html#aggregate_Aggregate-cursor)
  59. * - [`explain`](./api.html#aggregate_Aggregate-explain)
  60. * - [`allowDiskUse`](./api.html#aggregate_Aggregate-allowDiskUse)
  61. * - `maxTimeMS`
  62. * - `bypassDocumentValidation`
  63. * - `raw`
  64. * - `promoteLongs`
  65. * - `promoteValues`
  66. * - `promoteBuffers`
  67. * - [`collation`](./api.html#aggregate_Aggregate-collation)
  68. * - `comment`
  69. * - [`session`](./api.html#aggregate_Aggregate-session)
  70. *
  71. * @property options
  72. * @memberOf Aggregate
  73. * @api public
  74. */
  75. Aggregate.prototype.options;
  76. /**
  77. * Get/set the model that this aggregation will execute on.
  78. *
  79. * ####Example:
  80. * const aggregate = MyModel.aggregate([{ $match: { answer: 42 } }]);
  81. * aggregate.model() === MyModel; // true
  82. *
  83. * // Change the model. There's rarely any reason to do this.
  84. * aggregate.model(SomeOtherModel);
  85. * aggregate.model() === SomeOtherModel; // true
  86. *
  87. * @param {Model} [model] the model to which the aggregate is to be bound
  88. * @return {Aggregate|Model} if model is passed, will return `this`, otherwise will return the model
  89. * @api public
  90. */
  91. Aggregate.prototype.model = function(model) {
  92. if (arguments.length === 0) {
  93. return this._model;
  94. }
  95. this._model = model;
  96. if (model.schema != null) {
  97. if (this.options.readPreference == null &&
  98. model.schema.options.read != null) {
  99. this.options.readPreference = model.schema.options.read;
  100. }
  101. if (this.options.collation == null &&
  102. model.schema.options.collation != null) {
  103. this.options.collation = model.schema.options.collation;
  104. }
  105. }
  106. return this;
  107. };
  108. /**
  109. * Appends new operators to this aggregate pipeline
  110. *
  111. * ####Examples:
  112. *
  113. * aggregate.append({ $project: { field: 1 }}, { $limit: 2 });
  114. *
  115. * // or pass an array
  116. * var pipeline = [{ $match: { daw: 'Logic Audio X' }} ];
  117. * aggregate.append(pipeline);
  118. *
  119. * @param {Object} ops operator(s) to append
  120. * @return {Aggregate}
  121. * @api public
  122. */
  123. Aggregate.prototype.append = function() {
  124. const args = (arguments.length === 1 && util.isArray(arguments[0]))
  125. ? arguments[0]
  126. : utils.args(arguments);
  127. if (!args.every(isOperator)) {
  128. throw new Error('Arguments must be aggregate pipeline operators');
  129. }
  130. this._pipeline = this._pipeline.concat(args);
  131. return this;
  132. };
  133. /**
  134. * Appends a new $addFields operator to this aggregate pipeline.
  135. * Requires MongoDB v3.4+ to work
  136. *
  137. * ####Examples:
  138. *
  139. * // adding new fields based on existing fields
  140. * aggregate.addFields({
  141. * newField: '$b.nested'
  142. * , plusTen: { $add: ['$val', 10]}
  143. * , sub: {
  144. * name: '$a'
  145. * }
  146. * })
  147. *
  148. * // etc
  149. * aggregate.addFields({ salary_k: { $divide: [ "$salary", 1000 ] } });
  150. *
  151. * @param {Object} arg field specification
  152. * @see $addFields https://docs.mongodb.com/manual/reference/operator/aggregation/addFields/
  153. * @return {Aggregate}
  154. * @api public
  155. */
  156. Aggregate.prototype.addFields = function(arg) {
  157. const fields = {};
  158. if (typeof arg === 'object' && !util.isArray(arg)) {
  159. Object.keys(arg).forEach(function(field) {
  160. fields[field] = arg[field];
  161. });
  162. } else {
  163. throw new Error('Invalid addFields() argument. Must be an object');
  164. }
  165. return this.append({ $addFields: fields });
  166. };
  167. /**
  168. * Appends a new $project operator to this aggregate pipeline.
  169. *
  170. * Mongoose query [selection syntax](#query_Query-select) is also supported.
  171. *
  172. * ####Examples:
  173. *
  174. * // include a, include b, exclude _id
  175. * aggregate.project("a b -_id");
  176. *
  177. * // or you may use object notation, useful when
  178. * // you have keys already prefixed with a "-"
  179. * aggregate.project({a: 1, b: 1, _id: 0});
  180. *
  181. * // reshaping documents
  182. * aggregate.project({
  183. * newField: '$b.nested'
  184. * , plusTen: { $add: ['$val', 10]}
  185. * , sub: {
  186. * name: '$a'
  187. * }
  188. * })
  189. *
  190. * // etc
  191. * aggregate.project({ salary_k: { $divide: [ "$salary", 1000 ] } });
  192. *
  193. * @param {Object|String} arg field specification
  194. * @see projection http://docs.mongodb.org/manual/reference/aggregation/project/
  195. * @return {Aggregate}
  196. * @api public
  197. */
  198. Aggregate.prototype.project = function(arg) {
  199. const fields = {};
  200. if (typeof arg === 'object' && !util.isArray(arg)) {
  201. Object.keys(arg).forEach(function(field) {
  202. fields[field] = arg[field];
  203. });
  204. } else if (arguments.length === 1 && typeof arg === 'string') {
  205. arg.split(/\s+/).forEach(function(field) {
  206. if (!field) {
  207. return;
  208. }
  209. const include = field[0] === '-' ? 0 : 1;
  210. if (include === 0) {
  211. field = field.substring(1);
  212. }
  213. fields[field] = include;
  214. });
  215. } else {
  216. throw new Error('Invalid project() argument. Must be string or object');
  217. }
  218. return this.append({ $project: fields });
  219. };
  220. /**
  221. * Appends a new custom $group operator to this aggregate pipeline.
  222. *
  223. * ####Examples:
  224. *
  225. * aggregate.group({ _id: "$department" });
  226. *
  227. * @see $group http://docs.mongodb.org/manual/reference/aggregation/group/
  228. * @method group
  229. * @memberOf Aggregate
  230. * @instance
  231. * @param {Object} arg $group operator contents
  232. * @return {Aggregate}
  233. * @api public
  234. */
  235. /**
  236. * Appends a new custom $match operator to this aggregate pipeline.
  237. *
  238. * ####Examples:
  239. *
  240. * aggregate.match({ department: { $in: [ "sales", "engineering" ] } });
  241. *
  242. * @see $match http://docs.mongodb.org/manual/reference/aggregation/match/
  243. * @method match
  244. * @memberOf Aggregate
  245. * @instance
  246. * @param {Object} arg $match operator contents
  247. * @return {Aggregate}
  248. * @api public
  249. */
  250. /**
  251. * Appends a new $skip operator to this aggregate pipeline.
  252. *
  253. * ####Examples:
  254. *
  255. * aggregate.skip(10);
  256. *
  257. * @see $skip http://docs.mongodb.org/manual/reference/aggregation/skip/
  258. * @method skip
  259. * @memberOf Aggregate
  260. * @instance
  261. * @param {Number} num number of records to skip before next stage
  262. * @return {Aggregate}
  263. * @api public
  264. */
  265. /**
  266. * Appends a new $limit operator to this aggregate pipeline.
  267. *
  268. * ####Examples:
  269. *
  270. * aggregate.limit(10);
  271. *
  272. * @see $limit http://docs.mongodb.org/manual/reference/aggregation/limit/
  273. * @method limit
  274. * @memberOf Aggregate
  275. * @instance
  276. * @param {Number} num maximum number of records to pass to the next stage
  277. * @return {Aggregate}
  278. * @api public
  279. */
  280. /**
  281. * Appends a new $geoNear operator to this aggregate pipeline.
  282. *
  283. * ####NOTE:
  284. *
  285. * **MUST** be used as the first operator in the pipeline.
  286. *
  287. * ####Examples:
  288. *
  289. * aggregate.near({
  290. * near: [40.724, -73.997],
  291. * distanceField: "dist.calculated", // required
  292. * maxDistance: 0.008,
  293. * query: { type: "public" },
  294. * includeLocs: "dist.location",
  295. * uniqueDocs: true,
  296. * num: 5
  297. * });
  298. *
  299. * @see $geoNear http://docs.mongodb.org/manual/reference/aggregation/geoNear/
  300. * @method near
  301. * @memberOf Aggregate
  302. * @instance
  303. * @param {Object} arg
  304. * @return {Aggregate}
  305. * @api public
  306. */
  307. Aggregate.prototype.near = function(arg) {
  308. const op = {};
  309. op.$geoNear = arg;
  310. return this.append(op);
  311. };
  312. /*!
  313. * define methods
  314. */
  315. 'group match skip limit out'.split(' ').forEach(function($operator) {
  316. Aggregate.prototype[$operator] = function(arg) {
  317. const op = {};
  318. op['$' + $operator] = arg;
  319. return this.append(op);
  320. };
  321. });
  322. /**
  323. * Appends new custom $unwind operator(s) to this aggregate pipeline.
  324. *
  325. * Note that the `$unwind` operator requires the path name to start with '$'.
  326. * Mongoose will prepend '$' if the specified field doesn't start '$'.
  327. *
  328. * ####Examples:
  329. *
  330. * aggregate.unwind("tags");
  331. * aggregate.unwind("a", "b", "c");
  332. * aggregate.unwind({ path: '$tags', preserveNullAndEmptyArrays: true });
  333. *
  334. * @see $unwind http://docs.mongodb.org/manual/reference/aggregation/unwind/
  335. * @param {String|Object} fields the field(s) to unwind, either as field names or as [objects with options](https://docs.mongodb.com/manual/reference/operator/aggregation/unwind/#document-operand-with-options). If passing a string, prefixing the field name with '$' is optional. If passing an object, `path` must start with '$'.
  336. * @return {Aggregate}
  337. * @api public
  338. */
  339. Aggregate.prototype.unwind = function() {
  340. const args = utils.args(arguments);
  341. const res = [];
  342. for (const arg of args) {
  343. if (arg && typeof arg === 'object') {
  344. res.push({ $unwind: arg });
  345. } else if (typeof arg === 'string') {
  346. res.push({
  347. $unwind: (arg && arg.startsWith('$')) ? arg : '$' + arg
  348. });
  349. } else {
  350. throw new Error('Invalid arg "' + arg + '" to unwind(), ' +
  351. 'must be string or object');
  352. }
  353. }
  354. return this.append.apply(this, res);
  355. };
  356. /**
  357. * Appends a new $replaceRoot operator to this aggregate pipeline.
  358. *
  359. * Note that the `$replaceRoot` operator requires field strings to start with '$'.
  360. * If you are passing in a string Mongoose will prepend '$' if the specified field doesn't start '$'.
  361. * If you are passing in an object the strings in your expression will not be altered.
  362. *
  363. * ####Examples:
  364. *
  365. * aggregate.replaceRoot("user");
  366. *
  367. * aggregate.replaceRoot({ x: { $concat: ['$this', '$that'] } });
  368. *
  369. * @see $replaceRoot https://docs.mongodb.org/manual/reference/operator/aggregation/replaceRoot
  370. * @param {String|Object} the field or document which will become the new root document
  371. * @return {Aggregate}
  372. * @api public
  373. */
  374. Aggregate.prototype.replaceRoot = function(newRoot) {
  375. let ret;
  376. if (typeof newRoot === 'string') {
  377. ret = newRoot.startsWith('$') ? newRoot : '$' + newRoot;
  378. } else {
  379. ret = newRoot;
  380. }
  381. return this.append({
  382. $replaceRoot: {
  383. newRoot: ret
  384. }
  385. });
  386. };
  387. /**
  388. * Appends a new $count operator to this aggregate pipeline.
  389. *
  390. * ####Examples:
  391. *
  392. * aggregate.count("userCount");
  393. *
  394. * @see $count https://docs.mongodb.org/manual/reference/operator/aggregation/count
  395. * @param {String} the name of the count field
  396. * @return {Aggregate}
  397. * @api public
  398. */
  399. Aggregate.prototype.count = function(countName) {
  400. return this.append({ $count: countName });
  401. };
  402. /**
  403. * Appends a new $sortByCount operator to this aggregate pipeline. Accepts either a string field name
  404. * or a pipeline object.
  405. *
  406. * Note that the `$sortByCount` operator requires the new root to start with '$'.
  407. * Mongoose will prepend '$' if the specified field name doesn't start with '$'.
  408. *
  409. * ####Examples:
  410. *
  411. * aggregate.sortByCount('users');
  412. * aggregate.sortByCount({ $mergeObjects: [ "$employee", "$business" ] })
  413. *
  414. * @see $sortByCount https://docs.mongodb.com/manual/reference/operator/aggregation/sortByCount/
  415. * @param {Object|String} arg
  416. * @return {Aggregate} this
  417. * @api public
  418. */
  419. Aggregate.prototype.sortByCount = function(arg) {
  420. if (arg && typeof arg === 'object') {
  421. return this.append({ $sortByCount: arg });
  422. } else if (typeof arg === 'string') {
  423. return this.append({
  424. $sortByCount: (arg && arg.startsWith('$')) ? arg : '$' + arg
  425. });
  426. } else {
  427. throw new TypeError('Invalid arg "' + arg + '" to sortByCount(), ' +
  428. 'must be string or object');
  429. }
  430. };
  431. /**
  432. * Appends new custom $lookup operator(s) to this aggregate pipeline.
  433. *
  434. * ####Examples:
  435. *
  436. * aggregate.lookup({ from: 'users', localField: 'userId', foreignField: '_id', as: 'users' });
  437. *
  438. * @see $lookup https://docs.mongodb.org/manual/reference/operator/aggregation/lookup/#pipe._S_lookup
  439. * @param {Object} options to $lookup as described in the above link
  440. * @return {Aggregate}
  441. * @api public
  442. */
  443. Aggregate.prototype.lookup = function(options) {
  444. return this.append({ $lookup: options });
  445. };
  446. /**
  447. * Appends new custom $graphLookup operator(s) to this aggregate pipeline, performing a recursive search on a collection.
  448. *
  449. * Note that graphLookup can only consume at most 100MB of memory, and does not allow disk use even if `{ allowDiskUse: true }` is specified.
  450. *
  451. * #### Examples:
  452. * // Suppose we have a collection of courses, where a document might look like `{ _id: 0, name: 'Calculus', prerequisite: 'Trigonometry'}` and `{ _id: 0, name: 'Trigonometry', prerequisite: 'Algebra' }`
  453. * aggregate.graphLookup({ from: 'courses', startWith: '$prerequisite', connectFromField: 'prerequisite', connectToField: 'name', as: 'prerequisites', maxDepth: 3 }) // this will recursively search the 'courses' collection up to 3 prerequisites
  454. *
  455. * @see $graphLookup https://docs.mongodb.com/manual/reference/operator/aggregation/graphLookup/#pipe._S_graphLookup
  456. * @param {Object} options to $graphLookup as described in the above link
  457. * @return {Aggregate}
  458. * @api public
  459. */
  460. Aggregate.prototype.graphLookup = function(options) {
  461. const cloneOptions = {};
  462. if (options) {
  463. if (!utils.isObject(options)) {
  464. throw new TypeError('Invalid graphLookup() argument. Must be an object.');
  465. }
  466. utils.mergeClone(cloneOptions, options);
  467. const startWith = cloneOptions.startWith;
  468. if (startWith && typeof startWith === 'string') {
  469. cloneOptions.startWith = cloneOptions.startWith.startsWith('$') ?
  470. cloneOptions.startWith :
  471. '$' + cloneOptions.startWith;
  472. }
  473. }
  474. return this.append({ $graphLookup: cloneOptions });
  475. };
  476. /**
  477. * Appends new custom $sample operator(s) to this aggregate pipeline.
  478. *
  479. * ####Examples:
  480. *
  481. * aggregate.sample(3); // Add a pipeline that picks 3 random documents
  482. *
  483. * @see $sample https://docs.mongodb.org/manual/reference/operator/aggregation/sample/#pipe._S_sample
  484. * @param {Number} size number of random documents to pick
  485. * @return {Aggregate}
  486. * @api public
  487. */
  488. Aggregate.prototype.sample = function(size) {
  489. return this.append({ $sample: { size: size } });
  490. };
  491. /**
  492. * Appends a new $sort operator to this aggregate pipeline.
  493. *
  494. * If an object is passed, values allowed are `asc`, `desc`, `ascending`, `descending`, `1`, and `-1`.
  495. *
  496. * If a string is passed, it must be a space delimited list of path names. The sort order of each path is ascending unless the path name is prefixed with `-` which will be treated as descending.
  497. *
  498. * ####Examples:
  499. *
  500. * // these are equivalent
  501. * aggregate.sort({ field: 'asc', test: -1 });
  502. * aggregate.sort('field -test');
  503. *
  504. * @see $sort http://docs.mongodb.org/manual/reference/aggregation/sort/
  505. * @param {Object|String} arg
  506. * @return {Aggregate} this
  507. * @api public
  508. */
  509. Aggregate.prototype.sort = function(arg) {
  510. // TODO refactor to reuse the query builder logic
  511. const sort = {};
  512. if (arg.constructor.name === 'Object') {
  513. const desc = ['desc', 'descending', -1];
  514. Object.keys(arg).forEach(function(field) {
  515. // If sorting by text score, skip coercing into 1/-1
  516. if (arg[field] instanceof Object && arg[field].$meta) {
  517. sort[field] = arg[field];
  518. return;
  519. }
  520. sort[field] = desc.indexOf(arg[field]) === -1 ? 1 : -1;
  521. });
  522. } else if (arguments.length === 1 && typeof arg === 'string') {
  523. arg.split(/\s+/).forEach(function(field) {
  524. if (!field) {
  525. return;
  526. }
  527. const ascend = field[0] === '-' ? -1 : 1;
  528. if (ascend === -1) {
  529. field = field.substring(1);
  530. }
  531. sort[field] = ascend;
  532. });
  533. } else {
  534. throw new TypeError('Invalid sort() argument. Must be a string or object.');
  535. }
  536. return this.append({ $sort: sort });
  537. };
  538. /**
  539. * Sets the readPreference option for the aggregation query.
  540. *
  541. * ####Example:
  542. *
  543. * Model.aggregate(..).read('primaryPreferred').exec(callback)
  544. *
  545. * @param {String} pref one of the listed preference options or their aliases
  546. * @param {Array} [tags] optional tags for this query
  547. * @return {Aggregate} this
  548. * @api public
  549. * @see mongodb http://docs.mongodb.org/manual/applications/replication/#read-preference
  550. * @see driver http://mongodb.github.com/node-mongodb-native/driver-articles/anintroductionto1_1and2_2.html#read-preferences
  551. */
  552. Aggregate.prototype.read = function(pref, tags) {
  553. if (!this.options) {
  554. this.options = {};
  555. }
  556. read.call(this, pref, tags);
  557. return this;
  558. };
  559. /**
  560. * Sets the readConcern level for the aggregation query.
  561. *
  562. * ####Example:
  563. *
  564. * Model.aggregate(..).readConcern('majority').exec(callback)
  565. *
  566. * @param {String} level one of the listed read concern level or their aliases
  567. * @see mongodb https://docs.mongodb.com/manual/reference/read-concern/
  568. * @return {Aggregate} this
  569. * @api public
  570. */
  571. Aggregate.prototype.readConcern = function(level) {
  572. if (!this.options) {
  573. this.options = {};
  574. }
  575. readConcern.call(this, level);
  576. return this;
  577. };
  578. /**
  579. * Appends a new $redact operator to this aggregate pipeline.
  580. *
  581. * If 3 arguments are supplied, Mongoose will wrap them with if-then-else of $cond operator respectively
  582. * If `thenExpr` or `elseExpr` is string, make sure it starts with $$, like `$$DESCEND`, `$$PRUNE` or `$$KEEP`.
  583. *
  584. * ####Example:
  585. *
  586. * Model.aggregate(...)
  587. * .redact({
  588. * $cond: {
  589. * if: { $eq: [ '$level', 5 ] },
  590. * then: '$$PRUNE',
  591. * else: '$$DESCEND'
  592. * }
  593. * })
  594. * .exec();
  595. *
  596. * // $redact often comes with $cond operator, you can also use the following syntax provided by mongoose
  597. * Model.aggregate(...)
  598. * .redact({ $eq: [ '$level', 5 ] }, '$$PRUNE', '$$DESCEND')
  599. * .exec();
  600. *
  601. * @param {Object} expression redact options or conditional expression
  602. * @param {String|Object} [thenExpr] true case for the condition
  603. * @param {String|Object} [elseExpr] false case for the condition
  604. * @return {Aggregate} this
  605. * @see $redact https://docs.mongodb.com/manual/reference/operator/aggregation/redact/
  606. * @api public
  607. */
  608. Aggregate.prototype.redact = function(expression, thenExpr, elseExpr) {
  609. if (arguments.length === 3) {
  610. if ((typeof thenExpr === 'string' && !thenExpr.startsWith('$$')) ||
  611. (typeof elseExpr === 'string' && !elseExpr.startsWith('$$'))) {
  612. throw new Error('If thenExpr or elseExpr is string, it must start with $$. e.g. $$DESCEND, $$PRUNE, $$KEEP');
  613. }
  614. expression = {
  615. $cond: {
  616. if: expression,
  617. then: thenExpr,
  618. else: elseExpr
  619. }
  620. };
  621. } else if (arguments.length !== 1) {
  622. throw new TypeError('Invalid arguments');
  623. }
  624. return this.append({ $redact: expression });
  625. };
  626. /**
  627. * Execute the aggregation with explain
  628. *
  629. * ####Example:
  630. *
  631. * Model.aggregate(..).explain(callback)
  632. *
  633. * @param {Function} callback
  634. * @return {Promise}
  635. */
  636. Aggregate.prototype.explain = function(callback) {
  637. const model = this._model;
  638. return promiseOrCallback(callback, cb => {
  639. if (!this._pipeline.length) {
  640. const err = new Error('Aggregate has empty pipeline');
  641. return cb(err);
  642. }
  643. prepareDiscriminatorPipeline(this);
  644. model.hooks.execPre('aggregate', this, error => {
  645. if (error) {
  646. const _opts = { error: error };
  647. return model.hooks.execPost('aggregate', this, [null], _opts, error => {
  648. cb(error);
  649. });
  650. }
  651. this.options.explain = true;
  652. model.collection.
  653. aggregate(this._pipeline, this.options || {}).
  654. explain((error, result) => {
  655. const _opts = { error: error };
  656. return model.hooks.execPost('aggregate', this, [result], _opts, error => {
  657. if (error) {
  658. return cb(error);
  659. }
  660. return cb(null, result);
  661. });
  662. });
  663. });
  664. }, model.events);
  665. };
  666. /**
  667. * Sets the allowDiskUse option for the aggregation query (ignored for < 2.6.0)
  668. *
  669. * ####Example:
  670. *
  671. * await Model.aggregate([{ $match: { foo: 'bar' } }]).allowDiskUse(true);
  672. *
  673. * @param {Boolean} value Should tell server it can use hard drive to store data during aggregation.
  674. * @param {Array} [tags] optional tags for this query
  675. * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
  676. */
  677. Aggregate.prototype.allowDiskUse = function(value) {
  678. this.options.allowDiskUse = value;
  679. return this;
  680. };
  681. /**
  682. * Sets the hint option for the aggregation query (ignored for < 3.6.0)
  683. *
  684. * ####Example:
  685. *
  686. * Model.aggregate(..).hint({ qty: 1, category: 1 }).exec(callback)
  687. *
  688. * @param {Object|String} value a hint object or the index name
  689. * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
  690. */
  691. Aggregate.prototype.hint = function(value) {
  692. this.options.hint = value;
  693. return this;
  694. };
  695. /**
  696. * Sets the session for this aggregation. Useful for [transactions](/docs/transactions.html).
  697. *
  698. * ####Example:
  699. *
  700. * const session = await Model.startSession();
  701. * await Model.aggregate(..).session(session);
  702. *
  703. * @param {ClientSession} session
  704. * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
  705. */
  706. Aggregate.prototype.session = function(session) {
  707. if (session == null) {
  708. delete this.options.session;
  709. } else {
  710. this.options.session = session;
  711. }
  712. return this;
  713. };
  714. /**
  715. * Lets you set arbitrary options, for middleware or plugins.
  716. *
  717. * ####Example:
  718. *
  719. * var agg = Model.aggregate(..).option({ allowDiskUse: true }); // Set the `allowDiskUse` option
  720. * agg.options; // `{ allowDiskUse: true }`
  721. *
  722. * @param {Object} options keys to merge into current options
  723. * @param [options.maxTimeMS] number limits the time this aggregation will run, see [MongoDB docs on `maxTimeMS`](https://docs.mongodb.com/manual/reference/operator/meta/maxTimeMS/)
  724. * @param [options.allowDiskUse] boolean if true, the MongoDB server will use the hard drive to store data during this aggregation
  725. * @param [options.collation] object see [`Aggregate.prototype.collation()`](./docs/api.html#aggregate_Aggregate-collation)
  726. * @param [options.session] ClientSession see [`Aggregate.prototype.session()`](./docs/api.html#aggregate_Aggregate-session)
  727. * @see mongodb http://docs.mongodb.org/manual/reference/command/aggregate/
  728. * @return {Aggregate} this
  729. * @api public
  730. */
  731. Aggregate.prototype.option = function(value) {
  732. for (const key in value) {
  733. this.options[key] = value[key];
  734. }
  735. return this;
  736. };
  737. /**
  738. * Sets the cursor option option for the aggregation query (ignored for < 2.6.0).
  739. * Note the different syntax below: .exec() returns a cursor object, and no callback
  740. * is necessary.
  741. *
  742. * ####Example:
  743. *
  744. * var cursor = Model.aggregate(..).cursor({ batchSize: 1000 }).exec();
  745. * cursor.eachAsync(function(doc, i) {
  746. * // use doc
  747. * });
  748. *
  749. * @param {Object} options
  750. * @param {Number} options.batchSize set the cursor batch size
  751. * @param {Boolean} [options.useMongooseAggCursor] use experimental mongoose-specific aggregation cursor (for `eachAsync()` and other query cursor semantics)
  752. * @return {Aggregate} this
  753. * @api public
  754. * @see mongodb http://mongodb.github.io/node-mongodb-native/2.0/api/AggregationCursor.html
  755. */
  756. Aggregate.prototype.cursor = function(options) {
  757. if (!this.options) {
  758. this.options = {};
  759. }
  760. this.options.cursor = options || {};
  761. return this;
  762. };
  763. /**
  764. * Sets an option on this aggregation. This function will be deprecated in a
  765. * future release. Use the [`cursor()`](./api.html#aggregate_Aggregate-cursor),
  766. * [`collation()`](./api.html#aggregate_Aggregate-collation), etc. helpers to
  767. * set individual options, or access `agg.options` directly.
  768. *
  769. * Note that MongoDB aggregations [do **not** support the `noCursorTimeout` flag](https://jira.mongodb.org/browse/SERVER-6036),
  770. * if you try setting that flag with this function you will get a "unrecognized field 'noCursorTimeout'" error.
  771. *
  772. * @param {String} flag
  773. * @param {Boolean} value
  774. * @return {Aggregate} this
  775. * @api public
  776. * @deprecated Use [`.option()`](api.html#aggregate_Aggregate-option) instead. Note that MongoDB aggregations do **not** support a `noCursorTimeout` option.
  777. */
  778. Aggregate.prototype.addCursorFlag = util.deprecate(function(flag, value) {
  779. if (!this.options) {
  780. this.options = {};
  781. }
  782. this.options[flag] = value;
  783. return this;
  784. }, 'Mongoose: `Aggregate#addCursorFlag()` is deprecated, use `option()` instead');
  785. /**
  786. * Adds a collation
  787. *
  788. * ####Example:
  789. *
  790. * Model.aggregate(..).collation({ locale: 'en_US', strength: 1 }).exec();
  791. *
  792. * @param {Object} collation options
  793. * @return {Aggregate} this
  794. * @api public
  795. * @see mongodb http://mongodb.github.io/node-mongodb-native/2.2/api/Collection.html#aggregate
  796. */
  797. Aggregate.prototype.collation = function(collation) {
  798. if (!this.options) {
  799. this.options = {};
  800. }
  801. this.options.collation = collation;
  802. return this;
  803. };
  804. /**
  805. * Combines multiple aggregation pipelines.
  806. *
  807. * ####Example:
  808. *
  809. * Model.aggregate(...)
  810. * .facet({
  811. * books: [{ groupBy: '$author' }],
  812. * price: [{ $bucketAuto: { groupBy: '$price', buckets: 2 } }]
  813. * })
  814. * .exec();
  815. *
  816. * // Output: { books: [...], price: [{...}, {...}] }
  817. *
  818. * @param {Object} facet options
  819. * @return {Aggregate} this
  820. * @see $facet https://docs.mongodb.com/v3.4/reference/operator/aggregation/facet/
  821. * @api public
  822. */
  823. Aggregate.prototype.facet = function(options) {
  824. return this.append({ $facet: options });
  825. };
  826. /**
  827. * Returns the current pipeline
  828. *
  829. * ####Example:
  830. *
  831. * MyModel.aggregate().match({ test: 1 }).pipeline(); // [{ $match: { test: 1 } }]
  832. *
  833. * @return {Array}
  834. * @api public
  835. */
  836. Aggregate.prototype.pipeline = function() {
  837. return this._pipeline;
  838. };
  839. /**
  840. * Executes the aggregate pipeline on the currently bound Model.
  841. *
  842. * ####Example:
  843. *
  844. * aggregate.exec(callback);
  845. *
  846. * // Because a promise is returned, the `callback` is optional.
  847. * var promise = aggregate.exec();
  848. * promise.then(..);
  849. *
  850. * @see Promise #promise_Promise
  851. * @param {Function} [callback]
  852. * @return {Promise}
  853. * @api public
  854. */
  855. Aggregate.prototype.exec = function(callback) {
  856. if (!this._model) {
  857. throw new Error('Aggregate not bound to any Model');
  858. }
  859. const model = this._model;
  860. const collection = this._model.collection;
  861. applyGlobalMaxTimeMS(this.options, model);
  862. if (this.options && this.options.cursor) {
  863. return new AggregationCursor(this);
  864. }
  865. return promiseOrCallback(callback, cb => {
  866. prepareDiscriminatorPipeline(this);
  867. model.hooks.execPre('aggregate', this, error => {
  868. if (error) {
  869. const _opts = { error: error };
  870. return model.hooks.execPost('aggregate', this, [null], _opts, error => {
  871. cb(error);
  872. });
  873. }
  874. if (!this._pipeline.length) {
  875. return cb(new Error('Aggregate has empty pipeline'));
  876. }
  877. const options = utils.clone(this.options || {});
  878. collection.aggregate(this._pipeline, options, (error, cursor) => {
  879. if (error) {
  880. const _opts = { error: error };
  881. return model.hooks.execPost('aggregate', this, [null], _opts, error => {
  882. if (error) {
  883. return cb(error);
  884. }
  885. return cb(null);
  886. });
  887. }
  888. cursor.toArray((error, result) => {
  889. const _opts = { error: error };
  890. model.hooks.execPost('aggregate', this, [result], _opts, (error, result) => {
  891. if (error) {
  892. return cb(error);
  893. }
  894. cb(null, result);
  895. });
  896. });
  897. });
  898. });
  899. }, model.events);
  900. };
  901. /**
  902. * Provides promise for aggregate.
  903. *
  904. * ####Example:
  905. *
  906. * Model.aggregate(..).then(successCallback, errorCallback);
  907. *
  908. * @see Promise #promise_Promise
  909. * @param {Function} [resolve] successCallback
  910. * @param {Function} [reject] errorCallback
  911. * @return {Promise}
  912. */
  913. Aggregate.prototype.then = function(resolve, reject) {
  914. return this.exec().then(resolve, reject);
  915. };
  916. /**
  917. * Executes the query returning a `Promise` which will be
  918. * resolved with either the doc(s) or rejected with the error.
  919. * Like [`.then()`](#query_Query-then), but only takes a rejection handler.
  920. *
  921. * @param {Function} [reject]
  922. * @return {Promise}
  923. * @api public
  924. */
  925. Aggregate.prototype.catch = function(reject) {
  926. return this.exec().then(null, reject);
  927. };
  928. /**
  929. * Returns an asyncIterator for use with [`for/await/of` loops](http://bit.ly/async-iterators)
  930. * You do not need to call this function explicitly, the JavaScript runtime
  931. * will call it for you.
  932. *
  933. * ####Example
  934. *
  935. * const agg = Model.aggregate([{ $match: { age: { $gte: 25 } } }]);
  936. * for await (const doc of agg) {
  937. * console.log(doc.name);
  938. * }
  939. *
  940. * Node.js 10.x supports async iterators natively without any flags. You can
  941. * enable async iterators in Node.js 8.x using the [`--harmony_async_iteration` flag](https://github.com/tc39/proposal-async-iteration/issues/117#issuecomment-346695187).
  942. *
  943. * **Note:** This function is not set if `Symbol.asyncIterator` is undefined. If
  944. * `Symbol.asyncIterator` is undefined, that means your Node.js version does not
  945. * support async iterators.
  946. *
  947. * @method Symbol.asyncIterator
  948. * @memberOf Aggregate
  949. * @instance
  950. * @api public
  951. */
  952. if (Symbol.asyncIterator != null) {
  953. Aggregate.prototype[Symbol.asyncIterator] = function() {
  954. return this.cursor({ useMongooseAggCursor: true }).
  955. exec().
  956. transformNull().
  957. map(doc => {
  958. return doc == null ? { done: true } : { value: doc, done: false };
  959. });
  960. };
  961. }
  962. /*!
  963. * Helpers
  964. */
  965. /**
  966. * Checks whether an object is likely a pipeline operator
  967. *
  968. * @param {Object} obj object to check
  969. * @return {Boolean}
  970. * @api private
  971. */
  972. function isOperator(obj) {
  973. if (typeof obj !== 'object') {
  974. return false;
  975. }
  976. const k = Object.keys(obj);
  977. return k.length === 1 && k.some(key => { return key[0] === '$'; });
  978. }
  979. /*!
  980. * Adds the appropriate `$match` pipeline step to the top of an aggregate's
  981. * pipeline, should it's model is a non-root discriminator type. This is
  982. * analogous to the `prepareDiscriminatorCriteria` function in `lib/query.js`.
  983. *
  984. * @param {Aggregate} aggregate Aggregate to prepare
  985. */
  986. Aggregate._prepareDiscriminatorPipeline = prepareDiscriminatorPipeline;
  987. function prepareDiscriminatorPipeline(aggregate) {
  988. const schema = aggregate._model.schema;
  989. const discriminatorMapping = schema && schema.discriminatorMapping;
  990. if (discriminatorMapping && !discriminatorMapping.isRoot) {
  991. const originalPipeline = aggregate._pipeline;
  992. const discriminatorKey = discriminatorMapping.key;
  993. const discriminatorValue = discriminatorMapping.value;
  994. // If the first pipeline stage is a match and it doesn't specify a `__t`
  995. // key, add the discriminator key to it. This allows for potential
  996. // aggregation query optimizations not to be disturbed by this feature.
  997. if (originalPipeline[0] && originalPipeline[0].$match && !originalPipeline[0].$match[discriminatorKey]) {
  998. originalPipeline[0].$match[discriminatorKey] = discriminatorValue;
  999. // `originalPipeline` is a ref, so there's no need for
  1000. // aggregate._pipeline = originalPipeline
  1001. } else if (originalPipeline[0] && originalPipeline[0].$geoNear) {
  1002. originalPipeline[0].$geoNear.query =
  1003. originalPipeline[0].$geoNear.query || {};
  1004. originalPipeline[0].$geoNear.query[discriminatorKey] = discriminatorValue;
  1005. } else {
  1006. const match = {};
  1007. match[discriminatorKey] = discriminatorValue;
  1008. aggregate._pipeline.unshift({ $match: match });
  1009. }
  1010. }
  1011. }
  1012. /*!
  1013. * Exports
  1014. */
  1015. module.exports = Aggregate;