// map_reduce.js (6.5 KB)
  1. 'use strict';
  2. const applyWriteConcern = require('../utils').applyWriteConcern;
  3. const Code = require('../core').BSON.Code;
  4. const decorateWithCollation = require('../utils').decorateWithCollation;
  5. const decorateWithReadConcern = require('../utils').decorateWithReadConcern;
  6. const executeCommand = require('./db_ops').executeCommand;
  7. const handleCallback = require('../utils').handleCallback;
  8. const isObject = require('../utils').isObject;
  9. const loadDb = require('../dynamic_loaders').loadDb;
  10. const OperationBase = require('./operation').OperationBase;
  11. const ReadPreference = require('../core').ReadPreference;
  12. const toError = require('../utils').toError;
  13. const Aspect = require('./operation').Aspect;
  14. const defineAspects = require('./operation').defineAspects;
  15. const decorateWithExplain = require('../utils').decorateWithExplain;
  16. const maxWireVersion = require('../core/utils').maxWireVersion;
  17. const MongoError = require('../error').MongoError;
  /**
   * Option names that are not copied verbatim from the user-supplied options
   * onto the mapReduce command document. `bypassDocumentValidation` is added
   * to the command separately, and only when it is strictly `true`.
   *
   * Frozen so this shared module-level constant cannot be altered at runtime.
   */
  const exclusionList = Object.freeze([
    'explain',
    'readPreference',
    'session',
    'bypassDocumentValidation',
    'w',
    'wtimeout',
    'j',
    'writeConcern'
  ]);
  /**
   * Run Map Reduce across a collection. Be aware that the inline option for out will return an array of results not a collection.
   *
   * @class
   * @property {Collection} collection The Collection instance the operation runs against.
   * @property {(function|string)} map The mapping function.
   * @property {(function|string)} reduce The reduce function.
   * @property {object} [options] Optional settings. See Collection.prototype.mapReduce for a list of options.
   */
  class MapReduceOperation extends OperationBase {
    /**
     * Constructs a MapReduce operation.
     *
     * @param {Collection} collection A Collection instance.
     * @param {(function|string)} map The mapping function.
     * @param {(function|string)} reduce The reduce function.
     * @param {object} [options] Optional settings. See Collection.prototype.mapReduce for a list of options.
     */
    constructor(collection, map, reduce, options) {
      super(options);

      this.collection = collection;
      this.map = map;
      this.reduce = reduce;
    }

    /**
     * Execute the operation.
     *
     * Builds the mapReduce command from the stored map/reduce functions and
     * options, runs it through `executeCommand`, and reports one of:
     *  - the raw server reply, when an explain was requested;
     *  - the inline results (`result.results`), when the server returned them;
     *  - a Collection wrapping the output collection otherwise.
     * When `options.verbose` is truthy the value is wrapped together with a
     * `stats` object (`{ results, stats }` or `{ collection, stats }`).
     *
     * @param {Collection~resultCallback} [callback] The command result callback
     */
    execute(callback) {
      const coll = this.collection;
      let options = this.options;

      let mapCommandHash = {
        mapReduce: coll.collectionName,
        map: this.map,
        reduce: this.reduce
      };

      // Add any other options passed in. `scope` is rewritten by processScope;
      // names in exclusionList are not copied onto the command.
      for (let n in options) {
        if ('scope' === n) {
          mapCommandHash[n] = processScope(options[n]);
        } else if (exclusionList.indexOf(n) === -1) {
          mapCommandHash[n] = options[n];
        }
      }

      // Work on a shallow copy so the assignments below do not touch this.options
      options = Object.assign({}, options);

      // Ensure we have the right read preference inheritance
      options.readPreference = ReadPreference.resolve(coll, options);

      // A non-inline `out` is forced onto the primary and gets the write concern;
      // every other case is decorated with the read concern instead.
      // NOTE(review): if ReadPreference.resolve returns an object rather than a
      // string, the `!== 'primary'` comparison below is always true - confirm.
      if (
        options.readPreference !== false &&
        options.readPreference !== 'primary' &&
        options['out'] &&
        options['out'].inline !== 1 &&
        options['out'] !== 'inline'
      ) {
        // Force readPreference to primary
        options.readPreference = 'primary';
        // Decorate command with writeConcern if supported
        applyWriteConcern(mapCommandHash, { db: coll.s.db, collection: coll }, options);
      } else {
        decorateWithReadConcern(mapCommandHash, coll, options);
      }

      // Is bypassDocumentValidation specified
      if (options.bypassDocumentValidation === true) {
        mapCommandHash.bypassDocumentValidation = options.bypassDocumentValidation;
      }

      // Have we specified collation; a throw here is reported through the callback
      try {
        decorateWithCollation(mapCommandHash, coll, options);
      } catch (err) {
        return callback(err, null);
      }

      if (this.explain) {
        // Explain is rejected when the topology's max wire version is below 9
        if (maxWireVersion(coll.s.topology) < 9) {
          callback(new MongoError(`server does not support explain on mapReduce`));
          return;
        }
        mapCommandHash = decorateWithExplain(mapCommandHash, this.explain);
      }

      // Execute command
      executeCommand(coll.s.db, mapCommandHash, options, (err, result) => {
        if (err) return handleCallback(callback, err);

        // Check if we have an error
        if (1 !== result.ok || result.err || result.errmsg) {
          return handleCallback(callback, toError(result));
        }

        // If an explain operation was executed, don't process the server results
        if (this.explain) return callback(undefined, result);

        const stats = collectMapReduceStats(result);
        const verbose = Boolean(options['verbose']);

        // invoked with inline?
        if (result.results) {
          return handleCallback(
            callback,
            null,
            verbose ? { results: result.results, stats: stats } : result.results
          );
        }

        const collection = resolveOutputCollection(coll, result);

        // The error slot is an explicit null: `err` is known to be falsy here
        handleCallback(
          callback,
          null,
          verbose ? { collection: collection, stats: stats } : collection
        );
      });
    }
  }

  /**
   * Collect the timing/count fields of a mapReduce reply into a stats object.
   * `timeMillis` is tested against null/undefined so that a value of 0 is kept.
   * @ignore
   */
  function collectMapReduceStats(result) {
    const stats = {};
    if (result.timeMillis != null) stats['processtime'] = result.timeMillis;
    if (result.counts) stats['counts'] = result.counts;
    if (result.timing) stats['timing'] = result.timing;
    return stats;
  }

  /**
   * Build the Collection that wraps the output of a non-inline mapReduce.
   * An object-valued `result.result` names a `db` and `collection`, so a Db is
   * created for it; otherwise `result.result` is looked up on the source db.
   * @ignore
   */
  function resolveOutputCollection(coll, result) {
    if (result.result != null && typeof result.result === 'object') {
      const doc = result.result;
      // Return a collection from another db
      const Db = loadDb();
      return new Db(doc.db, coll.s.db.s.topology, coll.s.db.s.options).collection(
        doc.collection
      );
    }

    // Create a collection object that wraps the result collection
    return coll.s.db.collection(result.result);
  }
  /**
   * Functions that are passed as scope args must
   * be converted to Code instances.
   *
   * Walks plain objects recursively and returns a new object in which every
   * function-valued property is replaced by `new Code(String(fn))`. Values for
   * which `isObject` is false are returned unchanged. Values that carry a
   * `_bsontype` marker (ObjectID and any other BSON value) are also returned
   * unchanged: copying them into a plain object would drop their prototype.
   *
   * @ignore
   * @param {*} scope The scope value (or nested value) to convert.
   * @returns {*} The converted copy, or `scope` itself when it is passed through.
   */
  function processScope(scope) {
    if (!isObject(scope) || scope._bsontype != null) {
      return scope;
    }

    const keys = Object.keys(scope);
    const new_scope = {};

    // Iterate from the last key to the first, keeping the original key order of the result
    for (let i = keys.length - 1; i >= 0; i--) {
      const key = keys[i];
      const value = scope[key];

      new_scope[key] = 'function' === typeof value ? new Code(String(value)) : processScope(value);
    }

    return new_scope;
  }
  // Tag the operation class with the EXPLAINABLE aspect
  defineAspects(MapReduceOperation, [Aspect.EXPLAINABLE]);

  // The operation class is the module's sole export
  module.exports = MapReduceOperation;