change_stream.js 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575
  1. 'use strict';
  2. const EventEmitter = require('events');
  3. const isResumableError = require('./error').isResumableError;
  4. const MongoError = require('./core').MongoError;
  5. const Cursor = require('./cursor');
  6. const relayEvents = require('./core/utils').relayEvents;
  7. const maxWireVersion = require('./core/utils').maxWireVersion;
  8. const maybePromise = require('./utils').maybePromise;
  9. const AggregateOperation = require('./operations/aggregate');
  // Option names that are forwarded into the `$changeStream` pipeline stage.
  const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];

  // Option names that are forwarded to the underlying cursor: the cursor-only
  // options followed by every `$changeStream` stage option.
  const CURSOR_OPTIONS = [
    'batchSize',
    'maxAwaitTimeMS',
    'collation',
    'readPreference',
    ...CHANGE_STREAM_OPTIONS
  ];

  // Unique markers for the kind of parent a change stream is attached to.
  const CHANGE_DOMAIN_TYPES = {
    COLLECTION: Symbol('Collection'),
    DATABASE: Symbol('Database'),
    CLUSTER: Symbol('Cluster')
  };
  19. /**
  20. * @typedef ResumeToken
  21. * @description Represents the logical starting point for a new or resuming {@link ChangeStream} on the server.
  22. * @see https://docs.mongodb.com/master/changeStreams/#change-stream-resume-token
  23. */
  24. /**
  25. * @typedef OperationTime
  26. * @description Represents a specific point in time on a server. Can be retrieved by using {@link Db#command}
  27. * @see https://docs.mongodb.com/manual/reference/method/db.runCommand/#response
  28. */
  29. /**
  30. * @typedef ChangeStreamOptions
  31. * @description Options that can be passed to a ChangeStream. Note that startAfter, resumeAfter, and startAtOperationTime are all mutually exclusive, and the server will error if more than one is specified.
  32. * @property {string} [fullDocument='default'] Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
  33. * @property {number} [maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query.
  34. * @property {ResumeToken} [resumeAfter] Allows you to start a changeStream after a specified event. See {@link https://docs.mongodb.com/master/changeStreams/#resumeafter-for-change-streams|ChangeStream documentation}.
  35. * @property {ResumeToken} [startAfter] Similar to resumeAfter, but will allow you to start after an invalidated event. See {@link https://docs.mongodb.com/master/changeStreams/#startafter-for-change-streams|ChangeStream documentation}.
  36. * @property {OperationTime} [startAtOperationTime] Will start the changeStream after the specified operationTime.
  37. * @property {number} [batchSize=1000] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
  38. * @property {object} [collation] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
  39. * @property {ReadPreference} [readPreference] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
  40. */
  41. /**
  42. * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
  43. * @class ChangeStream
  44. * @since 3.0.0
  45. * @param {(MongoClient|Db|Collection)} parent The parent object that created this change stream
  46. * @param {Array} pipeline An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents
  47. * @param {ChangeStreamOptions} [options] Optional settings
  48. * @fires ChangeStream#close
  49. * @fires ChangeStream#change
  50. * @fires ChangeStream#end
  51. * @fires ChangeStream#error
  52. * @fires ChangeStream#resumeTokenChanged
  53. * @return {ChangeStream} a ChangeStream instance.
  54. */
  class ChangeStream extends EventEmitter {
    /**
     * Builds the change stream and its underlying cursor.
     *
     * @param {(MongoClient|Db|Collection)} parent The parent object that created this change stream
     * @param {Array} [pipeline] Additional aggregation stages appended after `$changeStream`
     * @param {ChangeStreamOptions} [options] Optional settings
     * @throws {TypeError} if `parent` is not a Collection, Db, or MongoClient
     */
    constructor(parent, pipeline, options) {
      super();
      // Required lazily inside the constructor rather than at module load.
      const Collection = require('./collection');
      const Db = require('./db');
      const MongoClient = require('./mongo_client');

      this.pipeline = pipeline || [];
      this.options = options || {};
      this.parent = parent;

      // Validate the parent before touching `parent.s`, so an unsupported parent
      // produces the descriptive error below instead of a property-access failure.
      if (parent instanceof Collection) {
        this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
        this.topology = parent.s.db.serverConfig;
      } else if (parent instanceof Db) {
        this.type = CHANGE_DOMAIN_TYPES.DATABASE;
        this.topology = parent.serverConfig;
      } else if (parent instanceof MongoClient) {
        this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
        this.topology = parent.topology;
      } else {
        throw new TypeError(
          'parent provided to ChangeStream constructor is not an instance of Collection, Db, or MongoClient'
        );
      }

      this.namespace = parent.s.namespace;
      this.promiseLibrary = parent.s.promiseLibrary;

      // Inherit the parent's read preference unless one was given explicitly.
      if (!this.options.readPreference && parent.s.readPreference) {
        this.options.readPreference = parent.s.readPreference;
      }

      // Create contained Change Stream cursor. The normalized `this.options` is
      // passed (not the raw argument) so that omitting `options` does not crash.
      this.cursor = createChangeStreamCursor(this, this.options);

      // Listen for any `change` listeners being added to ChangeStream.
      // `newListener` fires before the listener is registered, so a count of
      // zero means this is the first `change` listener.
      this.on('newListener', eventName => {
        if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
          this.cursor.on('data', change =>
            processNewChange({ changeStream: this, change, eventEmitter: true })
          );
        }
      });

      // Listen for all `change` listeners being removed from ChangeStream
      this.on('removeListener', eventName => {
        if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) {
          this.cursor.removeAllListeners('data');
        }
      });
    }

    /**
     * @property {ResumeToken} resumeToken
     * The cached resume token that will be used to resume
     * after the most recently returned change.
     */
    get resumeToken() {
      return this.cursor.resumeToken;
    }

    /**
     * Check if there is any document still available in the Change Stream
     * @function ChangeStream.prototype.hasNext
     * @param {ChangeStream~resultCallback} [callback] The result callback.
     * @throws {MongoError}
     * @returns {Promise|void} returns Promise if no callback passed
     */
    hasNext(callback) {
      return maybePromise(this.parent, callback, cb => this.cursor.hasNext(cb));
    }

    /**
     * Get the next available document from the Change Stream, returns null if no more documents are available.
     * @function ChangeStream.prototype.next
     * @param {ChangeStream~resultCallback} [callback] The result callback.
     * @throws {MongoError}
     * @returns {Promise|void} returns Promise if no callback passed
     */
    next(callback) {
      return maybePromise(this.parent, callback, cb => {
        if (this.isClosed()) {
          return cb(new Error('Change Stream is not open.'));
        }
        this.cursor.next((error, change) => {
          processNewChange({ changeStream: this, error, change, callback: cb });
        });
      });
    }

    /**
     * Is the cursor closed. A change stream without a cursor counts as closed.
     * @method ChangeStream.prototype.isClosed
     * @return {boolean}
     */
    isClosed() {
      return this.cursor ? this.cursor.isClosed() : true;
    }

    /**
     * Close the Change Stream
     * @method ChangeStream.prototype.close
     * @param {ChangeStream~resultCallback} [callback] The result callback.
     * @return {Promise} returns Promise if no callback passed
     */
    close(callback) {
      // Fall back to the native Promise when the parent supplied no promise library.
      const PromiseCtor = this.promiseLibrary || Promise;

      if (!this.cursor) {
        if (callback) return callback();
        return PromiseCtor.resolve();
      }

      // Tidy up the existing cursor: close it, detach every listener this class
      // may have attached, then drop the reference.
      const cursor = this.cursor;
      const closeCursor = done =>
        cursor.close(err => {
          ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
          delete this.cursor;
          return done(err);
        });

      if (callback) return closeCursor(callback);

      return new PromiseCtor((resolve, reject) => {
        closeCursor(err => {
          if (err) return reject(err);
          resolve();
        });
      });
    }

    /**
     * This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed by a fast readable stream.
     * The destination is also remembered on `pipeDestinations`.
     * @method
     * @param {Writable} destination The destination for writing data
     * @param {object} [options] {@link https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options|Pipe options}
     * @return {*} whatever the underlying cursor's `pipe` returns
     */
    pipe(destination, options) {
      if (!this.pipeDestinations) {
        this.pipeDestinations = [];
      }
      this.pipeDestinations.push(destination);
      return this.cursor.pipe(destination, options);
    }

    /**
     * This method will remove the hooks set up for a previous pipe() call.
     * @param {Writable} [destination] The destination for writing data
     * @return {*} whatever the underlying cursor's `unpipe` returns
     */
    unpipe(destination) {
      if (this.pipeDestinations) {
        const position = this.pipeDestinations.indexOf(destination);
        if (position > -1) {
          this.pipeDestinations.splice(position, 1);
        }
      }
      return this.cursor.unpipe(destination);
    }

    /**
     * Return a modified Readable stream including a possible transform method.
     * The options are remembered on `streamOptions`.
     * @method
     * @param {object} [options] Optional settings.
     * @param {function} [options.transform] A transformation method applied to each document emitted by the stream.
     * @return {Cursor}
     */
    stream(options) {
      this.streamOptions = options;
      return this.cursor.stream(options);
    }

    /**
     * This method will cause a stream in flowing mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.
     * @return {*} whatever the underlying cursor's `pause` returns
     */
    pause() {
      return this.cursor.pause();
    }

    /**
     * This method will cause the readable stream to resume emitting data events.
     * @return {*} whatever the underlying cursor's `resume` returns
     */
    resume() {
      return this.cursor.resume();
    }
  }
  /**
   * Cursor used internally by ChangeStream. On top of the base Cursor it tracks
   * the resume token and the operation time needed to rebuild the cursor.
   */
  class ChangeStreamCursor extends Cursor {
    /**
     * @param {object} topology Passed through to the base Cursor
     * @param {object} operation Passed through to the base Cursor
     * @param {object} [options] Cursor options; `startAfter`, `resumeAfter` and
     *   `startAtOperationTime` are additionally read here.
     */
    constructor(topology, operation, options) {
      super(topology, operation, options);
      options = options || {};

      this._resumeToken = null;
      this.startAtOperationTime = options.startAtOperationTime;

      // Seed the resume token from the caller; `startAfter` wins over `resumeAfter`.
      if (options.startAfter) {
        this.resumeToken = options.startAfter;
      } else if (options.resumeAfter) {
        this.resumeToken = options.resumeAfter;
      }
    }

    // Storing a token also announces it through a `resumeTokenChanged` event.
    set resumeToken(token) {
      this._resumeToken = token;
      this.emit('resumeTokenChanged', token);
    }

    get resumeToken() {
      return this._resumeToken;
    }

    /**
     * Options to use when re-creating this cursor: the truthy CURSOR_OPTIONS of
     * the current cursor, with the start position replaced by the cached resume
     * token (as `resumeAfter`) or, failing that, by `startAtOperationTime` when
     * the server's max wire version is at least 7.
     */
    get resumeOptions() {
      const result = {};
      for (const optionName of CURSOR_OPTIONS) {
        if (this.options[optionName]) result[optionName] = this.options[optionName];
      }

      if (this.resumeToken || this.startAtOperationTime) {
        // Only one start position may be sent; clear whatever was copied above.
        ['resumeAfter', 'startAfter', 'startAtOperationTime'].forEach(key => delete result[key]);

        if (this.resumeToken) {
          result.resumeAfter = this.resumeToken;
        } else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) {
          result.startAtOperationTime = this.startAtOperationTime;
        }
      }

      return result;
    }

    /**
     * Records the batch's `postBatchResumeToken` (when present) on the cursor
     * state and, if the batch is empty, promotes it to the cached resume token.
     *
     * @param {object} cursorDocument The `cursor` sub-document of a server reply
     * @param {Array} batch The batch of documents carried by that reply
     */
    _cachePostBatchResumeToken(cursorDocument, batch) {
      if (!cursorDocument.postBatchResumeToken) return;

      this.cursorState.postBatchResumeToken = cursorDocument.postBatchResumeToken;
      if (batch.length === 0) {
        this.resumeToken = cursorDocument.postBatchResumeToken;
      }
    }

    _initializeCursor(callback) {
      super._initializeCursor((err, result) => {
        if (err) {
          callback(err, null);
          return;
        }

        const response = result.documents[0];

        // Remember the server's operation time only when the caller supplied no
        // start position at all. A supplied `startAfter`/`resumeAfter` shows up
        // as the resume token seeded by the constructor; the cursor itself has
        // no `resumeAfter`/`startAfter` properties to test.
        if (
          this.startAtOperationTime == null &&
          this.resumeToken == null &&
          maxWireVersion(this.server) >= 7
        ) {
          this.startAtOperationTime = response.operationTime;
        }

        const cursor = response.cursor;
        if (cursor.postBatchResumeToken) {
          this._cachePostBatchResumeToken(cursor, cursor.firstBatch);
        }

        this.emit('response');
        callback(err, result);
      });
    }

    _getMore(callback) {
      super._getMore((err, response) => {
        if (err) {
          callback(err, null);
          return;
        }

        const cursor = response.cursor;
        if (cursor.postBatchResumeToken) {
          this._cachePostBatchResumeToken(cursor, cursor.nextBatch);
        }

        this.emit('response');
        callback(err, response);
      });
    }
  }
  305. /**
  306. * @event ChangeStreamCursor#response
  307. * internal event DO NOT USE
  308. * @ignore
  309. */
  /**
   * Create a new change stream cursor based on self's configuration.
   *
   * Builds the `$changeStream` stage from `options`, prepends it to
   * `self.pipeline`, wraps the resulting aggregate operation in a
   * ChangeStreamCursor, and wires the cursor's events back to `self`.
   *
   * @param {ChangeStream} self The change stream the cursor is created for
   * @param {object} [options] Change stream / cursor options
   * @return {ChangeStreamCursor} the newly created cursor
   */
  function createChangeStreamCursor(self, options) {
    // Tolerate a missing options object instead of failing on the first property read.
    options = options || {};

    const changeStreamStageOptions = { fullDocument: options.fullDocument || 'default' };
    applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS);
    if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
      changeStreamStageOptions.allChangesForCluster = true;
    }

    const pipeline = [{ $changeStream: changeStreamStageOptions }].concat(self.pipeline);
    const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);

    const changeStreamCursor = new ChangeStreamCursor(
      self.topology,
      new AggregateOperation(self.parent, pipeline, options),
      cursorOptions
    );

    relayEvents(changeStreamCursor, self, ['resumeTokenChanged', 'end', 'close']);

    /**
     * Fired for each new matching change in the specified namespace. Attaching a `change`
     * event listener to a Change Stream will switch the stream into flowing mode. Data will
     * then be passed as soon as it is available.
     *
     * @event ChangeStream#change
     * @type {object}
     */
    if (self.listenerCount('change') > 0) {
      changeStreamCursor.on('data', function(change) {
        processNewChange({ changeStream: self, change, eventEmitter: true });
      });
    }

    /**
     * Change stream close event
     *
     * @event ChangeStream#close
     * @type {null}
     */

    /**
     * Change stream end event
     *
     * @event ChangeStream#end
     * @type {null}
     */

    /**
     * Emitted each time the change stream stores a new resume token.
     *
     * @event ChangeStream#resumeTokenChanged
     * @type {ResumeToken}
     */

    /**
     * Fired when the stream encounters an error.
     *
     * @event ChangeStream#error
     * @type {Error}
     */
    changeStreamCursor.on('error', function(error) {
      processNewChange({ changeStream: self, error, eventEmitter: true });
    });

    // Re-attach every destination that was piped from a previous cursor.
    // `for...of` yields the destinations themselves; `for...in` would yield the
    // array indices ("0", "1", ...) and pipe into strings.
    if (self.pipeDestinations) {
      const cursorStream = changeStreamCursor.stream(self.streamOptions);
      for (const pipeDestination of self.pipeDestinations) {
        cursorStream.pipe(pipeDestination);
      }
    }

    return changeStreamCursor;
  }
  /**
   * Copies the listed options from `source` onto `target`, skipping any whose
   * value in `source` is falsy.
   *
   * @param {object} target Object that receives the options (mutated)
   * @param {object} source Object the options are read from
   * @param {string[]} optionNames Names of the options to consider
   * @return {object} `target`
   */
  function applyKnownOptions(target, source, optionNames) {
    for (const key of optionNames) {
      const value = source[key];
      if (!value) continue;
      target[key] = value;
    }
    return target;
  }
  // This method performs a basic server selection loop, satisfying the requirements of
  // ChangeStream resumability until the new SDAM layer can be used.
  const SELECTION_TIMEOUT = 30000;

  /**
   * Polls `topology.isConnected` every 3 seconds until it reports a connection
   * or the timeout elapses.
   *
   * @param {object} topology Object exposing `isConnected({ readPreference })`
   * @param {object} [options] Optional settings
   * @param {number} [options.timeout=30000] Maximum time to wait, in milliseconds
   * @param {*} [options.readPreference] Forwarded to `topology.isConnected`
   * @param {Array} [options.start] `process.hrtime()` value marking the start of the
   *   wait; filled in on the first poll and carried across retries
   * @param {function} callback Called with `(null, null)` once connected, or with a
   *   MongoError once the timeout has been exceeded
   */
  function waitForTopologyConnected(topology, options, callback) {
    // Normalize once so a missing options object cannot throw inside the timer
    // callback; the same object is reused on retries so `start` is preserved.
    const settings = options || {};

    setTimeout(() => {
      if (settings.start == null) settings.start = process.hrtime();

      const start = settings.start;
      const timeout = settings.timeout || SELECTION_TIMEOUT;
      const readPreference = settings.readPreference;

      if (topology.isConnected({ readPreference })) return callback(null, null);

      const hrElapsed = process.hrtime(start);
      const elapsed = (hrElapsed[0] * 1e9 + hrElapsed[1]) / 1e6; // nanoseconds -> milliseconds
      if (elapsed > timeout) return callback(new MongoError('Timed out waiting for connection'));

      waitForTopologyConnected(topology, settings, callback);
    }, 3000); // this is an arbitrary wait time to allow SDAM to transition
  }
  /**
   * Handle new change events. This method brings together the routes from the
   * callback, event emitter, and promise ways of using ChangeStream.
   *
   * @param {object} args
   * @param {ChangeStream} args.changeStream The change stream the event belongs to
   * @param {Error} [args.error] Error reported by the cursor, if any
   * @param {object} [args.change] The change document, if any
   * @param {function} [args.callback] Node-style callback for the callback route
   * @param {boolean} [args.eventEmitter=false] True when the result should be emitted
   *   on the change stream instead of being returned
   * @return {*} the result of the emit/callback call, or a promise on the promise
   *   route; undefined while a resume is pending on the emitter/callback routes
   */
  function processNewChange(args) {
    const changeStream = args.changeStream;
    const error = args.error;
    const change = args.change;
    const callback = args.callback;
    const eventEmitter = args.eventEmitter || false;
    // Fall back to the native Promise when the stream carries no promise library.
    const PromiseLibrary = changeStream.promiseLibrary || Promise;

    // Report a failure through whichever route the caller is using.
    const fail = failure => {
      if (eventEmitter) return changeStream.emit('error', failure);
      if (typeof callback === 'function') return callback(failure, null);
      return PromiseLibrary.reject(failure);
    };

    // If the changeStream is closed, then it should not process a change.
    if (changeStream.isClosed()) {
      // We do not error in the eventEmitter case.
      if (eventEmitter) {
        return;
      }
      return fail(new MongoError('ChangeStream is closed'));
    }

    const cursor = changeStream.cursor;
    const topology = changeStream.topology;
    const options = changeStream.cursor.options;

    if (error) {
      if (isResumableError(error) && !changeStream.attemptingResume) {
        // Guard against resuming again while this attempt is still in flight.
        changeStream.attemptingResume = true;

        // stop listening to all events from old cursor
        ['data', 'close', 'end', 'error'].forEach(event =>
          changeStream.cursor.removeAllListeners(event)
        );

        // close internal cursor, ignore errors
        changeStream.cursor.close();

        // Replace the stream's cursor with one built from the old cursor's resume options.
        const recreateCursor = () => {
          changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
        };

        // attempt recreating the cursor
        if (eventEmitter) {
          waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
            if (err) {
              changeStream.emit('error', err);
              changeStream.emit('close');
              return;
            }
            recreateCursor();
          });
          return;
        }

        if (callback) {
          waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
            if (err) return callback(err, null);
            recreateCursor();
            changeStream.next(callback);
          });
          return;
        }

        return new PromiseLibrary((resolve, reject) => {
          waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => {
            if (err) return reject(err);
            resolve();
          });
        })
          .then(recreateCursor)
          .then(() => changeStream.next());
      }

      return fail(error);
    }

    changeStream.attemptingResume = false;

    if (change && !change._id) {
      return fail(
        new Error('A change stream document has been received that lacks a resume token (_id).')
      );
    }

    // cache the resume token
    if (cursor.bufferedCount() === 0 && cursor.cursorState.postBatchResumeToken) {
      cursor.resumeToken = cursor.cursorState.postBatchResumeToken;
    } else if (change) {
      // A missing change carries no `_id`; keep the previously cached token.
      cursor.resumeToken = change._id;
    }

    // wipe the startAtOperationTime if there was one so that there won't be a conflict
    // between resumeToken and startAtOperationTime if we need to reconnect the cursor
    changeStream.options.startAtOperationTime = undefined;

    // Return the change
    if (eventEmitter) return changeStream.emit('change', change);
    if (typeof callback === 'function') return callback(error, change);
    return PromiseLibrary.resolve(change);
  }
  485. /**
  486. * The callback format for results
  487. * @callback ChangeStream~resultCallback
  488. * @param {MongoError} error An error instance representing the error during the execution.
  489. * @param {(object|null)} result The result object if the command was executed successfully.
  490. */
  491. module.exports = ChangeStream;