12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137 |
'use strict';

// Node.js built-ins
const Transform = require('stream').Transform;
const PassThrough = require('stream').PassThrough;
const deprecate = require('util').deprecate;

// Driver internals
const handleCallback = require('./utils').handleCallback;
const ReadPreference = require('./core').ReadPreference;
const MongoError = require('./core').MongoError;
const CoreCursor = require('./core/cursor').CoreCursor;
const CursorState = require('./core/cursor').CursorState;
const Map = require('./core').BSON.Map;
const maybePromise = require('./utils').maybePromise;
const executeOperation = require('./operations/execute_operation');
const formattedOrderClause = require('./utils').formattedOrderClause;
const Explain = require('./explain').Explain;
const Aspect = require('./operations/operation').Aspect;
const each = require('./operations/cursor_ops').each;
const CountOperation = require('./operations/count');
// Names accepted by Cursor#addCursorFlag; any other flag name is rejected there.
const flags = ['tailable', 'oplogReplay', 'noCursorTimeout', 'awaitData', 'exhaust', 'partial'];

// Names accepted by Cursor#setCursorOption; any other option name is rejected there.
const fields = ['numberOfRetries', 'tailableRetryInterval'];
/**
 * Builds the driver error used whenever a cursor is accessed in a state that
 * no longer permits the requested operation.
 *
 * @returns {MongoError} error carrying the message 'Cursor is closed'
 */
function cursorClosedError() {
  return MongoError.create({ message: 'Cursor is closed', driver: true });
}

/**
 * Guard shared by the builder-style methods: throws when the cursor state is
 * CLOSED or OPEN, or when `isDead()` reports true.
 *
 * @param {Cursor} cursor the cursor about to be modified
 * @throws {MongoError} 'Cursor is closed'
 */
function assertConfigurable(cursor) {
  if (
    cursor.s.state === CursorState.CLOSED ||
    cursor.s.state === CursorState.OPEN ||
    cursor.isDead()
  ) {
    throw cursorClosedError();
  }
}

/**
 * Cursor built on top of CoreCursor. Adds chainable setters that write into
 * `this.cmd`, promise/callback iteration helpers, and session bookkeeping.
 */
class Cursor extends CoreCursor {
  /**
   * All four arguments are forwarded unchanged to the CoreCursor constructor.
   * If the resulting cursor exposes `this.operation`, the operation's options
   * are used for everything configured here instead of `options`.
   *
   * @param {object} topology
   * @param {*} ns
   * @param {object} cmd
   * @param {object} options
   */
  constructor(topology, ns, cmd, options) {
    super(topology, ns, cmd, options);
    if (this.operation) {
      options = this.operation.options;
    }

    // Falsy values (including 0) fall back to the defaults below.
    const numberOfRetries = options.numberOfRetries || 5;
    const tailableRetryInterval = options.tailableRetryInterval || 500;
    const promiseLibrary = options.promiseLibrary || Promise;

    // Internal state of this wrapper (separate from CoreCursor's cursorState).
    this.s = {
      numberOfRetries,
      tailableRetryInterval,
      currentNumberOfRetries: numberOfRetries,
      state: CursorState.INIT,
      promiseLibrary,
      explicitlyIgnoreSession: !!options.explicitlyIgnoreSession
    };

    // Adopt a caller-provided session unless told to ignore sessions.
    if (!options.explicitlyIgnoreSession && options.session) {
      this.cursorState.session = options.session;
    }

    if (this.options.noCursorTimeout === true) {
      this.addCursorFlag('noCursorTimeout', true);
    }

    // Batch size precedence: cmd.cursor.batchSize, then options.cursor.batchSize,
    // then a numeric options.batchSize, otherwise 1000.
    let batchSize = 1000;
    if (this.cmd.cursor && this.cmd.cursor.batchSize) {
      batchSize = this.cmd.cursor.batchSize;
    } else if (options.cursor && options.cursor.batchSize) {
      batchSize = options.cursor.batchSize;
    } else if (typeof options.batchSize === 'number') {
      batchSize = options.batchSize;
    }

    this.setCursorBatchSize(batchSize);
  }

  /**
   * Read preference of the wrapped operation when there is one, otherwise the
   * one stored in `this.options`.
   */
  get readPreference() {
    if (this.operation) {
      return this.operation.readPreference;
    }
    return this.options.readPreference;
  }

  /** The sort specification currently stored on the command. */
  get sortValue() {
    return this.cmd.sort;
  }

