collection_ops.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. 'use strict';
  2. const applyWriteConcern = require('../utils').applyWriteConcern;
  3. const Code = require('../core').BSON.Code;
  4. const createIndexDb = require('./db_ops').createIndex;
  5. const decorateWithCollation = require('../utils').decorateWithCollation;
  6. const decorateWithReadConcern = require('../utils').decorateWithReadConcern;
  7. const ensureIndexDb = require('./db_ops').ensureIndex;
  8. const evaluate = require('./db_ops').evaluate;
  9. const executeCommand = require('./db_ops').executeCommand;
  10. const handleCallback = require('../utils').handleCallback;
  11. const indexInformationDb = require('./db_ops').indexInformation;
  12. const Long = require('../core').BSON.Long;
  13. const MongoError = require('../core').MongoError;
  14. const ReadPreference = require('../core').ReadPreference;
  15. const insertDocuments = require('./common_functions').insertDocuments;
  16. const updateDocuments = require('./common_functions').updateDocuments;
  17. /**
  18. * Group function helper
  19. * @ignore
  20. */
  21. // var groupFunction = function () {
  22. // var c = db[ns].find(condition);
  23. // var map = new Map();
  24. // var reduce_function = reduce;
  25. //
  26. // while (c.hasNext()) {
  27. // var obj = c.next();
  28. // var key = {};
  29. //
  30. // for (var i = 0, len = keys.length; i < len; ++i) {
  31. // var k = keys[i];
  32. // key[k] = obj[k];
  33. // }
  34. //
  35. // var aggObj = map.get(key);
  36. //
  37. // if (aggObj == null) {
  38. // var newObj = Object.extend({}, key);
  39. // aggObj = Object.extend(newObj, initial);
  40. // map.put(key, aggObj);
  41. // }
  42. //
  43. // reduce_function(obj, aggObj);
  44. // }
  45. //
  46. // return { "result": map.values() };
  47. // }.toString();
  48. const groupFunction =
  49. 'function () {\nvar c = db[ns].find(condition);\nvar map = new Map();\nvar reduce_function = reduce;\n\nwhile (c.hasNext()) {\nvar obj = c.next();\nvar key = {};\n\nfor (var i = 0, len = keys.length; i < len; ++i) {\nvar k = keys[i];\nkey[k] = obj[k];\n}\n\nvar aggObj = map.get(key);\n\nif (aggObj == null) {\nvar newObj = Object.extend({}, key);\naggObj = Object.extend(newObj, initial);\nmap.put(key, aggObj);\n}\n\nreduce_function(obj, aggObj);\n}\n\nreturn { "result": map.values() };\n}';
  50. /**
  51. * Create an index on the db and collection.
  52. *
  53. * @method
  54. * @param {Collection} a Collection instance.
  55. * @param {(string|object)} fieldOrSpec Defines the index.
  56. * @param {object} [options] Optional settings. See Collection.prototype.createIndex for a list of options.
  57. * @param {Collection~resultCallback} [callback] The command result callback
  58. */
  59. function createIndex(coll, fieldOrSpec, options, callback) {
  60. createIndexDb(coll.s.db, coll.collectionName, fieldOrSpec, options, callback);
  61. }
  62. /**
  63. * Create multiple indexes in the collection. This method is only supported for
  64. * MongoDB 2.6 or higher. Earlier version of MongoDB will throw a command not supported
  65. * error. Index specifications are defined at http://docs.mongodb.org/manual/reference/command/createIndexes/.
  66. *
  67. * @method
  68. * @param {Collection} a Collection instance.
  69. * @param {array} indexSpecs An array of index specifications to be created
  70. * @param {Object} [options] Optional settings. See Collection.prototype.createIndexes for a list of options.
  71. * @param {Collection~resultCallback} [callback] The command result callback
  72. */
  73. function createIndexes(coll, indexSpecs, options, callback) {
  74. const capabilities = coll.s.topology.capabilities();
  75. // Ensure we generate the correct name if the parameter is not set
  76. for (let i = 0; i < indexSpecs.length; i++) {
  77. if (indexSpecs[i].name == null) {
  78. const keys = [];
  79. // Did the user pass in a collation, check if our write server supports it
  80. if (indexSpecs[i].collation && capabilities && !capabilities.commandsTakeCollation) {
  81. return callback(new MongoError('server/primary/mongos does not support collation'));
  82. }
  83. for (let name in indexSpecs[i].key) {
  84. keys.push(`${name}_${indexSpecs[i].key[name]}`);
  85. }
  86. // Set the name
  87. indexSpecs[i].name = keys.join('_');
  88. }
  89. }
  90. options = Object.assign({}, options, { readPreference: ReadPreference.PRIMARY });
  91. // Execute the index
  92. executeCommand(
  93. coll.s.db,
  94. {
  95. createIndexes: coll.collectionName,
  96. indexes: indexSpecs
  97. },
  98. options,
  99. callback
  100. );
  101. }
  102. /**
  103. * Ensure that an index exists. If the index does not exist, this function creates it.
  104. *
  105. * @method
  106. * @param {Collection} a Collection instance.
  107. * @param {(string|object)} fieldOrSpec Defines the index.
  108. * @param {object} [options] Optional settings. See Collection.prototype.ensureIndex for a list of options.
  109. * @param {Collection~resultCallback} [callback] The command result callback
  110. */
  111. function ensureIndex(coll, fieldOrSpec, options, callback) {
  112. ensureIndexDb(coll.s.db, coll.collectionName, fieldOrSpec, options, callback);
  113. }
  114. /**
  115. * Run a group command across a collection.
  116. *
  117. * @method
  118. * @param {Collection} a Collection instance.
  119. * @param {(object|array|function|code)} keys An object, array or function expressing the keys to group by.
  120. * @param {object} condition An optional condition that must be true for a row to be considered.
  121. * @param {object} initial Initial value of the aggregation counter object.
  122. * @param {(function|Code)} reduce The reduce function aggregates (reduces) the objects iterated
  123. * @param {(function|Code)} finalize An optional function to be run on each item in the result set just before the item is returned.
  124. * @param {boolean} command Specify if you wish to run using the internal group command or using eval, default is true.
  125. * @param {object} [options] Optional settings. See Collection.prototype.group for a list of options.
  126. * @param {Collection~resultCallback} [callback] The command result callback
  127. * @deprecated MongoDB 3.6 or higher will no longer support the group command. We recommend rewriting using the aggregation framework.
  128. */
  129. function group(coll, keys, condition, initial, reduce, finalize, command, options, callback) {
  130. // Execute using the command
  131. if (command) {
  132. const reduceFunction = reduce && reduce._bsontype === 'Code' ? reduce : new Code(reduce);
  133. const selector = {
  134. group: {
  135. ns: coll.collectionName,
  136. $reduce: reduceFunction,
  137. cond: condition,
  138. initial: initial,
  139. out: 'inline'
  140. }
  141. };
  142. // if finalize is defined
  143. if (finalize != null) selector.group['finalize'] = finalize;
  144. // Set up group selector
  145. if ('function' === typeof keys || (keys && keys._bsontype === 'Code')) {
  146. selector.group.$keyf = keys && keys._bsontype === 'Code' ? keys : new Code(keys);
  147. } else {
  148. const hash = {};
  149. keys.forEach(key => {
  150. hash[key] = 1;
  151. });
  152. selector.group.key = hash;
  153. }
  154. options = Object.assign({}, options);
  155. // Ensure we have the right read preference inheritance
  156. options.readPreference = ReadPreference.resolve(coll, options);
  157. // Do we have a readConcern specified
  158. decorateWithReadConcern(selector, coll, options);
  159. // Have we specified collation
  160. try {
  161. decorateWithCollation(selector, coll, options);
  162. } catch (err) {
  163. return callback(err, null);
  164. }
  165. // Execute command
  166. executeCommand(coll.s.db, selector, options, (err, result) => {
  167. if (err) return handleCallback(callback, err, null);
  168. handleCallback(callback, null, result.retval);
  169. });
  170. } else {
  171. // Create execution scope
  172. const scope = reduce != null && reduce._bsontype === 'Code' ? reduce.scope : {};
  173. scope.ns = coll.collectionName;
  174. scope.keys = keys;
  175. scope.condition = condition;
  176. scope.initial = initial;
  177. // Pass in the function text to execute within mongodb.
  178. const groupfn = groupFunction.replace(/ reduce;/, reduce.toString() + ';');
  179. evaluate(coll.s.db, new Code(groupfn, scope), null, options, (err, results) => {
  180. if (err) return handleCallback(callback, err, null);
  181. handleCallback(callback, null, results.result || results);
  182. });
  183. }
  184. }
  185. /**
  186. * Retrieve all the indexes on the collection.
  187. *
  188. * @method
  189. * @param {Collection} a Collection instance.
  190. * @param {Object} [options] Optional settings. See Collection.prototype.indexes for a list of options.
  191. * @param {Collection~resultCallback} [callback] The command result callback
  192. */
  193. function indexes(coll, options, callback) {
  194. options = Object.assign({}, { full: true }, options);
  195. indexInformationDb(coll.s.db, coll.collectionName, options, callback);
  196. }
  197. /**
  198. * Check if one or more indexes exist on the collection. This fails on the first index that doesn't exist.
  199. *
  200. * @method
  201. * @param {Collection} a Collection instance.
  202. * @param {(string|array)} indexes One or more index names to check.
  203. * @param {Object} [options] Optional settings. See Collection.prototype.indexExists for a list of options.
  204. * @param {Collection~resultCallback} [callback] The command result callback
  205. */
  206. function indexExists(coll, indexes, options, callback) {
  207. indexInformation(coll, options, (err, indexInformation) => {
  208. // If we have an error return
  209. if (err != null) return handleCallback(callback, err, null);
  210. // Let's check for the index names
  211. if (!Array.isArray(indexes))
  212. return handleCallback(callback, null, indexInformation[indexes] != null);
  213. // Check in list of indexes
  214. for (let i = 0; i < indexes.length; i++) {
  215. if (indexInformation[indexes[i]] == null) {
  216. return handleCallback(callback, null, false);
  217. }
  218. }
  219. // All keys found return true
  220. return handleCallback(callback, null, true);
  221. });
  222. }
  223. /**
  224. * Retrieve this collection's index info.
  225. *
  226. * @method
  227. * @param {Collection} a Collection instance.
  228. * @param {object} [options] Optional settings. See Collection.prototype.indexInformation for a list of options.
  229. * @param {Collection~resultCallback} [callback] The command result callback
  230. */
  231. function indexInformation(coll, options, callback) {
  232. indexInformationDb(coll.s.db, coll.collectionName, options, callback);
  233. }
  234. /**
  235. * Return N parallel cursors for a collection to allow parallel reading of the entire collection. There are
  236. * no ordering guarantees for returned results.
  237. *
  238. * @method
  239. * @param {Collection} a Collection instance.
  240. * @param {object} [options] Optional settings. See Collection.prototype.parallelCollectionScan for a list of options.
  241. * @param {Collection~parallelCollectionScanCallback} [callback] The command result callback
  242. */
  243. function parallelCollectionScan(coll, options, callback) {
  244. // Create command object
  245. const commandObject = {
  246. parallelCollectionScan: coll.collectionName,
  247. numCursors: options.numCursors
  248. };
  249. // Do we have a readConcern specified
  250. decorateWithReadConcern(commandObject, coll, options);
  251. // Store the raw value
  252. const raw = options.raw;
  253. delete options['raw'];
  254. // Execute the command
  255. executeCommand(coll.s.db, commandObject, options, (err, result) => {
  256. if (err) return handleCallback(callback, err, null);
  257. if (result == null)
  258. return handleCallback(
  259. callback,
  260. new Error('no result returned for parallelCollectionScan'),
  261. null
  262. );
  263. options = Object.assign({ explicitlyIgnoreSession: true }, options);
  264. const cursors = [];
  265. // Add the raw back to the option
  266. if (raw) options.raw = raw;
  267. // Create command cursors for each item
  268. for (let i = 0; i < result.cursors.length; i++) {
  269. const rawId = result.cursors[i].cursor.id;
  270. // Convert cursorId to Long if needed
  271. const cursorId = typeof rawId === 'number' ? Long.fromNumber(rawId) : rawId;
  272. // Add a command cursor
  273. cursors.push(coll.s.topology.cursor(coll.namespace, cursorId, options));
  274. }
  275. handleCallback(callback, null, cursors);
  276. });
  277. }
  278. /**
  279. * Save a document.
  280. *
  281. * @method
  282. * @param {Collection} a Collection instance.
  283. * @param {object} doc Document to save
  284. * @param {object} [options] Optional settings. See Collection.prototype.save for a list of options.
  285. * @param {Collection~writeOpCallback} [callback] The command result callback
  286. * @deprecated use insertOne, insertMany, updateOne or updateMany
  287. */
  288. function save(coll, doc, options, callback) {
  289. // Get the write concern options
  290. const finalOptions = applyWriteConcern(
  291. Object.assign({}, options),
  292. { db: coll.s.db, collection: coll },
  293. options
  294. );
  295. // Establish if we need to perform an insert or update
  296. if (doc._id != null) {
  297. finalOptions.upsert = true;
  298. return updateDocuments(coll, { _id: doc._id }, doc, finalOptions, callback);
  299. }
  300. // Insert the document
  301. insertDocuments(coll, [doc], finalOptions, (err, result) => {
  302. if (callback == null) return;
  303. if (doc == null) return handleCallback(callback, null, null);
  304. if (err) return handleCallback(callback, err, null);
  305. handleCallback(callback, null, result);
  306. });
  307. }
  308. module.exports = {
  309. createIndex,
  310. createIndexes,
  311. ensureIndex,
  312. group,
  313. indexes,
  314. indexExists,
  315. indexInformation,
  316. parallelCollectionScan,
  317. save
  318. };