map_reduce.js 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. 'use strict';
  2. const applyWriteConcern = require('../utils').applyWriteConcern;
  3. const Code = require('../core').BSON.Code;
  4. const decorateWithCollation = require('../utils').decorateWithCollation;
  5. const decorateWithReadConcern = require('../utils').decorateWithReadConcern;
  6. const executeCommand = require('./db_ops').executeCommand;
  7. const handleCallback = require('../utils').handleCallback;
  8. const isObject = require('../utils').isObject;
  9. const loadDb = require('../dynamic_loaders').loadDb;
  10. const OperationBase = require('./operation').OperationBase;
  11. const resolveReadPreference = require('../utils').resolveReadPreference;
  12. const toError = require('../utils').toError;
  13. const exclusionList = [
  14. 'readPreference',
  15. 'session',
  16. 'bypassDocumentValidation',
  17. 'w',
  18. 'wtimeout',
  19. 'j',
  20. 'writeConcern'
  21. ];
  22. /**
  23. * Run Map Reduce across a collection. Be aware that the inline option for out will return an array of results not a collection.
  24. *
  25. * @class
  26. * @property {Collection} a Collection instance.
  27. * @property {(function|string)} map The mapping function.
  28. * @property {(function|string)} reduce The reduce function.
  29. * @property {object} [options] Optional settings. See Collection.prototype.mapReduce for a list of options.
  30. */
  31. class MapReduceOperation extends OperationBase {
  32. /**
  33. * Constructs a MapReduce operation.
  34. *
  35. * @param {Collection} a Collection instance.
  36. * @param {(function|string)} map The mapping function.
  37. * @param {(function|string)} reduce The reduce function.
  38. * @param {object} [options] Optional settings. See Collection.prototype.mapReduce for a list of options.
  39. */
  40. constructor(collection, map, reduce, options) {
  41. super(options);
  42. this.collection = collection;
  43. this.map = map;
  44. this.reduce = reduce;
  45. }
  46. /**
  47. * Execute the operation.
  48. *
  49. * @param {Collection~resultCallback} [callback] The command result callback
  50. */
  51. execute(callback) {
  52. const coll = this.collection;
  53. const map = this.map;
  54. const reduce = this.reduce;
  55. let options = this.options;
  56. const mapCommandHash = {
  57. mapreduce: coll.collectionName,
  58. map: map,
  59. reduce: reduce
  60. };
  61. // Add any other options passed in
  62. for (let n in options) {
  63. if ('scope' === n) {
  64. mapCommandHash[n] = processScope(options[n]);
  65. } else {
  66. // Only include if not in exclusion list
  67. if (exclusionList.indexOf(n) === -1) {
  68. mapCommandHash[n] = options[n];
  69. }
  70. }
  71. }
  72. options = Object.assign({}, options);
  73. // Ensure we have the right read preference inheritance
  74. options.readPreference = resolveReadPreference(coll, options);
  75. // If we have a read preference and inline is not set as output fail hard
  76. if (
  77. options.readPreference !== false &&
  78. options.readPreference !== 'primary' &&
  79. options['out'] &&
  80. options['out'].inline !== 1 &&
  81. options['out'] !== 'inline'
  82. ) {
  83. // Force readPreference to primary
  84. options.readPreference = 'primary';
  85. // Decorate command with writeConcern if supported
  86. applyWriteConcern(mapCommandHash, { db: coll.s.db, collection: coll }, options);
  87. } else {
  88. decorateWithReadConcern(mapCommandHash, coll, options);
  89. }
  90. // Is bypassDocumentValidation specified
  91. if (options.bypassDocumentValidation === true) {
  92. mapCommandHash.bypassDocumentValidation = options.bypassDocumentValidation;
  93. }
  94. // Have we specified collation
  95. try {
  96. decorateWithCollation(mapCommandHash, coll, options);
  97. } catch (err) {
  98. return callback(err, null);
  99. }
  100. // Execute command
  101. executeCommand(coll.s.db, mapCommandHash, options, (err, result) => {
  102. if (err) return handleCallback(callback, err);
  103. // Check if we have an error
  104. if (1 !== result.ok || result.err || result.errmsg) {
  105. return handleCallback(callback, toError(result));
  106. }
  107. // Create statistics value
  108. const stats = {};
  109. if (result.timeMillis) stats['processtime'] = result.timeMillis;
  110. if (result.counts) stats['counts'] = result.counts;
  111. if (result.timing) stats['timing'] = result.timing;
  112. // invoked with inline?
  113. if (result.results) {
  114. // If we wish for no verbosity
  115. if (options['verbose'] == null || !options['verbose']) {
  116. return handleCallback(callback, null, result.results);
  117. }
  118. return handleCallback(callback, null, { results: result.results, stats: stats });
  119. }
  120. // The returned collection
  121. let collection = null;
  122. // If we have an object it's a different db
  123. if (result.result != null && typeof result.result === 'object') {
  124. const doc = result.result;
  125. // Return a collection from another db
  126. let Db = loadDb();
  127. collection = new Db(doc.db, coll.s.db.s.topology, coll.s.db.s.options).collection(
  128. doc.collection
  129. );
  130. } else {
  131. // Create a collection object that wraps the result collection
  132. collection = coll.s.db.collection(result.result);
  133. }
  134. // If we wish for no verbosity
  135. if (options['verbose'] == null || !options['verbose']) {
  136. return handleCallback(callback, err, collection);
  137. }
  138. // Return stats as third set of values
  139. handleCallback(callback, err, { collection: collection, stats: stats });
  140. });
  141. }
  142. }
  143. /**
  144. * Functions that are passed as scope args must
  145. * be converted to Code instances.
  146. * @ignore
  147. */
  148. function processScope(scope) {
  149. if (!isObject(scope) || scope._bsontype === 'ObjectID') {
  150. return scope;
  151. }
  152. const keys = Object.keys(scope);
  153. let key;
  154. const new_scope = {};
  155. for (let i = keys.length - 1; i >= 0; i--) {
  156. key = keys[i];
  157. if ('function' === typeof scope[key]) {
  158. new_scope[key] = new Code(String(scope[key]));
  159. } else {
  160. new_scope[key] = processScope(scope[key]);
  161. }
  162. }
  163. return new_scope;
  164. }
  165. module.exports = MapReduceOperation;