  /**
   * Resolves the session for this cursor, then delegates to
   * CoreCursor#_initializeCursor.
   *
   * @param {Function} callback passed straight to the parent implementation
   */
  _initializeCursor(callback) {
    if (this.operation && this.operation.session != null) {
      this.cursorState.session = this.operation.session;
    } else if (
      !this.s.explicitlyIgnoreSession &&
      !this.cursorState.session &&
      this.topology.hasSessionSupport()
    ) {
      // No session yet: start one owned by this cursor and share it with the
      // operation, if any.
      this.cursorState.session = this.topology.startSession({ owner: this });
      if (this.operation) {
        this.operation.session = this.cursorState.session;
      }
    }

    super._initializeCursor(callback);
  }

  /**
   * Reports whether another document is available.
   *
   * @param {Function} [callback] `(err, boolean)`; a promise is returned by
   *   `maybePromise` when omitted
   * @throws {MongoError} synchronously, when the cursor is closed or dead
   */
  hasNext(callback) {
    if (this.s.state === CursorState.CLOSED || (this.isDead && this.isDead())) {
      throw cursorClosedError();
    }

    return maybePromise(this, callback, cb => {
      if (this.isNotified()) {
        return cb(null, false);
      }

      this._next((err, doc) => {
        if (err) return cb(err);

        // The state must be compared against CursorState.CLOSED; `Cursor.CLOSED`
        // is not defined anywhere in this module, so that comparison could
        // never be true.
        if (doc == null || this.s.state === CursorState.CLOSED || this.isDead()) {
          return cb(null, false);
        }

        this.s.state = CursorState.OPEN;

        // Step the read index (and the limit counter, when a limit is set)
        // back by one. NOTE(review): presumably so the document fetched here
        // is handed out again by the following next() - confirm in CoreCursor.
        this.cursorState.cursorIndex--;
        if (this.cursorState.limit > 0) {
          this.cursorState.currentLimit--;
        }

        cb(null, true);
      });
    });
  }

  /**
   * Fetches the next document.
   *
   * @param {Function} [callback] `(err, doc)`; a promise is returned by
   *   `maybePromise` when omitted
   */
  next(callback) {
    return maybePromise(this, callback, cb => {
      if (this.s.state === CursorState.CLOSED || (this.isDead && this.isDead())) {
        cb(cursorClosedError());
        return;
      }

      // Normalize the sort clause once, before the first fetch.
      if (this.s.state === CursorState.INIT && this.cmd.sort) {
        try {
          this.cmd.sort = formattedOrderClause(this.cmd.sort);
        } catch (err) {
          return cb(err);
        }
      }

      this._next((err, doc) => {
        if (err) return cb(err);
        this.s.state = CursorState.OPEN;
        cb(null, doc);
      });
    });
  }

  /**
   * Sets `cmd.query`.
   *
   * @param {object} filter
   * @returns {Cursor} this
   * @throws {MongoError} when the cursor is open, closed or dead
   */
  filter(filter) {
    assertConfigurable(this);
    this.cmd.query = filter;
    return this;
  }

  /**
   * Sets `cmd.maxScan`.
   *
   * @param {*} maxScan
   * @returns {Cursor} this
   * @throws {MongoError} when the cursor is open, closed or dead
   */
  maxScan(maxScan) {
    assertConfigurable(this);
    this.cmd.maxScan = maxScan;
    return this;
  }

  /**
   * Sets `cmd.hint`.
   *
   * @param {*} hint
   * @returns {Cursor} this
   * @throws {MongoError} when the cursor is open, closed or dead
   */
  hint(hint) {
    assertConfigurable(this);
    this.cmd.hint = hint;
    return this;
  }

  /**
   * Sets `cmd.min`.
   *
   * @param {*} min
   * @returns {Cursor} this
   * @throws {MongoError} when the cursor is open, closed or dead
   */
  min(min) {
    assertConfigurable(this);
    this.cmd.min = min;
    return this;
  }

  /**
   * Sets `cmd.max`.
   *
   * @param {*} max
   * @returns {Cursor} this
   * @throws {MongoError} when the cursor is open, closed or dead
   */
  max(max) {
    assertConfigurable(this);
    this.cmd.max = max;
    return this;
  }

  /**
   * Sets `cmd.returnKey`.
   *
   * @param {*} value
   * @returns {Cursor} this
   * @throws {MongoError} when the cursor is open, closed or dead
   */
  returnKey(value) {
    assertConfigurable(this);
    this.cmd.returnKey = value;
    return this;
  }

