'use strict';

const applyWriteConcern = require('../utils').applyWriteConcern;
const Code = require('../core').BSON.Code;
const decorateWithCollation = require('../utils').decorateWithCollation;
const decorateWithReadConcern = require('../utils').decorateWithReadConcern;
const executeCommand = require('./db_ops').executeCommand;
const handleCallback = require('../utils').handleCallback;
const isObject = require('../utils').isObject;
const loadDb = require('../dynamic_loaders').loadDb;
const OperationBase = require('./operation').OperationBase;
const ReadPreference = require('../core').ReadPreference;
const toError = require('../utils').toError;
const Aspect = require('./operation').Aspect;
const defineAspects = require('./operation').defineAspects;
const decorateWithExplain = require('../utils').decorateWithExplain;
const maxWireVersion = require('../core/utils').maxWireVersion;
const MongoError = require('../error').MongoError;

// Option names that are never copied verbatim from `options` into the
// mapReduce command document built by `execute`.
const exclusionList = [
  'explain',
  'readPreference',
  'session',
  'bypassDocumentValidation',
  'w',
  'wtimeout',
  'j',
  'writeConcern'
];

/**
 * Run Map Reduce across a collection. Be aware that the inline option for out will return an array of results not a collection.
 *
 * @class
 * @property {Collection} collection A Collection instance.
 * @property {(function|string)} map The mapping function.
 * @property {(function|string)} reduce The reduce function.
 * @property {object} [options] Optional settings. See Collection.prototype.mapReduce for a list of options.
 */
class MapReduceOperation extends OperationBase {
  /**
   * Constructs a MapReduce operation.
   *
   * @param {Collection} collection A Collection instance.
   * @param {(function|string)} map The mapping function.
   * @param {(function|string)} reduce The reduce function.
   * @param {object} [options] Optional settings. See Collection.prototype.mapReduce for a list of options.
   */
  constructor(collection, map, reduce, options) {
    super(options);

    this.collection = collection;
    this.map = map;
    this.reduce = reduce;
  }

  /**
   * Execute the operation.
   *
   * Builds the `mapReduce` command from the stored map/reduce functions and
   * options, runs it through `executeCommand`, and reports to `callback`:
   *  - for an explain run, the raw server reply;
   *  - when the reply carries `results` (inline output), that array, or
   *    `{ results, stats }` when `options.verbose` is truthy;
   *  - otherwise a collection handle for the output collection, or
   *    `{ collection, stats }` when `options.verbose` is truthy.
   *
   * @param {Collection~resultCallback} [callback] The command result callback
   */
  execute(callback) {
    const coll = this.collection;
    const map = this.map;
    const reduce = this.reduce;
    let options = this.options;

    let mapCommandHash = {
      mapReduce: coll.collectionName,
      map: map,
      reduce: reduce
    };

    // Add any other options passed in
    for (let n in options) {
      if ('scope' === n) {
        // Functions inside the scope document are converted to Code instances
        mapCommandHash[n] = processScope(options[n]);
      } else {
        // Only include if not in exclusion list
        if (exclusionList.indexOf(n) === -1) {
          mapCommandHash[n] = options[n];
        }
      }
    }

    // Work on a shallow copy so the assignments below do not touch this.options
    options = Object.assign({}, options);

    // Ensure we have the right read preference inheritance
    options.readPreference = ReadPreference.resolve(coll, options);

    // When `out` is set and is not inline, and the resolved read preference is
    // not already 'primary', force primary and attach the write concern;
    // in every other case attach the read concern instead.
    if (
      options.readPreference !== false &&
      options.readPreference !== 'primary' &&
      options['out'] &&
      options['out'].inline !== 1 &&
      options['out'] !== 'inline'
    ) {
      // Force readPreference to primary
      options.readPreference = 'primary';
      // Decorate command with writeConcern if supported
      applyWriteConcern(mapCommandHash, { db: coll.s.db, collection: coll }, options);
    } else {
      decorateWithReadConcern(mapCommandHash, coll, options);
    }

    // Is bypassDocumentValidation specified
    if (options.bypassDocumentValidation === true) {
      mapCommandHash.bypassDocumentValidation = options.bypassDocumentValidation;
    }

    // Have we specified collation
    try {
      decorateWithCollation(mapCommandHash, coll, options);
    } catch (err) {
      return callback(err, null);
    }

    // NOTE(review): `this.explain` is not assigned in this class; presumably
    // the base class sets it for EXPLAINABLE operations - confirm.
    if (this.explain) {
      if (maxWireVersion(coll.s.topology) < 9) {
        callback(new MongoError(`server does not support explain on mapReduce`));
        return;
      }
      mapCommandHash = decorateWithExplain(mapCommandHash, this.explain);
    }

    // Execute command
    executeCommand(coll.s.db, mapCommandHash, options, (err, result) => {
      if (err) return handleCallback(callback, err);

      // Check if we have an error
      if (1 !== result.ok || result.err || result.errmsg) {
        return handleCallback(callback, toError(result));
      }

      // If an explain operation was executed, don't process the server results
      if (this.explain) return callback(undefined, result);

      // Create statistics value
      const stats = {};
      if (result.timeMillis) stats['processtime'] = result.timeMillis;
      if (result.counts) stats['counts'] = result.counts;
      if (result.timing) stats['timing'] = result.timing;

      // invoked with inline?
      if (result.results) {
        // If we wish for no verbosity
        if (!options.verbose) {
          return handleCallback(callback, null, result.results);
        }

        return handleCallback(callback, null, { results: result.results, stats: stats });
      }

      // The returned collection
      let collection = null;

      // If we have an object it's a different db
      if (result.result != null && typeof result.result === 'object') {
        const doc = result.result;
        // Return a collection from another db
        const Db = loadDb();
        collection = new Db(doc.db, coll.s.db.s.topology, coll.s.db.s.options).collection(
          doc.collection
        );
      } else {
        // Create a collection object that wraps the result collection
        collection = coll.s.db.collection(result.result);
      }

      // If we wish for no verbosity
      if (!options.verbose) {
        return handleCallback(callback, err, collection);
      }

      // Verbose: hand back the collection together with the stats
      handleCallback(callback, err, { collection: collection, stats: stats });
    });
  }
}

/**
 * Functions that are passed as scope args must
 * be converted to Code instances.
 *
 * Recursively copies `scope`, replacing every function value with a `Code`
 * built from the function's source text. Values for which `isObject` is false
 * are returned as-is. Values tagged with a string `_bsontype` (ObjectID and
 * any other BSON wrapper) are also returned untouched rather than being
 * rebuilt as plain objects from their own enumerable keys.
 *
 * Keys are visited from last to first when building the copy.
 *
 * @ignore
 * @param {*} scope The scope document, or a nested value within it.
 * @returns {*} The processed copy, or the original value when it is a leaf.
 */
function processScope(scope) {
  if (!isObject(scope) || typeof scope._bsontype === 'string') {
    return scope;
  }

  const keys = Object.keys(scope);
  const new_scope = {};

  for (let i = keys.length - 1; i >= 0; i--) {
    const key = keys[i];
    if ('function' === typeof scope[key]) {
      new_scope[key] = new Code(String(scope[key]));
    } else {
      new_scope[key] = processScope(scope[key]);
    }
  }

  return new_scope;
}

defineAspects(MapReduceOperation, [Aspect.EXPLAINABLE]);

module.exports = MapReduceOperation;