123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623 |
- 'use strict';
- const Denque = require('denque');
- const EventEmitter = require('events');
- const isResumableError = require('./error').isResumableError;
- const MongoError = require('./core').MongoError;
- const Cursor = require('./cursor');
- const relayEvents = require('./core/utils').relayEvents;
- const maxWireVersion = require('./core/utils').maxWireVersion;
- const maybePromise = require('./utils').maybePromise;
- const now = require('./utils').now;
- const calculateDurationInMs = require('./utils').calculateDurationInMs;
- const AggregateOperation = require('./operations/aggregate');
- const kResumeQueue = Symbol('resumeQueue');
- const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
- const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
- CHANGE_STREAM_OPTIONS
- );
- const CHANGE_DOMAIN_TYPES = {
- COLLECTION: Symbol('Collection'),
- DATABASE: Symbol('Database'),
- CLUSTER: Symbol('Cluster')
- };
- /**
- * @typedef ResumeToken
- * @description Represents the logical starting point for a new or resuming {@link ChangeStream} on the server.
- * @see https://docs.mongodb.com/master/changeStreams/#change-stream-resume-token
- */
- /**
- * @typedef OperationTime
- * @description Represents a specific point in time on a server. Can be retrieved by using {@link Db#command}
- * @see https://docs.mongodb.com/manual/reference/method/db.runCommand/#response
- */
- /**
- * @typedef ChangeStreamOptions
- * @description Options that can be passed to a ChangeStream. Note that startAfter, resumeAfter, and startAtOperationTime are all mutually exclusive, and the server will error if more than one is specified.
- * @property {string} [fullDocument='default'] Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
- * @property {number} [maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query.
- * @property {ResumeToken} [resumeAfter] Allows you to start a changeStream after a specified event. See {@link https://docs.mongodb.com/master/changeStreams/#resumeafter-for-change-streams|ChangeStream documentation}.
- * @property {ResumeToken} [startAfter] Similar to resumeAfter, but will allow you to start after an invalidated event. See {@link https://docs.mongodb.com/master/changeStreams/#startafter-for-change-streams|ChangeStream documentation}.
- * @property {OperationTime} [startAtOperationTime] Will start the changeStream after the specified operationTime.
- * @property {number} [batchSize=1000] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
- * @property {object} [collation] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
- * @property {ReadPreference} [readPreference] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
- */
- /**
- * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
- * @class ChangeStream
- * @since 3.0.0
- * @param {(MongoClient|Db|Collection)} parent The parent object that created this change stream
- * @param {Array} pipeline An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents
- * @param {ChangeStreamOptions} [options] Optional settings
- * @fires ChangeStream#close
- * @fires ChangeStream#change
- * @fires ChangeStream#end
- * @fires ChangeStream#error
- * @fires ChangeStream#resumeTokenChanged
- * @return {ChangeStream} a ChangeStream instance.
- */
- class ChangeStream extends EventEmitter {
- constructor(parent, pipeline, options) {
- super();
- const Collection = require('./collection');
- const Db = require('./db');
- const MongoClient = require('./mongo_client');
- this.pipeline = pipeline || [];
- this.options = options || {};
- this.parent = parent;
- this.namespace = parent.s.namespace;
- if (parent instanceof Collection) {
- this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
- this.topology = parent.s.db.serverConfig;
- } else if (parent instanceof Db) {
- this.type = CHANGE_DOMAIN_TYPES.DATABASE;
- this.topology = parent.serverConfig;
- } else if (parent instanceof MongoClient) {
- this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
- this.topology = parent.topology;
- } else {
- throw new TypeError(
- 'parent provided to ChangeStream constructor is not an instance of Collection, Db, or MongoClient'
- );
- }
- this.promiseLibrary = parent.s.promiseLibrary;
- if (!this.options.readPreference && parent.s.readPreference) {
- this.options.readPreference = parent.s.readPreference;
- }
- this[kResumeQueue] = new Denque();
- // Create contained Change Stream cursor
- this.cursor = createChangeStreamCursor(this, options);
- this.closed = false;
- // Listen for any `change` listeners being added to ChangeStream
- this.on('newListener', eventName => {
- if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
- this.cursor.on('data', change => processNewChange(this, change));
- }
- });
- // Listen for all `change` listeners being removed from ChangeStream
- this.on('removeListener', eventName => {
- if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) {
- this.cursor.removeAllListeners('data');
- }
- });
- }
- /**
- * @property {ResumeToken} resumeToken
- * The cached resume token that will be used to resume
- * after the most recently returned change.
- */
- get resumeToken() {
- return this.cursor.resumeToken;
- }
- /**
- * Check if there is any document still available in the Change Stream
- * @function ChangeStream.prototype.hasNext
- * @param {ChangeStream~resultCallback} [callback] The result callback.
- * @throws {MongoError}
- * @returns {Promise|void} returns Promise if no callback passed
- */
- hasNext(callback) {
- return maybePromise(this.parent, callback, cb => {
- getCursor(this, (err, cursor) => {
- if (err) return cb(err); // failed to resume, raise an error
- cursor.hasNext(cb);
- });
- });
- }
- /**
- * Get the next available document from the Change Stream, returns null if no more documents are available.
- * @function ChangeStream.prototype.next
- * @param {ChangeStream~resultCallback} [callback] The result callback.
- * @throws {MongoError}
- * @returns {Promise|void} returns Promise if no callback passed
- */
- next(callback) {
- return maybePromise(this.parent, callback, cb => {
- getCursor(this, (err, cursor) => {
- if (err) return cb(err); // failed to resume, raise an error
- cursor.next((error, change) => {
- if (error) {
- this[kResumeQueue].push(() => this.next(cb));
- processError(this, error, cb);
- return;
- }
- processNewChange(this, change, cb);
- });
- });
- });
- }
- /**
- * Is the change stream closed
- * @method ChangeStream.prototype.isClosed
- * @return {boolean}
- */
- isClosed() {
- return this.closed || (this.cursor && this.cursor.isClosed());
- }
- /**
- * Close the Change Stream
- * @method ChangeStream.prototype.close
- * @param {ChangeStream~resultCallback} [callback] The result callback.
- * @return {Promise} returns Promise if no callback passed
- */
- close(callback) {
- return maybePromise(this.parent, callback, cb => {
- if (this.closed) return cb();
- // flag the change stream as explicitly closed
- this.closed = true;
- if (!this.cursor) return cb();
- // Tidy up the existing cursor
- const cursor = this.cursor;
- return cursor.close(err => {
- ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
- this.cursor = undefined;
- return cb(err);
- });
- });
- }
- /**
- * This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed by a fast readable stream.
- * @method
- * @param {Writable} destination The destination for writing data
- * @param {object} [options] {@link https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options|Pipe options}
- * @return {null}
- */
- pipe(destination, options) {
- if (!this.pipeDestinations) {
- this.pipeDestinations = [];
- }
- this.pipeDestinations.push(destination);
- return this.cursor.pipe(destination, options);
- }
- /**
- * This method will remove the hooks set up for a previous pipe() call.
- * @param {Writable} [destination] The destination for writing data
- * @return {null}
- */
- unpipe(destination) {
- if (this.pipeDestinations && this.pipeDestinations.indexOf(destination) > -1) {
- this.pipeDestinations.splice(this.pipeDestinations.indexOf(destination), 1);
- }
- return this.cursor.unpipe(destination);
- }
- /**
- * Return a modified Readable stream including a possible transform method.
- * @method
- * @param {object} [options] Optional settings.
- * @param {function} [options.transform] A transformation method applied to each document emitted by the stream.
- * @return {Cursor}
- */
- stream(options) {
- this.streamOptions = options;
- return this.cursor.stream(options);
- }
- /**
- * This method will cause a stream in flowing mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.
- * @return {null}
- */
- pause() {
- return this.cursor.pause();
- }
- /**
- * This method will cause the readable stream to resume emitting data events.
- * @return {null}
- */
- resume() {
- return this.cursor.resume();
- }
- }
- class ChangeStreamCursor extends Cursor {
- constructor(topology, operation, options) {
- super(topology, operation, options);
- options = options || {};
- this._resumeToken = null;
- this.startAtOperationTime = options.startAtOperationTime;
- if (options.startAfter) {
- this.resumeToken = options.startAfter;
- } else if (options.resumeAfter) {
- this.resumeToken = options.resumeAfter;
- }
- }
- set resumeToken(token) {
- this._resumeToken = token;
- this.emit('resumeTokenChanged', token);
- }
- get resumeToken() {
- return this._resumeToken;
- }
- get resumeOptions() {
- const result = {};
- for (const optionName of CURSOR_OPTIONS) {
- if (this.options[optionName]) result[optionName] = this.options[optionName];
- }
- if (this.resumeToken || this.startAtOperationTime) {
- ['resumeAfter', 'startAfter', 'startAtOperationTime'].forEach(key => delete result[key]);
- if (this.resumeToken) {
- const resumeKey =
- this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter';
- result[resumeKey] = this.resumeToken;
- } else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) {
- result.startAtOperationTime = this.startAtOperationTime;
- }
- }
- return result;
- }
- cacheResumeToken(resumeToken) {
- if (this.bufferedCount() === 0 && this.cursorState.postBatchResumeToken) {
- this.resumeToken = this.cursorState.postBatchResumeToken;
- } else {
- this.resumeToken = resumeToken;
- }
- this.hasReceived = true;
- }
- _processBatch(batchName, response) {
- const cursor = response.cursor;
- if (cursor.postBatchResumeToken) {
- this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;
- if (cursor[batchName].length === 0) {
- this.resumeToken = cursor.postBatchResumeToken;
- }
- }
- }
- _initializeCursor(callback) {
- super._initializeCursor((err, result) => {
- if (err || result == null) {
- callback(err, result);
- return;
- }
- const response = result.documents[0];
- if (
- this.startAtOperationTime == null &&
- this.resumeAfter == null &&
- this.startAfter == null &&
- maxWireVersion(this.server) >= 7
- ) {
- this.startAtOperationTime = response.operationTime;
- }
- this._processBatch('firstBatch', response);
- this.emit('init', result);
- this.emit('response');
- callback(err, result);
- });
- }
- _getMore(callback) {
- super._getMore((err, response) => {
- if (err) {
- callback(err);
- return;
- }
- this._processBatch('nextBatch', response);
- this.emit('more', response);
- this.emit('response');
- callback(err, response);
- });
- }
- }
- /**
- * @event ChangeStreamCursor#response
- * internal event DO NOT USE
- * @ignore
- */
- // Create a new change stream cursor based on self's configuration
- function createChangeStreamCursor(self, options) {
- const changeStreamStageOptions = { fullDocument: options.fullDocument || 'default' };
- applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS);
- if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
- changeStreamStageOptions.allChangesForCluster = true;
- }
- const pipeline = [{ $changeStream: changeStreamStageOptions }].concat(self.pipeline);
- const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);
- const changeStreamCursor = new ChangeStreamCursor(
- self.topology,
- new AggregateOperation(self.parent, pipeline, options),
- cursorOptions
- );
- relayEvents(changeStreamCursor, self, ['resumeTokenChanged', 'end', 'close']);
- /**
- * Fired for each new matching change in the specified namespace. Attaching a `change`
- * event listener to a Change Stream will switch the stream into flowing mode. Data will
- * then be passed as soon as it is available.
- *
- * @event ChangeStream#change
- * @type {object}
- */
- if (self.listenerCount('change') > 0) {
- changeStreamCursor.on('data', function(change) {
- processNewChange(self, change);
- });
- }
- /**
- * Change stream close event
- *
- * @event ChangeStream#close
- * @type {null}
- */
- /**
- * Change stream end event
- *
- * @event ChangeStream#end
- * @type {null}
- */
- /**
- * Emitted each time the change stream stores a new resume token.
- *
- * @event ChangeStream#resumeTokenChanged
- * @type {ResumeToken}
- */
- /**
- * Fired when the stream encounters an error.
- *
- * @event ChangeStream#error
- * @type {Error}
- */
- changeStreamCursor.on('error', function(error) {
- processError(self, error);
- });
- if (self.pipeDestinations) {
- const cursorStream = changeStreamCursor.stream(self.streamOptions);
- for (let pipeDestination of self.pipeDestinations) {
- cursorStream.pipe(pipeDestination);
- }
- }
- return changeStreamCursor;
- }
- function applyKnownOptions(target, source, optionNames) {
- optionNames.forEach(name => {
- if (source[name]) {
- target[name] = source[name];
- }
- });
- return target;
- }
- // This method performs a basic server selection loop, satisfying the requirements of
- // ChangeStream resumability until the new SDAM layer can be used.
- const SELECTION_TIMEOUT = 30000;
- function waitForTopologyConnected(topology, options, callback) {
- setTimeout(() => {
- if (options && options.start == null) {
- options.start = now();
- }
- const start = options.start || now();
- const timeout = options.timeout || SELECTION_TIMEOUT;
- const readPreference = options.readPreference;
- if (topology.isConnected({ readPreference })) {
- return callback();
- }
- if (calculateDurationInMs(start) > timeout) {
- return callback(new MongoError('Timed out waiting for connection'));
- }
- waitForTopologyConnected(topology, options, callback);
- }, 500); // this is an arbitrary wait time to allow SDAM to transition
- }
- function processNewChange(changeStream, change, callback) {
- const cursor = changeStream.cursor;
- // a null change means the cursor has been notified, implicitly closing the change stream
- if (change == null) {
- changeStream.closed = true;
- }
- if (changeStream.closed) {
- if (callback) callback(new MongoError('ChangeStream is closed'));
- return;
- }
- if (change && !change._id) {
- const noResumeTokenError = new Error(
- 'A change stream document has been received that lacks a resume token (_id).'
- );
- if (!callback) return changeStream.emit('error', noResumeTokenError);
- return callback(noResumeTokenError);
- }
- // cache the resume token
- cursor.cacheResumeToken(change._id);
- // wipe the startAtOperationTime if there was one so that there won't be a conflict
- // between resumeToken and startAtOperationTime if we need to reconnect the cursor
- changeStream.options.startAtOperationTime = undefined;
- // Return the change
- if (!callback) return changeStream.emit('change', change);
- return callback(undefined, change);
- }
- function processError(changeStream, error, callback) {
- const topology = changeStream.topology;
- const cursor = changeStream.cursor;
- // If the change stream has been closed explictly, do not process error.
- if (changeStream.closed) {
- if (callback) callback(new MongoError('ChangeStream is closed'));
- return;
- }
- // if the resume succeeds, continue with the new cursor
- function resumeWithCursor(newCursor) {
- changeStream.cursor = newCursor;
- processResumeQueue(changeStream);
- }
- // otherwise, raise an error and close the change stream
- function unresumableError(err) {
- if (!callback) {
- changeStream.emit('error', err);
- changeStream.emit('close');
- }
- processResumeQueue(changeStream, err);
- changeStream.closed = true;
- }
- if (cursor && isResumableError(error, maxWireVersion(cursor.server))) {
- changeStream.cursor = undefined;
- // stop listening to all events from old cursor
- ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
- // close internal cursor, ignore errors
- cursor.close();
- waitForTopologyConnected(topology, { readPreference: cursor.options.readPreference }, err => {
- // if the topology can't reconnect, close the stream
- if (err) return unresumableError(err);
- // create a new cursor, preserving the old cursor's options
- const newCursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
- // attempt to continue in emitter mode
- if (!callback) return resumeWithCursor(newCursor);
- // attempt to continue in iterator mode
- newCursor.hasNext(err => {
- // if there's an error immediately after resuming, close the stream
- if (err) return unresumableError(err);
- resumeWithCursor(newCursor);
- });
- });
- return;
- }
- if (!callback) return changeStream.emit('error', error);
- return callback(error);
- }
- /**
- * Safely provides a cursor across resume attempts
- *
- * @param {ChangeStream} changeStream the parent ChangeStream
- * @param {function} callback gets the cursor or error
- * @param {ChangeStreamCursor} [oldCursor] when resuming from an error, carry over options from previous cursor
- */
- function getCursor(changeStream, callback) {
- if (changeStream.isClosed()) {
- callback(new MongoError('ChangeStream is closed.'));
- return;
- }
- // if a cursor exists and it is open, return it
- if (changeStream.cursor) {
- callback(undefined, changeStream.cursor);
- return;
- }
- // no cursor, queue callback until topology reconnects
- changeStream[kResumeQueue].push(callback);
- }
- /**
- * Drain the resume queue when a new has become available
- *
- * @param {ChangeStream} changeStream the parent ChangeStream
- * @param {ChangeStreamCursor?} changeStream.cursor the new cursor
- * @param {Error} [err] error getting a new cursor
- */
- function processResumeQueue(changeStream, err) {
- while (changeStream[kResumeQueue].length) {
- const request = changeStream[kResumeQueue].pop();
- if (changeStream.isClosed() && !err) {
- request(new MongoError('Change Stream is not open.'));
- return;
- }
- request(err, changeStream.cursor);
- }
- }
- /**
- * The callback format for results
- * @callback ChangeStream~resultCallback
- * @param {MongoError} error An error instance representing the error during the execution.
- * @param {(object|null)} result The result object if the command was executed successfully.
- */
- module.exports = ChangeStream;
|