  /**
   * Sets `cmd.showDiskLoc` (note the command field name differs from the
   * method name).
   *
   * @param {*} value
   * @returns {Cursor} this
   * @throws {MongoError} when the cursor is open, closed or dead
   */
  showRecordId(value) {
    assertConfigurable(this);
    this.cmd.showDiskLoc = value;
    return this;
  }

  /**
   * Sets `cmd.snapshot`.
   *
   * @param {*} value
   * @returns {Cursor} this
   * @throws {MongoError} when the cursor is open, closed or dead
   */
  snapshot(value) {
    assertConfigurable(this);
    this.cmd.snapshot = value;
    return this;
  }

  /**
   * Stores one of the supported options (`numberOfRetries`,
   * `tailableRetryInterval`) on the internal state.
   *
   * @param {string} field
   * @param {*} value
   * @returns {Cursor} this
   * @throws {MongoError} when the cursor is open, closed or dead, or the
   *   option name is not supported
   */
  setCursorOption(field, value) {
    assertConfigurable(this);
    if (fields.indexOf(field) === -1) {
      throw MongoError.create({
        message: `option ${field} is not a supported option ${fields}`,
        driver: true
      });
    }

    this.s[field] = value;
    // Keep the remaining-retries counter in step with the new maximum.
    if (field === 'numberOfRetries') this.s.currentNumberOfRetries = value;
    return this;
  }

  /**
   * Sets a boolean flag on the command.
   *
   * @param {string} flag one of the names in `flags`
   * @param {boolean} value
   * @returns {Cursor} this
   * @throws {MongoError} when the cursor is open, closed or dead, the flag is
   *   unknown, or the value is not a boolean
   */
  addCursorFlag(flag, value) {
    assertConfigurable(this);
    if (flags.indexOf(flag) === -1) {
      throw MongoError.create({
        message: `flag ${flag} is not a supported flag ${flags}`,
        driver: true
      });
    }
    if (typeof value !== 'boolean') {
      throw MongoError.create({ message: `flag ${flag} must be a boolean value`, driver: true });
    }

    this.cmd[flag] = value;
    return this;
  }

  /**
   * Stores a `$`-prefixed query modifier on the command under its name
   * without the leading `$`.
   *
   * @param {string} name modifier name; must start with `$`
   * @param {*} value
   * @returns {Cursor} this
   * @throws {MongoError} when the cursor is open, closed or dead, or the name
   *   does not start with `$`
   */
  addQueryModifier(name, value) {
    assertConfigurable(this);
    if (name[0] !== '$') {
      throw MongoError.create({ message: `${name} is not a valid query modifier`, driver: true });
    }

    const field = name.substr(1);
    this.cmd[field] = value;

    // `$orderby` is mirrored onto `cmd.sort`.
    if (field === 'orderby') this.cmd.sort = this.cmd[field];
    return this;
  }

  /**
   * Sets `cmd.comment`.
   *
   * @param {*} value
   * @returns {Cursor} this
   * @throws {MongoError} when the cursor is open, closed or dead
   */
  comment(value) {
    assertConfigurable(this);
    this.cmd.comment = value;
    return this;
  }

  /**
   * Sets `cmd.maxAwaitTimeMS`.
   *
   * @param {number} value
   * @returns {Cursor} this
   * @throws {MongoError} when the value is not a number, or the cursor is
   *   open, closed or dead
   */
  maxAwaitTimeMS(value) {
    if (typeof value !== 'number') {
      throw MongoError.create({ message: 'maxAwaitTimeMS must be a number', driver: true });
    }
    assertConfigurable(this);
    this.cmd.maxAwaitTimeMS = value;
    return this;
  }

  /**
   * Sets `cmd.maxTimeMS`.
   *
   * @param {number} value
   * @returns {Cursor} this
   * @throws {MongoError} when the value is not a number, or the cursor is
   *   open, closed or dead
   */
  maxTimeMS(value) {
    if (typeof value !== 'number') {
      throw MongoError.create({ message: 'maxTimeMS must be a number', driver: true });
    }
    assertConfigurable(this);
    this.cmd.maxTimeMS = value;
    return this;
  }

  /**
   * Sets `cmd.fields`.
   *
   * @param {object} value
   * @returns {Cursor} this
   * @throws {MongoError} when the cursor is open, closed or dead
   */
  project(value) {
    assertConfigurable(this);
    this.cmd.fields = value;
    return this;
  }

