collection_ops.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. 'use strict';
  2. const applyWriteConcern = require('../utils').applyWriteConcern;
  3. const Code = require('../core').BSON.Code;
  4. const createIndexDb = require('./db_ops').createIndex;
  5. const decorateWithCollation = require('../utils').decorateWithCollation;
  6. const decorateWithReadConcern = require('../utils').decorateWithReadConcern;
  7. const ensureIndexDb = require('./db_ops').ensureIndex;
  8. const evaluate = require('./db_ops').evaluate;
  9. const executeCommand = require('./db_ops').executeCommand;
  10. const resolveReadPreference = require('../utils').resolveReadPreference;
  11. const handleCallback = require('../utils').handleCallback;
  12. const indexInformationDb = require('./db_ops').indexInformation;
  13. const Long = require('../core').BSON.Long;
  14. const MongoError = require('../core').MongoError;
  15. const ReadPreference = require('../core').ReadPreference;
  16. const toError = require('../utils').toError;
  17. const insertDocuments = require('./common_functions').insertDocuments;
  18. const updateDocuments = require('./common_functions').updateDocuments;
  /**
   * Source text of the helper that `group` hands to `evaluate` when the group
   * command is not used.
   *
   * The text is kept as a string because it is wrapped in a Code object rather
   * than called locally. `group` swaps the ` reduce;` placeholder for the text
   * of the caller's reduce function and supplies `ns`, `keys`, `condition` and
   * `initial` through the Code scope.
   *
   * @ignore
   */
  const groupFunction = [
    'function () {',
    'var c = db[ns].find(condition);',
    'var map = new Map();',
    'var reduce_function = reduce;',
    '',
    'while (c.hasNext()) {',
    'var obj = c.next();',
    'var key = {};',
    '',
    'for (var i = 0, len = keys.length; i < len; ++i) {',
    'var k = keys[i];',
    'key[k] = obj[k];',
    '}',
    '',
    'var aggObj = map.get(key);',
    '',
    'if (aggObj == null) {',
    'var newObj = Object.extend({}, key);',
    'aggObj = Object.extend(newObj, initial);',
    'map.put(key, aggObj);',
    '}',
    '',
    'reduce_function(obj, aggObj);',
    '}',
    '',
    'return { "result": map.values() };',
    '}'
  ].join('\n');
  /**
   * Check that an update specification starts with an atomic operator.
   *
   * An array is checked element by element and the first error found is
   * returned. For a single document only the first key is inspected; it must
   * begin with `$`.
   *
   * @param {(object|object[])} update The update document, or an array of them.
   * @return {*} The value produced by `toError` when the check fails; `undefined`
   *   (or `null` for an empty array) when it passes.
   */
  function checkForAtomicOperators(update) {
    if (Array.isArray(update)) {
      return update.reduce((err, u) => err || checkForAtomicOperators(u), null);
    }

    // Object.keys throws on null/undefined; treat a missing document like an
    // empty one so the caller gets the validation error instead of a TypeError.
    const keys = update == null ? [] : Object.keys(update);

    // same errors as the server would give for update doc lacking atomic operators
    if (keys.length === 0) {
      return toError('The update operation document must contain at least one atomic operator.');
    }

    if (keys[0][0] !== '$') {
      return toError('the update operation document must contain atomic operators.');
    }
  }
  /**
   * Build an index on a collection by delegating to the db-level createIndex helper.
   *
   * @method
   * @param {Collection} coll The collection the index is created on.
   * @param {(string|object)} fieldOrSpec Defines the index.
   * @param {object} [options] Optional settings. See Collection.prototype.createIndex for a list of options.
   * @param {Collection~resultCallback} [callback] The command result callback
   */
  function createIndex(coll, fieldOrSpec, options, callback) {
    const db = coll.s.db;
    const collectionName = coll.collectionName;

    createIndexDb(db, collectionName, fieldOrSpec, options, callback);
  }
  /**
   * Create multiple indexes in the collection. This method is only supported for
   * MongoDB 2.6 or higher. Earlier version of MongoDB will throw a command not supported
   * error. Index specifications are defined at http://docs.mongodb.org/manual/reference/command/createIndexes/.
   *
   * Every spec that carries a collation is checked against the topology's
   * capabilities, whether or not it already has a name. A spec without a name
   * gets one derived from its key (`field_value` pairs joined by `_`); the name
   * is written onto the spec object that was passed in.
   *
   * @method
   * @param {Collection} coll The collection the indexes are created on.
   * @param {array} indexSpecs An array of index specifications to be created
   * @param {Object} [options] Optional settings. See Collection.prototype.createIndexes for a list of options.
   * @param {Collection~resultCallback} [callback] The command result callback
   */
  function createIndexes(coll, indexSpecs, options, callback) {
    const capabilities = coll.s.topology.capabilities();
    const collationUnsupported = capabilities && !capabilities.commandsTakeCollation;

    for (const spec of indexSpecs) {
      // Did the user pass in a collation, check if our write server supports it.
      // This must run for named specs too, otherwise an explicit name would
      // bypass the capability check.
      if (spec.collation && collationUnsupported) {
        return callback(new MongoError('server/primary/mongos does not support collation'));
      }

      // Ensure we generate the correct name if the parameter is not set
      if (spec.name == null) {
        const keys = [];
        for (const field in spec.key) {
          keys.push(`${field}_${spec.key[field]}`);
        }
        spec.name = keys.join('_');
      }
    }

    // Index creation is a write, so the command always targets the primary.
    options = Object.assign({}, options, { readPreference: ReadPreference.PRIMARY });

    // Execute the index
    executeCommand(
      coll.s.db,
      {
        createIndexes: coll.collectionName,
        indexes: indexSpecs
      },
      options,
      callback
    );
  }
  /**
   * Make sure an index exists on a collection by delegating to the db-level
   * ensureIndex helper.
   *
   * @method
   * @param {Collection} coll The collection the index belongs to.
   * @param {(string|object)} fieldOrSpec Defines the index.
   * @param {object} [options] Optional settings. See Collection.prototype.ensureIndex for a list of options.
   * @param {Collection~resultCallback} [callback] The command result callback
   */
  function ensureIndex(coll, fieldOrSpec, options, callback) {
    const db = coll.s.db;
    const collectionName = coll.collectionName;

    ensureIndexDb(db, collectionName, fieldOrSpec, options, callback);
  }
  /**
   * Run a group command across a collection.
   *
   * With `command` truthy the group command is issued through `executeCommand`
   * and the reply's `retval` is passed to the callback. Otherwise the
   * `groupFunction` text is sent through `evaluate`, with the reduce function
   * spliced into it and `ns`, `keys`, `condition` and `initial` supplied as
   * Code scope.
   *
   * @method
   * @param {Collection} coll The collection to group over.
   * @param {(object|array|function|code)} keys An object, array or function expressing the keys to group by.
   * @param {object} condition An optional condition that must be true for a row to be considered.
   * @param {object} initial Initial value of the aggregation counter object.
   * @param {(function|Code)} reduce The reduce function aggregates (reduces) the objects iterated
   * @param {(function|Code)} finalize An optional function to be run on each item in the result set just before the item is returned.
   * @param {boolean} command Specify if you wish to run using the internal group command or using eval, default is true.
   * @param {object} [options] Optional settings. See Collection.prototype.group for a list of options.
   * @param {Collection~resultCallback} [callback] The command result callback
   * @deprecated MongoDB 3.6 or higher will no longer support the group command. We recommend rewriting using the aggregation framework.
   */
  function group(coll, keys, condition, initial, reduce, finalize, command, options, callback) {
    const reduceIsCode = reduce != null && reduce._bsontype === 'Code';

    // Execute using the command
    if (command) {
      const reduceFunction = reduceIsCode ? reduce : new Code(reduce);
      const selector = {
        group: {
          ns: coll.collectionName,
          $reduce: reduceFunction,
          cond: condition,
          initial: initial,
          out: 'inline'
        }
      };

      // if finalize is defined
      if (finalize != null) selector.group['finalize'] = finalize;

      // Set up group selector
      if ('function' === typeof keys || (keys && keys._bsontype === 'Code')) {
        selector.group.$keyf = keys && keys._bsontype === 'Code' ? keys : new Code(keys);
      } else {
        const hash = {};
        keys.forEach(key => {
          hash[key] = 1;
        });
        selector.group.key = hash;
      }

      options = Object.assign({}, options);
      // Ensure we have the right read preference inheritance
      options.readPreference = resolveReadPreference(coll, options);

      // Do we have a readConcern specified
      decorateWithReadConcern(selector, coll, options);

      // Have we specified collation
      try {
        decorateWithCollation(selector, coll, options);
      } catch (err) {
        return callback(err, null);
      }

      // Execute command
      executeCommand(coll.s.db, selector, options, (err, result) => {
        if (err) return handleCallback(callback, err, null);
        handleCallback(callback, null, result.retval);
      });
    } else {
      // Create execution scope. Copy the caller's scope instead of writing into
      // it, so the Code object passed in is left untouched and a Code without a
      // scope does not crash here.
      const scope = Object.assign({}, reduceIsCode ? reduce.scope : null);
      scope.ns = coll.collectionName;
      scope.keys = keys;
      scope.condition = condition;
      scope.initial = initial;

      // A Code object carries the function in `code`; calling toString() on the
      // wrapper itself would not yield the function text.
      const reduceSource = (reduceIsCode ? reduce.code : reduce).toString();

      // Pass in the function text to execute within mongodb. A replacer function
      // is used so `$&`, `$'`, `$$` etc. inside the reduce source are inserted
      // literally rather than expanded as replacement patterns.
      const groupfn = groupFunction.replace(/ reduce;/, () => reduceSource + ';');

      evaluate(coll.s.db, new Code(groupfn, scope), null, options, (err, results) => {
        if (err) return handleCallback(callback, err, null);
        handleCallback(callback, null, results.result || results);
      });
    }
  }
  /**
   * List every index on the collection, asking for full index information
   * unless the caller's options say otherwise.
   *
   * @method
   * @param {Collection} coll The collection whose indexes are listed.
   * @param {Object} [options] Optional settings. See Collection.prototype.indexes for a list of options.
   * @param {Collection~resultCallback} [callback] The command result callback
   */
  function indexes(coll, options, callback) {
    // `full` defaults to true; a `full` key in the caller's options wins.
    const mergedOptions = Object.assign({ full: true }, options);

    indexInformationDb(coll.s.db, coll.collectionName, mergedOptions, callback);
  }
  /**
   * Report whether the named index, or every index in a list of names, is
   * present in the collection's index information. The answer is `false` as
   * soon as one name is missing.
   *
   * @method
   * @param {Collection} coll The collection to inspect.
   * @param {(string|array)} indexes One or more index names to check.
   * @param {Object} [options] Optional settings. See Collection.prototype.indexExists for a list of options.
   * @param {Collection~resultCallback} [callback] The command result callback
   */
  function indexExists(coll, indexes, options, callback) {
    indexInformation(coll, options, (err, info) => {
      // Lookup failed: hand the error straight back
      if (err != null) return handleCallback(callback, err, null);

      // Treat a single name as a one-element list so both shapes share one loop
      const names = Array.isArray(indexes) ? indexes : [indexes];

      for (const name of names) {
        if (info[name] == null) {
          return handleCallback(callback, null, false);
        }
      }

      // Nothing was missing
      return handleCallback(callback, null, true);
    });
  }
  /**
   * Fetch the index information for a collection by delegating to the db-level
   * indexInformation helper.
   *
   * @method
   * @param {Collection} coll The collection to describe.
   * @param {object} [options] Optional settings. See Collection.prototype.indexInformation for a list of options.
   * @param {Collection~resultCallback} [callback] The command result callback
   */
  function indexInformation(coll, options, callback) {
    const db = coll.s.db;
    const collectionName = coll.collectionName;

    indexInformationDb(db, collectionName, options, callback);
  }
  /**
   * Return N parallel cursors for a collection to allow parallel reading of the entire collection. There are
   * no ordering guarantees for returned results.
   *
   * The `raw` option is withheld from the command itself and applied only to
   * the cursors built from the reply. The caller's `options` object is copied,
   * never modified.
   *
   * @method
   * @param {Collection} coll The collection to scan.
   * @param {object} [options] Optional settings. See Collection.prototype.parallelCollectionScan for a list of options.
   * @param {Collection~parallelCollectionScanCallback} [callback] The command result callback
   */
  function parallelCollectionScan(coll, options, callback) {
    // Work on a private copy so stripping `raw` below does not leak to the caller
    const commandOptions = Object.assign({}, options);

    // Create command object
    const commandObject = {
      parallelCollectionScan: coll.collectionName,
      numCursors: commandOptions.numCursors
    };

    // Do we have a readConcern specified
    decorateWithReadConcern(commandObject, coll, commandOptions);

    // Store the raw value and keep it out of the command options
    const raw = commandOptions.raw;
    delete commandOptions.raw;

    // Execute the command
    executeCommand(coll.s.db, commandObject, commandOptions, (err, result) => {
      if (err) return handleCallback(callback, err, null);

      if (result == null) {
        return handleCallback(
          callback,
          new Error('no result returned for parallelCollectionScan'),
          null
        );
      }

      const cursorOptions = Object.assign({ explicitlyIgnoreSession: true }, commandOptions);
      // Add the raw back to the option
      if (raw) cursorOptions.raw = raw;

      // Create command cursors for each item
      const cursors = [];
      for (const entry of result.cursors) {
        const rawId = entry.cursor.id;
        // Convert cursorId to Long if needed
        const cursorId = typeof rawId === 'number' ? Long.fromNumber(rawId) : rawId;
        cursors.push(coll.s.topology.cursor(coll.namespace, cursorId, cursorOptions));
      }

      handleCallback(callback, null, cursors);
    });
  }
  /**
   * Save a document.
   *
   * A document with a non-null `_id` is written as an upsert keyed on that
   * `_id` through `updateDocuments`; any other document is inserted through
   * `insertDocuments`. `doc` must be an object: its `_id` is read before
   * anything else happens.
   *
   * @method
   * @param {Collection} coll The collection to write to.
   * @param {object} doc Document to save
   * @param {object} [options] Optional settings. See Collection.prototype.save for a list of options.
   * @param {Collection~writeOpCallback} [callback] The command result callback
   * @deprecated use insertOne, insertMany, updateOne or updateMany
   */
  function save(coll, doc, options, callback) {
    // Get the write concern options
    const finalOptions = applyWriteConcern(
      Object.assign({}, options),
      { db: coll.s.db, collection: coll },
      options
    );

    // Establish if we need to perform an insert or update
    if (doc._id != null) {
      finalOptions.upsert = true;
      return updateDocuments(coll, { _id: doc._id }, doc, finalOptions, callback);
    }

    // Insert the document. No `doc == null` check is needed in the callback:
    // `doc._id` above has already been read, so doc cannot be null here.
    insertDocuments(coll, [doc], finalOptions, (err, result) => {
      if (callback == null) return;
      if (err) return handleCallback(callback, err, null);
      handleCallback(callback, null, result);
    });
  }
  324. module.exports = {
  325. checkForAtomicOperators,
  326. createIndex,
  327. createIndexes,
  328. ensureIndex,
  329. group,
  330. indexes,
  331. indexExists,
  332. indexInformation,
  333. parallelCollectionScan,
  334. save
  335. };