|
- 'use strict';
- const Transform = require('stream').Transform;
- const PassThrough = require('stream').PassThrough;
- const deprecate = require('util').deprecate;
- const handleCallback = require('./utils').handleCallback;
- const ReadPreference = require('./core').ReadPreference;
- const MongoError = require('./core').MongoError;
- const CoreCursor = require('./core/cursor').CoreCursor;
- const CursorState = require('./core/cursor').CursorState;
- const Map = require('./core').BSON.Map;
- const maybePromise = require('./utils').maybePromise;
- const executeOperation = require('./operations/execute_operation');
- const formattedOrderClause = require('./utils').formattedOrderClause;
- const Explain = require('./explain').Explain;
- const Aspect = require('./operations/operation').Aspect;
- const each = require('./operations/cursor_ops').each;
- const CountOperation = require('./operations/count');
- const flags = ['tailable', 'oplogReplay', 'noCursorTimeout', 'awaitData', 'exhaust', 'partial']; // flag names accepted by Cursor#addCursorFlag; any other name is rejected with a MongoError
- const fields = ['numberOfRetries', 'tailableRetryInterval']; // option names accepted by Cursor#setCursorOption; any other name is rejected with a MongoError
- class Cursor extends CoreCursor {
- constructor(topology, ns, cmd, options) {
- super(topology, ns, cmd, options);
- if (this.operation) {
- options = this.operation.options;
- }
-
- const numberOfRetries = options.numberOfRetries || 5;
- const tailableRetryInterval = options.tailableRetryInterval || 500;
- const currentNumberOfRetries = numberOfRetries;
-
- const promiseLibrary = options.promiseLibrary || Promise;
-
- this.s = {
-
- numberOfRetries: numberOfRetries,
- tailableRetryInterval: tailableRetryInterval,
- currentNumberOfRetries: currentNumberOfRetries,
-
- state: CursorState.INIT,
-
- promiseLibrary,
-
- explicitlyIgnoreSession: !!options.explicitlyIgnoreSession
- };
-
- if (!options.explicitlyIgnoreSession && options.session) {
- this.cursorState.session = options.session;
- }
-
- if (this.options.noCursorTimeout === true) {
- this.addCursorFlag('noCursorTimeout', true);
- }
-
- let batchSize = 1000;
- if (this.cmd.cursor && this.cmd.cursor.batchSize) {
- batchSize = this.cmd.cursor.batchSize;
- } else if (options.cursor && options.cursor.batchSize) {
- batchSize = options.cursor.batchSize;
- } else if (typeof options.batchSize === 'number') {
- batchSize = options.batchSize;
- }
-
- this.setCursorBatchSize(batchSize);
- }
- get readPreference() {
- if (this.operation) {
- return this.operation.readPreference;
- }
- return this.options.readPreference;
- }
- get sortValue() {
- return this.cmd.sort;
- }
- _initializeCursor(callback) {
- if (this.operation && this.operation.session != null) {
- this.cursorState.session = this.operation.session;
- } else {
-
- if (
- !this.s.explicitlyIgnoreSession &&
- !this.cursorState.session &&
- this.topology.hasSessionSupport()
- ) {
- this.cursorState.session = this.topology.startSession({ owner: this });
- if (this.operation) {
- this.operation.session = this.cursorState.session;
- }
- }
- }
- super._initializeCursor(callback);
- }
-
- hasNext(callback) {
- if (this.s.state === CursorState.CLOSED || (this.isDead && this.isDead())) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- return maybePromise(this, callback, cb => {
- const cursor = this;
- if (cursor.isNotified()) {
- return cb(null, false);
- }
- cursor._next((err, doc) => {
- if (err) return cb(err);
- if (doc == null || cursor.s.state === Cursor.CLOSED || cursor.isDead()) {
- return cb(null, false);
- }
- cursor.s.state = CursorState.OPEN;
-
- cursor.cursorState.cursorIndex--;
- if (cursor.cursorState.limit > 0) {
- cursor.cursorState.currentLimit--;
- }
- cb(null, true);
- });
- });
- }
-
- next(callback) {
- return maybePromise(this, callback, cb => {
- const cursor = this;
- if (cursor.s.state === CursorState.CLOSED || (cursor.isDead && cursor.isDead())) {
- cb(MongoError.create({ message: 'Cursor is closed', driver: true }));
- return;
- }
- if (cursor.s.state === CursorState.INIT && cursor.cmd.sort) {
- try {
- cursor.cmd.sort = formattedOrderClause(cursor.cmd.sort);
- } catch (err) {
- return cb(err);
- }
- }
- cursor._next((err, doc) => {
- if (err) return cb(err);
- cursor.s.state = CursorState.OPEN;
- cb(null, doc);
- });
- });
- }
-
- filter(filter) {
- if (this.s.state === CursorState.CLOSED || this.s.state === CursorState.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.cmd.query = filter;
- return this;
- }
-
- maxScan(maxScan) {
- if (this.s.state === CursorState.CLOSED || this.s.state === CursorState.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.cmd.maxScan = maxScan;
- return this;
- }
-
- hint(hint) {
- if (this.s.state === CursorState.CLOSED || this.s.state === CursorState.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.cmd.hint = hint;
- return this;
- }
-
- min(min) {
- if (this.s.state === CursorState.CLOSED || this.s.state === CursorState.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.cmd.min = min;
- return this;
- }
-
- max(max) {
- if (this.s.state === CursorState.CLOSED || this.s.state === CursorState.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.cmd.max = max;
- return this;
- }
-
- returnKey(value) {
- if (this.s.state === CursorState.CLOSED || this.s.state === CursorState.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.cmd.returnKey = value;
- return this;
- }
-
- showRecordId(value) {
- if (this.s.state === CursorState.CLOSED || this.s.state === CursorState.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.cmd.showDiskLoc = value;
- return this;
- }
-
- snapshot(value) {
- if (this.s.state === CursorState.CLOSED || this.s.state === CursorState.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.cmd.snapshot = value;
- return this;
- }
-
- setCursorOption(field, value) {
- if (this.s.state === CursorState.CLOSED || this.s.state === CursorState.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- if (fields.indexOf(field) === -1) {
- throw MongoError.create({
- message: `option ${field} is not a supported option ${fields}`,
- driver: true
- });
- }
- this.s[field] = value;
- if (field === 'numberOfRetries') this.s.currentNumberOfRetries = value;
- return this;
- }
-
- addCursorFlag(flag, value) {
- if (this.s.state === CursorState.CLOSED || this.s.state === CursorState.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- if (flags.indexOf(flag) === -1) {
- throw MongoError.create({
- message: `flag ${flag} is not a supported flag ${flags}`,
- driver: true
- });
- }
- if (typeof value !== 'boolean') {
- throw MongoError.create({ message: `flag ${flag} must be a boolean value`, driver: true });
- }
- this.cmd[flag] = value;
- return this;
- }
-
- addQueryModifier(name, value) {
- if (this.s.state === CursorState.CLOSED || this.s.state === CursorState.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- if (name[0] !== '$') {
- throw MongoError.create({ message: `${name} is not a valid query modifier`, driver: true });
- }
-
- const field = name.substr(1);
-
- this.cmd[field] = value;
-
- if (field === 'orderby') this.cmd.sort = this.cmd[field];
- return this;
- }
-
- comment(value) {
- if (this.s.state === CursorState.CLOSED || this.s.state === CursorState.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.cmd.comment = value;
- return this;
- }
-
- maxAwaitTimeMS(value) {
- if (typeof value !== 'number') {
- throw MongoError.create({ message: 'maxAwaitTimeMS must be a number', driver: true });
- }
- if (this.s.state === CursorState.CLOSED || this.s.state === CursorState.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.cmd.maxAwaitTimeMS = value;
- return this;
- }
-
- maxTimeMS(value) {
- if (typeof value !== 'number') {
- throw MongoError.create({ message: 'maxTimeMS must be a number', driver: true });
- }
- if (this.s.state === CursorState.CLOSED || this.s.state === CursorState.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.cmd.maxTimeMS = value;
- return this;
- }
-
- project(value) {
- if (this.s.state === CursorState.CLOSED || this.s.state === CursorState.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- this.cmd.fields = value;
- return this;
- }
-
- sort(keyOrList, direction) {
- if (this.options.tailable) {
- throw MongoError.create({ message: "Tailable cursor doesn't support sorting", driver: true });
- }
- if (this.s.state === CursorState.CLOSED || this.s.state === CursorState.OPEN || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- let order = keyOrList;
-
-
- if (Array.isArray(order) && Array.isArray(order[0])) {
- order = new Map(
- order.map(x => {
- const value = [x[0], null];
- if (x[1] === 'asc') {
- value[1] = 1;
- } else if (x[1] === 'desc') {
- value[1] = -1;
- } else if (x[1] === 1 || x[1] === -1 || x[1].$meta) {
- value[1] = x[1];
- } else {
- throw new MongoError(
- "Illegal sort clause, must be of the form [['field1', '(ascending|descending)'], ['field2', '(ascending|descending)']]"
- );
- }
- return value;
- })
- );
- }
- if (direction != null) {
- order = [[keyOrList, direction]];
- }
- this.cmd.sort = order;
- return this;
- }
-
- batchSize(value) {
- if (this.options.tailable) {
- throw MongoError.create({
- message: "Tailable cursor doesn't support batchSize",
- driver: true
- });
- }
- if (this.s.state === CursorState.CLOSED || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- if (typeof value !== 'number') {
- throw MongoError.create({ message: 'batchSize requires an integer', driver: true });
- }
- this.cmd.batchSize = value;
- this.setCursorBatchSize(value);
- return this;
- }
-
- collation(value) {
- this.cmd.collation = value;
- return this;
- }
-
- limit(value) {
- if (this.options.tailable) {
- throw MongoError.create({ message: "Tailable cursor doesn't support limit", driver: true });
- }
- if (this.s.state === CursorState.OPEN || this.s.state === CursorState.CLOSED || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- if (typeof value !== 'number') {
- throw MongoError.create({ message: 'limit requires an integer', driver: true });
- }
- this.cmd.limit = value;
- this.setCursorLimit(value);
- return this;
- }
-
- skip(value) {
- if (this.options.tailable) {
- throw MongoError.create({ message: "Tailable cursor doesn't support skip", driver: true });
- }
- if (this.s.state === CursorState.OPEN || this.s.state === CursorState.CLOSED || this.isDead()) {
- throw MongoError.create({ message: 'Cursor is closed', driver: true });
- }
- if (typeof value !== 'number') {
- throw MongoError.create({ message: 'skip requires an integer', driver: true });
- }
- this.cmd.skip = value;
- this.setCursorSkip(value);
- return this;
- }
-
-
-
-
- each(callback) {
-
- this.rewind();
-
- this.s.state = CursorState.INIT;
-
- each(this, callback);
- }
-
-
-
- forEach(iterator, callback) {
-
- this.rewind();
-
- this.s.state = CursorState.INIT;
- if (typeof callback === 'function') {
- each(this, (err, doc) => {
- if (err) {
- callback(err);
- return false;
- }
- if (doc != null) {
- iterator(doc);
- return true;
- }
- if (doc == null && callback) {
- const internalCallback = callback;
- callback = null;
- internalCallback(null);
- return false;
- }
- });
- } else {
- return new this.s.promiseLibrary((fulfill, reject) => {
- each(this, (err, doc) => {
- if (err) {
- reject(err);
- return false;
- } else if (doc == null) {
- fulfill(null);
- return false;
- } else {
- iterator(doc);
- return true;
- }
- });
- });
- }
- }
-
- setReadPreference(readPreference) {
- if (this.s.state !== CursorState.INIT) {
- throw MongoError.create({
- message: 'cannot change cursor readPreference after cursor has been accessed',
- driver: true
- });
- }
- if (readPreference instanceof ReadPreference) {
- this.options.readPreference = readPreference;
- } else if (typeof readPreference === 'string') {
- this.options.readPreference = new ReadPreference(readPreference);
- } else {
- throw new TypeError('Invalid read preference: ' + readPreference);
- }
- return this;
- }
-
-
- toArray(callback) {
- if (this.options.tailable) {
- throw MongoError.create({
- message: 'Tailable cursor cannot be converted to array',
- driver: true
- });
- }
- return maybePromise(this, callback, cb => {
- const cursor = this;
- const items = [];
-
- cursor.rewind();
- cursor.s.state = CursorState.INIT;
-
- const fetchDocs = () => {
- cursor._next((err, doc) => {
- if (err) {
- return handleCallback(cb, err);
- }
- if (doc == null) {
- return cursor.close({ skipKillCursors: true }, () => handleCallback(cb, null, items));
- }
-
- items.push(doc);
-
- if (cursor.bufferedCount() > 0) {
- let docs = cursor.readBufferedDocuments(cursor.bufferedCount());
- Array.prototype.push.apply(items, docs);
- }
-
- fetchDocs();
- });
- };
- fetchDocs();
- });
- }
-
-
- count(applySkipLimit, opts, callback) {
- if (this.cmd.query == null)
- throw MongoError.create({
- message: 'count can only be used with find command',
- driver: true
- });
- if (typeof opts === 'function') (callback = opts), (opts = {});
- opts = opts || {};
- if (typeof applySkipLimit === 'function') {
- callback = applySkipLimit;
- applySkipLimit = true;
- }
- if (this.cursorState.session) {
- opts = Object.assign({}, opts, { session: this.cursorState.session });
- }
- const countOperation = new CountOperation(this, applySkipLimit, opts);
- return executeOperation(this.topology, countOperation, callback);
- }
-
- close(options, callback) {
- if (typeof options === 'function') (callback = options), (options = {});
- options = Object.assign({}, { skipKillCursors: false }, options);
- return maybePromise(this, callback, cb => {
- this.s.state = CursorState.CLOSED;
- if (!options.skipKillCursors) {
-
- this.kill();
- }
- this._endSession(() => {
- this.emit('close');
- cb(null, this);
- });
- });
- }
-
- map(transform) {
- if (this.cursorState.transforms && this.cursorState.transforms.doc) {
- const oldTransform = this.cursorState.transforms.doc;
- this.cursorState.transforms.doc = doc => {
- return transform(oldTransform(doc));
- };
- } else {
- this.cursorState.transforms = { doc: transform };
- }
- return this;
- }
-
- isClosed() {
- return this.isDead();
- }
- destroy(err) {
- if (err) this.emit('error', err);
- this.pause();
- this.close();
- }
-
- stream(options) {
- this.cursorState.streamOptions = options || {};
- return this;
- }
-
- transformStream(options) {
- const streamOptions = options || {};
- if (typeof streamOptions.transform === 'function') {
- const stream = new Transform({
- objectMode: true,
- transform: function(chunk, encoding, callback) {
- this.push(streamOptions.transform(chunk));
- callback();
- }
- });
- return this.pipe(stream);
- }
- return this.pipe(new PassThrough({ objectMode: true }));
- }
-
- explain(verbosity, callback) {
- if (typeof verbosity === 'function') (callback = verbosity), (verbosity = true);
- if (verbosity === undefined) verbosity = true;
- if (!this.operation || !this.operation.hasAspect(Aspect.EXPLAINABLE)) {
- throw new MongoError('This command cannot be explained');
- }
- this.operation.explain = new Explain(verbosity);
- return maybePromise(this, callback, cb => {
- CoreCursor.prototype._next.apply(this, [cb]);
- });
- }
-
- getLogger() {
- return this.logger;
- }
- }
- Cursor.prototype.maxTimeMs = Cursor.prototype.maxTimeMS;
- deprecate(Cursor.prototype.each, 'Cursor.each is deprecated. Use Cursor.forEach instead.');
- deprecate(
- Cursor.prototype.maxScan,
- 'Cursor.maxScan is deprecated, and will be removed in a later version'
- );
- deprecate(
- Cursor.prototype.snapshot,
- 'Cursor Snapshot is deprecated, and will be removed in a later version'
- );
- module.exports = Cursor;
|