  /**
   * Sets `cmd.sort`.
   *
   * An array of `[field, direction]` pairs is converted to a Map, translating
   * 'asc'/'desc' to 1/-1 and keeping 1, -1 or an object with `$meta` as is.
   * When `direction` is given, the result is `[[keyOrList, direction]]`.
   *
   * @param {*} keyOrList
   * @param {*} [direction]
   * @returns {Cursor} this
   * @throws {MongoError} for tailable cursors, for a cursor that is open,
   *   closed or dead, or for an unrecognized direction inside a pair list
   */
  sort(keyOrList, direction) {
    if (this.options.tailable) {
      throw MongoError.create({ message: "Tailable cursor doesn't support sorting", driver: true });
    }
    assertConfigurable(this);

    let order = keyOrList;

    if (Array.isArray(order) && Array.isArray(order[0])) {
      order = new Map(
        order.map(pair => {
          const dir = pair[1];
          if (dir === 'asc') return [pair[0], 1];
          if (dir === 'desc') return [pair[0], -1];
          // Null-check before reading $meta so a missing direction produces
          // the descriptive MongoError below rather than a raw TypeError.
          if (dir === 1 || dir === -1 || (dir != null && dir.$meta)) return [pair[0], dir];

          throw new MongoError(
            "Illegal sort clause, must be of the form [['field1', '(ascending|descending)'], ['field2', '(ascending|descending)']]"
          );
        })
      );
    }

    if (direction != null) {
      order = [[keyOrList, direction]];
    }

    this.cmd.sort = order;
    return this;
  }

  /**
   * Sets `cmd.batchSize` and forwards the value to `setCursorBatchSize`.
   * Unlike the other setters this is still permitted while the state is OPEN.
   *
   * @param {number} value
   * @returns {Cursor} this
   * @throws {MongoError} for tailable cursors, a closed or dead cursor, or a
   *   non-number value
   */
  batchSize(value) {
    if (this.options.tailable) {
      throw MongoError.create({
        message: "Tailable cursor doesn't support batchSize",
        driver: true
      });
    }
    if (this.s.state === CursorState.CLOSED || this.isDead()) {
      throw cursorClosedError();
    }
    if (typeof value !== 'number') {
      throw MongoError.create({ message: 'batchSize requires an integer', driver: true });
    }

    this.cmd.batchSize = value;
    this.setCursorBatchSize(value);
    return this;
  }

  /**
   * Sets `cmd.collation`. No state check is performed.
   *
   * @param {object} value
   * @returns {Cursor} this
   */
  collation(value) {
    this.cmd.collation = value;
    return this;
  }

  /**
   * Sets `cmd.limit` and forwards the value to `setCursorLimit`.
   *
   * @param {number} value
   * @returns {Cursor} this
   * @throws {MongoError} for tailable cursors, a cursor that is open, closed
   *   or dead, or a non-number value
   */
  limit(value) {
    if (this.options.tailable) {
      throw MongoError.create({ message: "Tailable cursor doesn't support limit", driver: true });
    }
    assertConfigurable(this);
    if (typeof value !== 'number') {
      throw MongoError.create({ message: 'limit requires an integer', driver: true });
    }

    this.cmd.limit = value;
    this.setCursorLimit(value);
    return this;
  }

  /**
   * Sets `cmd.skip` and forwards the value to `setCursorSkip`.
   *
   * @param {number} value
   * @returns {Cursor} this
   * @throws {MongoError} for tailable cursors, a cursor that is open, closed
   *   or dead, or a non-number value
   */
  skip(value) {
    if (this.options.tailable) {
      throw MongoError.create({ message: "Tailable cursor doesn't support skip", driver: true });
    }
    assertConfigurable(this);
    if (typeof value !== 'number') {
      throw MongoError.create({ message: 'skip requires an integer', driver: true });
    }

    this.cmd.skip = value;
    this.setCursorSkip(value);
    return this;
  }

  /**
   * Rewinds the cursor, resets the state to INIT and hands iteration to the
   * `each` helper from operations/cursor_ops.
   *
   * @param {Function} callback passed unchanged to the helper
   */
  each(callback) {
    this.rewind();
    this.s.state = CursorState.INIT;
    each(this, callback);
  }

