QueryCursor.js 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. /*!
  2. * Module dependencies.
  3. */
  4. 'use strict';
  5. const Readable = require('stream').Readable;
  6. const promiseOrCallback = require('../helpers/promiseOrCallback');
  7. const eachAsync = require('../helpers/cursor/eachAsync');
  8. const helpers = require('../queryhelpers');
  9. const util = require('util');
  /**
   * A QueryCursor is a concurrency primitive for processing query results
   * one document at a time. A QueryCursor fulfills the Node.js streams3 API,
   * in addition to several other mechanisms for loading documents from MongoDB
   * one at a time.
   *
   * QueryCursors execute the model's pre find hooks, but **not** the model's
   * post find hooks.
   *
   * Unless you're an advanced user, do **not** instantiate this class directly.
   * Use [`Query#cursor()`](/docs/api.html#query_Query-cursor) instead.
   *
   * Note: when `options.batchSize` is set, the constructor writes it to
   * `options.cursor.batchSize` on the very object that was passed in.
   *
   * @param {Query} query
   * @param {Object} options query options passed to `.find()`
   * @inherits Readable
   * @event `cursor`: Emitted when the cursor is created
   * @event `error`: Emitted when an error occurred
   * @event `data`: Emitted when the stream is flowing and the next doc is ready
   * @event `end`: Emitted when the stream is exhausted
   * @api public
   */
  function QueryCursor(query, options) {
    Readable.call(this, { objectMode: true });

    this.cursor = null;
    this.query = query;
    const _this = this;
    const model = query.model;
    this._mongooseOptions = {};
    this._transforms = [];
    this.model = model;
    this.options = options || {};

    model.hooks.execPre('find', query, () => {
      // Query-level transforms run first, then the `transform` option.
      this._transforms = this._transforms.concat(query._transforms.slice());
      if (this.options.transform) {
        this._transforms.push(this.options.transform);
      }
      // Re: gh-8039, you need to set the `cursor.batchSize` option, top-level
      // `batchSize` option doesn't work.
      if (this.options.batchSize) {
        this.options.cursor = this.options.cursor || {};
        this.options.cursor.batchSize = this.options.batchSize;
      }
      model.collection.find(query._conditions, this.options, function(err, cursor) {
        if (_this._error) {
          // The cursor was marked as errored before `find()` reported back.
          // `find()` itself may have failed too, in which case there is no
          // cursor to close - guard so we report errors instead of throwing.
          if (cursor != null) {
            cursor.close(function() {});
          }
          _this.listeners('error').length > 0 && _this.emit('error', _this._error);
        }
        if (err) {
          return _this.emit('error', err);
        }
        _this.cursor = cursor;
        // Wakes up anything parked in `_next()` / `_waitForCursor()`.
        _this.emit('cursor', cursor);
      });
    });
  }

  util.inherits(QueryCursor, Readable);
  /*!
   * `Readable#_read` implementation. Pulls a single document through `_next()`
   * and pushes it into the stream. When no document is left, the stream is
   * ended, the underlying cursor is closed and `close` is emitted.
   */

  QueryCursor.prototype._read = function() {
    const stream = this;

    // Emit `close` on a later timer tick, unless the stream was destroyed
    // in the meantime.
    const scheduleCloseEvent = () => {
      setTimeout(() => {
        // on node >= 14 streams close automatically (gh-8834)
        if (!stream.destroyed) {
          stream.emit('close');
        }
      }, 0);
    };

    _next(this, (err, doc) => {
      if (err) {
        return stream.emit('error', err);
      }
      if (doc) {
        stream.push(doc);
        return;
      }

      // No document: signal end-of-stream, then release the cursor.
      stream.push(null);
      stream.cursor.close((closeErr) => {
        if (closeErr) {
          return stream.emit('error', closeErr);
        }
        scheduleCloseEvent();
      });
    });
  };
  /**
   * Adds a transform function to this cursor. Registered transforms are applied,
   * in registration order, to documents retrieved via the streams interface or
   * `.next()`.
   *
   * ####Example
   *
   *     // Map documents returned by `data` events
   *     Thing.
   *       find({ name: /^hello/ }).
   *       cursor().
   *       map(function (doc) {
   *         doc.foo = "bar";
   *         return doc;
   *       }).
   *       on('data', function(doc) { console.log(doc.foo); });
   *
   *     // Or map documents returned by `.next()`
   *     var cursor = Thing.find({ name: /^hello/ }).
   *       cursor().
   *       map(function (doc) {
   *         doc.foo = "bar";
   *         return doc;
   *       });
   *     cursor.next(function(error, doc) {
   *       console.log(doc.foo);
   *     });
   *
   * @param {Function} fn receives the current document, returns its replacement
   * @return {QueryCursor} this, for chaining
   * @api public
   * @method map
   */

  QueryCursor.prototype.map = function(fn) {
    const registered = this._transforms;
    registered.push(fn);
    return this;
  };
  /*!
   * Stores `error` on this cursor as `_error` and returns the cursor for
   * chaining. `_next()` reports a stored `_error` instead of reading documents.
   */

  QueryCursor.prototype._markError = function(error) {
    const cursor = this;
    cursor._error = error;
    return cursor;
  };
  /**
   * Marks this cursor as closed. Will stop streaming and subsequent calls to
   * `next()` will error.
   *
   * If closing the underlying cursor fails, the error is handed to the callback
   * and re-emitted as an `error` event when at least one `error` listener is
   * registered. On success a `close` event is emitted.
   *
   * @param {Function} callback
   * @return {Promise}
   * @api public
   * @method close
   * @emits close
   * @see MongoDB driver cursor#close http://mongodb.github.io/node-mongodb-native/2.1/api/Cursor.html#close
   */

  QueryCursor.prototype.close = function(callback) {
    const closeUnderlyingCursor = (done) => {
      this.cursor.close((err) => {
        if (!err) {
          this.emit('close');
          done(null);
          return;
        }
        done(err);
        // Only emit when someone listens, to avoid an unhandled `error` event.
        return this.listeners('error').length > 0 && this.emit('error', err);
      });
    };

    return promiseOrCallback(callback, closeUnderlyingCursor, this.model.events);
  };
  /**
   * Fetches the next document from this cursor. Yields `null` once there are
   * no documents left.
   *
   * @param {Function} callback
   * @return {Promise}
   * @api public
   * @method next
   */

  QueryCursor.prototype.next = function(callback) {
    const fetchOne = (done) => {
      _next(this, (err, doc) => {
        if (!err) {
          done(null, doc);
          return;
        }
        return done(err);
      });
    };

    return promiseOrCallback(callback, fetchOne, this.model.events);
  };
  /**
   * Execute `fn` for every document in the cursor. If `fn` returns a promise,
   * will wait for the promise to resolve before iterating on to the next one.
   * Returns a promise that resolves when done.
   *
   * The options argument may be omitted: `eachAsync(fn, callback)` is accepted.
   *
   * ####Example
   *
   *     // Iterate over documents asynchronously
   *     Thing.
   *       find({ name: /^hello/ }).
   *       cursor().
   *       eachAsync(async function (doc, i) {
   *         doc.foo = doc.bar + i;
   *         await doc.save();
   *       })
   *
   * @param {Function} fn
   * @param {Object} [opts]
   * @param {Number} [opts.parallel] the number of promises to execute in parallel. Defaults to 1.
   * @param {Function} [callback] executed when all docs have been processed
   * @return {Promise}
   * @api public
   * @method eachAsync
   */

  QueryCursor.prototype.eachAsync = function(fn, opts, callback) {
    // Shift arguments for the two-argument `(fn, callback)` form.
    if (typeof opts === 'function') {
      callback = opts;
      opts = {};
    }
    const settings = opts || {};
    const pullNext = (cb) => _next(this, cb);

    return eachAsync(pullNext, fn, settings, callback);
  };
  /**
   * The `options` passed in to the `QueryCursor` constructor (an empty object
   * when none were given). The statement below has no runtime effect; it only
   * anchors this documentation entry.
   *
   * @api public
   * @property options
   */

  QueryCursor.prototype.options;
  /**
   * Adds a [cursor flag](http://mongodb.github.io/node-mongodb-native/2.2/api/Cursor.html#addCursorFlag).
   * Useful for setting the `noCursorTimeout` and `tailable` flags.
   *
   * If the underlying cursor does not exist yet, the flag is applied once the
   * `cursor` event fires.
   *
   * @param {String} flag
   * @param {Boolean} value
   * @return {QueryCursor} this
   * @api public
   * @method addCursorFlag
   */

  QueryCursor.prototype.addCursorFlag = function(flag, value) {
    const applyFlag = () => {
      this.cursor.addCursorFlag(flag, value);
    };
    _waitForCursor(this, applyFlag);

    return this;
  };
  /*!
   * Sets the `transformNull` mongoose option, which makes `_next()` run the
   * registered transforms on a `null` document as well. Calling this with no
   * arguments at all turns the option on; an explicitly passed value (even
   * `undefined`) is stored as-is. Returns the cursor for chaining.
   */

  QueryCursor.prototype.transformNull = function(val) {
    const calledWithoutArgs = arguments.length === 0;
    this._mongooseOptions.transformNull = calledWithoutArgs ? true : val;
    return this;
  };
  /*!
   * Reads the next raw doc from the underlying driver cursor and hands it to
   * `cb(err, doc)`. Unless the query is `lean`, the raw doc is converted via
   * `_create()`; if the query has populate options, the doc is populated first.
   * Registered transforms are applied to the result. A `null` doc skips the
   * transforms unless the `transformNull` option is set.
   *
   * If the cursor was marked as errored, `cb` receives that error on the next
   * tick. If the driver cursor is not available yet, the call is retried when
   * the `cursor` event fires.
   */

  function _next(ctx, cb) {
    let done = cb;

    if (ctx._transforms.length > 0) {
      // Wrap `cb` so results pass through every registered transform in order.
      done = function(err, doc) {
        const skipTransforms = err || (doc === null && !ctx._mongooseOptions.transformNull);
        if (skipTransforms) {
          return cb(err, doc);
        }
        let transformed = doc;
        ctx._transforms.forEach(function(fn) {
          transformed = fn.call(ctx, transformed);
        });
        cb(err, transformed);
      };
    }

    if (ctx._error) {
      return process.nextTick(() => {
        done(ctx._error);
      });
    }

    if (!ctx.cursor) {
      // Cursor not created yet: start over (with the original `cb`) once it is.
      ctx.once('cursor', () => {
        _next(ctx, cb);
      });
      return;
    }

    return ctx.cursor.next((error, rawDoc) => {
      if (error) {
        return done(error);
      }
      if (!rawDoc) {
        return done(null, null);
      }

      const mongooseOpts = ctx.query._mongooseOptions;
      // Lean queries hand back the plain doc; otherwise build a mongoose doc.
      const deliver = (doc, populated) => {
        if (mongooseOpts.lean) {
          return done(null, doc);
        }
        return _create(ctx, doc, populated, done);
      };

      if (!mongooseOpts.populate) {
        return deliver(rawDoc, null);
      }

      const pop = helpers.preparePopulationOptionsMQ(ctx.query,
        ctx.query._mongooseOptions);
      pop.__noPromise = true;
      ctx.query.model.populate(rawDoc, pop, (err, populatedDoc) => {
        if (err) {
          return done(err);
        }
        return deliver(populatedDoc, pop);
      });
    });
  }
  /*!
   * Invokes `cb` (with no arguments) as soon as `ctx.cursor` is set:
   * immediately if it already is, otherwise on the next `cursor` event.
   */

  function _waitForCursor(ctx, cb) {
    if (!ctx.cursor) {
      ctx.once('cursor', () => {
        cb();
      });
      return;
    }
    return cb();
  }
  /*!
   * Turns a raw doc into a model instance: creates the instance through
   * `helpers.createModel()` and calls `init()` on it with the raw doc. When
   * `populatedIds` is truthy it is forwarded to `init()` as `{ populated }`.
   * Calls `cb(err)` on failure, `cb(null, instance)` on success.
   */

  function _create(ctx, doc, populatedIds, cb) {
    const query = ctx.query;
    const instance = helpers.createModel(query.model, doc, query._fields);

    let initOptions;
    if (populatedIds) {
      initOptions = { populated: populatedIds };
    }

    instance.init(doc, initOptions, (err) => {
      if (!err) {
        cb(null, instance);
        return;
      }
      return cb(err);
    });
  }
  325. module.exports = QueryCursor;