  /**
   * Rewinds the cursor and invokes `iterator` for every document.
   *
   * @param {Function} iterator called with each non-null document
   * @param {Function} [callback] called once with the error, or with `null`
   *   when a null document is received
   * @returns {Promise|undefined} a promise (from the configured promise
   *   library) when no callback is supplied
   */
  forEach(iterator, callback) {
    this.rewind();
    this.s.state = CursorState.INIT;

    if (typeof callback === 'function') {
      // The boolean returned to the `each` helper is assumed to signal
      // whether iteration should continue - TODO confirm in cursor_ops.
      each(this, (err, doc) => {
        if (err) {
          callback(err);
          return false;
        }
        if (doc != null) {
          iterator(doc);
          return true;
        }
        if (doc == null && callback) {
          // Clear the reference first so the completion callback cannot be
          // invoked a second time.
          const internalCallback = callback;
          callback = null;
          internalCallback(null);
          return false;
        }
      });
      return;
    }

    return new this.s.promiseLibrary((fulfill, reject) => {
      each(this, (err, doc) => {
        if (err) {
          reject(err);
          return false;
        }
        if (doc == null) {
          fulfill(null);
          return false;
        }
        iterator(doc);
        return true;
      });
    });
  }

  /**
   * Replaces `options.readPreference`. Only allowed while the state is INIT.
   *
   * @param {ReadPreference|string} readPreference
   * @returns {Cursor} this
   * @throws {MongoError} when the state is no longer INIT
   * @throws {TypeError} when the argument is neither a ReadPreference nor a string
   */
  setReadPreference(readPreference) {
    if (this.s.state !== CursorState.INIT) {
      throw MongoError.create({
        message: 'cannot change cursor readPreference after cursor has been accessed',
        driver: true
      });
    }

    if (readPreference instanceof ReadPreference) {
      this.options.readPreference = readPreference;
    } else if (typeof readPreference === 'string') {
      this.options.readPreference = new ReadPreference(readPreference);
    } else {
      throw new TypeError('Invalid read preference: ' + readPreference);
    }

    return this;
  }

  /**
   * Rewinds the cursor and collects every document into an array, closing the
   * cursor (with `skipKillCursors: true`) once a null document is received.
   *
   * @param {Function} [callback] `(err, documents)`; a promise is returned by
   *   `maybePromise` when omitted
   * @throws {MongoError} synchronously, for tailable cursors
   */
  toArray(callback) {
    if (this.options.tailable) {
      throw MongoError.create({
        message: 'Tailable cursor cannot be converted to array',
        driver: true
      });
    }

    return maybePromise(this, callback, cb => {
      const items = [];

      this.rewind();
      this.s.state = CursorState.INIT;

      const fetchDocs = () => {
        this._next((err, doc) => {
          if (err) {
            return handleCallback(cb, err);
          }
          if (doc == null) {
            return this.close({ skipKillCursors: true }, () => handleCallback(cb, null, items));
          }

          items.push(doc);

          // Drain whatever is already buffered in one go before asking for more.
          if (this.bufferedCount() > 0) {
            const docs = this.readBufferedDocuments(this.bufferedCount());
            Array.prototype.push.apply(items, docs);
          }

          fetchDocs();
        });
      };

      fetchDocs();
    });
  }

  /**
   * Runs a CountOperation for this cursor.
   *
   * Accepted call shapes: `(applySkipLimit, opts, callback)`,
   * `(applySkipLimit, callback)`, `(callback)` - in the last form
   * `applySkipLimit` becomes `true`.
   *
   * @param {boolean|Function} [applySkipLimit]
   * @param {object|Function} [opts]
   * @param {Function} [callback]
   * @returns {*} whatever `executeOperation` returns
   * @throws {MongoError} when `cmd.query` is null or undefined
   */
  count(applySkipLimit, opts, callback) {
    if (this.cmd.query == null) {
      throw MongoError.create({
        message: 'count can only be used with find command',
        driver: true
      });
    }

    if (typeof opts === 'function') {
      callback = opts;
      opts = {};
    }
    opts = opts || {};

    if (typeof applySkipLimit === 'function') {
      callback = applySkipLimit;
      applySkipLimit = true;
    }

    // Run the count on the cursor's session, without mutating the caller's opts.
    if (this.cursorState.session) {
      opts = Object.assign({}, opts, { session: this.cursorState.session });
    }

    const countOperation = new CountOperation(this, applySkipLimit, opts);
    return executeOperation(this.topology, countOperation, callback);
  }

  /**
   * Marks the cursor CLOSED, calls `kill()` unless `skipKillCursors` is set,
   * ends the session and emits 'close'.
   *
   * @param {object|Function} [options] `{ skipKillCursors }` (default false),
   *   or the callback
   * @param {Function} [callback] `(err, cursor)`; a promise is returned by
   *   `maybePromise` when omitted
   */
  close(options, callback) {
    if (typeof options === 'function') {
      callback = options;
      options = {};
    }
    options = Object.assign({}, { skipKillCursors: false }, options);

    return maybePromise(this, callback, cb => {
      this.s.state = CursorState.CLOSED;
      if (!options.skipKillCursors) {
        this.kill();
      }

      this._endSession(() => {
        this.emit('close');
        cb(null, this);
      });
    });
  }

  /**
   * Registers a per-document transform. When one is already registered, the
   * new function is applied to the result of the existing one.
   *
   * @param {Function} transform
   * @returns {Cursor} this
   */
  map(transform) {
    if (this.cursorState.transforms && this.cursorState.transforms.doc) {
      const oldTransform = this.cursorState.transforms.doc;
      this.cursorState.transforms.doc = doc => transform(oldTransform(doc));
    } else {
      this.cursorState.transforms = { doc: transform };
    }
    return this;
  }

  /** @returns {boolean} the result of `isDead()` */
  isClosed() {
    return this.isDead();
  }

  /**
   * Emits 'error' when an error is supplied, then pauses and closes the cursor.
   *
   * @param {Error} [err]
   */
  destroy(err) {
    if (err) this.emit('error', err);
    this.pause();
    this.close();
  }

  /**
   * Stores the stream options on `cursorState.streamOptions` and returns the
   * cursor itself.
   *
   * @param {object} [options] defaults to `{}`
   * @returns {Cursor} this
   */
  stream(options) {
    this.cursorState.streamOptions = options || {};
    return this;
  }

  /**
   * Pipes the cursor into an object-mode stream: a Transform applying
   * `options.transform` to every chunk when that is a function, otherwise a
   * PassThrough.
   *
   * @param {object} [options]
   * @param {Function} [options.transform]
   * @returns {*} the value returned by `this.pipe`
   */
  transformStream(options) {
    const streamOptions = options || {};

    if (typeof streamOptions.transform === 'function') {
      const stream = new Transform({
        objectMode: true,
        // Regular function on purpose: `this` must be the Transform stream.
        transform: function(chunk, encoding, callback) {
          this.push(streamOptions.transform(chunk));
          callback();
        }
      });
      return this.pipe(stream);
    }

    return this.pipe(new PassThrough({ objectMode: true }));
  }

  /**
   * Attaches an Explain to the operation and runs CoreCursor#_next once.
   *
   * @param {*|Function} [verbosity] defaults to `true`; may be the callback
   * @param {Function} [callback] a promise is returned by `maybePromise` when omitted
   * @throws {MongoError} synchronously, when there is no operation or it lacks
   *   the EXPLAINABLE aspect
   */
  explain(verbosity, callback) {
    if (typeof verbosity === 'function') {
      callback = verbosity;
      verbosity = true;
    }
    if (verbosity === undefined) verbosity = true;

    if (!this.operation || !this.operation.hasAspect(Aspect.EXPLAINABLE)) {
      throw new MongoError('This command cannot be explained');
    }
    this.operation.explain = new Explain(verbosity);

    return maybePromise(this, callback, cb => {
      // Call the parent implementation directly, bypassing this class's next().
      CoreCursor.prototype._next.call(this, cb);
    });
  }

  /** @returns {*} `this.logger` */
  getLogger() {
    return this.logger;
  }
}
// Alias kept for callers that use the `maxTimeMs` spelling.
Cursor.prototype.maxTimeMs = Cursor.prototype.maxTimeMS;

// util.deprecate returns a wrapped function and leaves its argument untouched,
// so the wrapper has to be stored back on the prototype for the warning to be
// emitted at all.
Cursor.prototype.each = deprecate(
  Cursor.prototype.each,
  'Cursor.each is deprecated. Use Cursor.forEach instead.'
);
Cursor.prototype.maxScan = deprecate(
  Cursor.prototype.maxScan,
  'Cursor.maxScan is deprecated, and will be removed in a later version'
);
Cursor.prototype.snapshot = deprecate(
  Cursor.prototype.snapshot,
  'Cursor Snapshot is deprecated, and will be removed in a later version'
);

module.exports = Cursor;
|