// change_stream.js
  1. 'use strict';
  2. const Denque = require('denque');
  3. const EventEmitter = require('events');
  4. const isResumableError = require('./error').isResumableError;
  5. const MongoError = require('./core').MongoError;
  6. const Cursor = require('./cursor');
  7. const relayEvents = require('./core/utils').relayEvents;
  8. const maxWireVersion = require('./core/utils').maxWireVersion;
  9. const maybePromise = require('./utils').maybePromise;
  10. const now = require('./utils').now;
  11. const calculateDurationInMs = require('./utils').calculateDurationInMs;
  12. const AggregateOperation = require('./operations/aggregate');
  // Symbol key under which a ChangeStream stores its queue of callbacks waiting for a cursor.
  const kResumeQueue = Symbol('resumeQueue');

  // Option names copied into the `$changeStream` pipeline stage.
  const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];

  // Option names copied onto the cursor: the cursor-level names followed by every stage option.
  const CURSOR_OPTIONS = [].concat(
    ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'],
    CHANGE_STREAM_OPTIONS
  );

  // The kind of parent a change stream was opened on (see the ChangeStream constructor).
  const CHANGE_DOMAIN_TYPES = {
    COLLECTION: Symbol('Collection'),
    DATABASE: Symbol('Database'),
    CLUSTER: Symbol('Cluster')
  };
  /**
   * @typedef ResumeToken
   * @description Represents the logical starting point for a new or resuming {@link ChangeStream} on the server.
   * @see https://docs.mongodb.com/master/changeStreams/#change-stream-resume-token
   */

  /**
   * @typedef OperationTime
   * @description Represents a specific point in time on a server. Can be retrieved by using {@link Db#command}.
   * @see https://docs.mongodb.com/manual/reference/method/db.runCommand/#response
   */

  /**
   * @typedef ChangeStreamOptions
   * @description Options that can be passed to a ChangeStream. Note that startAfter, resumeAfter, and startAtOperationTime are all mutually exclusive, and the server will error if more than one is specified.
   * @property {string} [fullDocument='default'] Allowed values: 'default', 'updateLookup'. When set to 'updateLookup', the change stream will include both a delta describing the changes to the document and a copy of the entire document that was changed, taken from some time after the change occurred.
   * @property {number} [maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query.
   * @property {ResumeToken} [resumeAfter] Allows you to start a changeStream after a specified event. See {@link https://docs.mongodb.com/master/changeStreams/#resumeafter-for-change-streams|ChangeStream documentation}.
   * @property {ResumeToken} [startAfter] Similar to resumeAfter, but will allow you to start after an invalidated event. See {@link https://docs.mongodb.com/master/changeStreams/#startafter-for-change-streams|ChangeStream documentation}.
   * @property {OperationTime} [startAtOperationTime] Will start the changeStream after the specified operationTime.
   * @property {number} [batchSize=1000] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
   * @property {object} [collation] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
   * @property {ReadPreference} [readPreference] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
   */
  45. /**
  46. * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
  47. * @class ChangeStream
  48. * @since 3.0.0
  49. * @param {(MongoClient|Db|Collection)} parent The parent object that created this change stream
  50. * @param {Array} pipeline An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents
  51. * @param {ChangeStreamOptions} [options] Optional settings
  52. * @fires ChangeStream#close
  53. * @fires ChangeStream#change
  54. * @fires ChangeStream#end
  55. * @fires ChangeStream#error
  56. * @fires ChangeStream#resumeTokenChanged
  57. * @return {ChangeStream} a ChangeStream instance.
  58. */
  59. class ChangeStream extends EventEmitter {
  60. constructor(parent, pipeline, options) {
  61. super();
  62. const Collection = require('./collection');
  63. const Db = require('./db');
  64. const MongoClient = require('./mongo_client');
  65. this.pipeline = pipeline || [];
  66. this.options = options || {};
  67. this.parent = parent;
  68. this.namespace = parent.s.namespace;
  69. if (parent instanceof Collection) {
  70. this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
  71. this.topology = parent.s.db.serverConfig;
  72. } else if (parent instanceof Db) {
  73. this.type = CHANGE_DOMAIN_TYPES.DATABASE;
  74. this.topology = parent.serverConfig;
  75. } else if (parent instanceof MongoClient) {
  76. this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
  77. this.topology = parent.topology;
  78. } else {
  79. throw new TypeError(
  80. 'parent provided to ChangeStream constructor is not an instance of Collection, Db, or MongoClient'
  81. );
  82. }
  83. this.promiseLibrary = parent.s.promiseLibrary;
  84. if (!this.options.readPreference && parent.s.readPreference) {
  85. this.options.readPreference = parent.s.readPreference;
  86. }
  87. this[kResumeQueue] = new Denque();
  88. // Create contained Change Stream cursor
  89. this.cursor = createChangeStreamCursor(this, options);
  90. this.closed = false;
  91. // Listen for any `change` listeners being added to ChangeStream
  92. this.on('newListener', eventName => {
  93. if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
  94. this.cursor.on('data', change => processNewChange(this, change));
  95. }
  96. });
  97. // Listen for all `change` listeners being removed from ChangeStream
  98. this.on('removeListener', eventName => {
  99. if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) {
  100. this.cursor.removeAllListeners('data');
  101. }
  102. });
  103. }
  104. /**
  105. * @property {ResumeToken} resumeToken
  106. * The cached resume token that will be used to resume
  107. * after the most recently returned change.
  108. */
  109. get resumeToken() {
  110. return this.cursor.resumeToken;
  111. }
  112. /**
  113. * Check if there is any document still available in the Change Stream
  114. * @function ChangeStream.prototype.hasNext
  115. * @param {ChangeStream~resultCallback} [callback] The result callback.
  116. * @throws {MongoError}
  117. * @returns {Promise|void} returns Promise if no callback passed
  118. */
  119. hasNext(callback) {
  120. return maybePromise(this.parent, callback, cb => {
  121. getCursor(this, (err, cursor) => {
  122. if (err) return cb(err); // failed to resume, raise an error
  123. cursor.hasNext(cb);
  124. });
  125. });
  126. }
  127. /**
  128. * Get the next available document from the Change Stream, returns null if no more documents are available.
  129. * @function ChangeStream.prototype.next
  130. * @param {ChangeStream~resultCallback} [callback] The result callback.
  131. * @throws {MongoError}
  132. * @returns {Promise|void} returns Promise if no callback passed
  133. */
  134. next(callback) {
  135. return maybePromise(this.parent, callback, cb => {
  136. getCursor(this, (err, cursor) => {
  137. if (err) return cb(err); // failed to resume, raise an error
  138. cursor.next((error, change) => {
  139. if (error) {
  140. this[kResumeQueue].push(() => this.next(cb));
  141. processError(this, error, cb);
  142. return;
  143. }
  144. processNewChange(this, change, cb);
  145. });
  146. });
  147. });
  148. }
  149. /**
  150. * Is the change stream closed
  151. * @method ChangeStream.prototype.isClosed
  152. * @return {boolean}
  153. */
  154. isClosed() {
  155. return this.closed || (this.cursor && this.cursor.isClosed());
  156. }
  157. /**
  158. * Close the Change Stream
  159. * @method ChangeStream.prototype.close
  160. * @param {ChangeStream~resultCallback} [callback] The result callback.
  161. * @return {Promise} returns Promise if no callback passed
  162. */
  163. close(callback) {
  164. return maybePromise(this.parent, callback, cb => {
  165. if (this.closed) return cb();
  166. // flag the change stream as explicitly closed
  167. this.closed = true;
  168. if (!this.cursor) return cb();
  169. // Tidy up the existing cursor
  170. const cursor = this.cursor;
  171. return cursor.close(err => {
  172. ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
  173. this.cursor = undefined;
  174. return cb(err);
  175. });
  176. });
  177. }
  178. /**
  179. * This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed by a fast readable stream.
  180. * @method
  181. * @param {Writable} destination The destination for writing data
  182. * @param {object} [options] {@link https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options|Pipe options}
  183. * @return {null}
  184. */
  185. pipe(destination, options) {
  186. if (!this.pipeDestinations) {
  187. this.pipeDestinations = [];
  188. }
  189. this.pipeDestinations.push(destination);
  190. return this.cursor.pipe(destination, options);
  191. }
  192. /**
  193. * This method will remove the hooks set up for a previous pipe() call.
  194. * @param {Writable} [destination] The destination for writing data
  195. * @return {null}
  196. */
  197. unpipe(destination) {
  198. if (this.pipeDestinations && this.pipeDestinations.indexOf(destination) > -1) {
  199. this.pipeDestinations.splice(this.pipeDestinations.indexOf(destination), 1);
  200. }
  201. return this.cursor.unpipe(destination);
  202. }
  203. /**
  204. * Return a modified Readable stream including a possible transform method.
  205. * @method
  206. * @param {object} [options] Optional settings.
  207. * @param {function} [options.transform] A transformation method applied to each document emitted by the stream.
  208. * @return {Cursor}
  209. */
  210. stream(options) {
  211. this.streamOptions = options;
  212. return this.cursor.stream(options);
  213. }
  214. /**
  215. * This method will cause a stream in flowing mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.
  216. * @return {null}
  217. */
  218. pause() {
  219. return this.cursor.pause();
  220. }
  221. /**
  222. * This method will cause the readable stream to resume emitting data events.
  223. * @return {null}
  224. */
  225. resume() {
  226. return this.cursor.resume();
  227. }
  228. }
  /**
   * Cursor used internally by ChangeStream. On top of the base Cursor it keeps
   * track of the resume token and of the options needed to re-create itself.
   */
  class ChangeStreamCursor extends Cursor {
    constructor(topology, operation, options) {
      super(topology, operation, options);

      const settings = options || {};
      this._resumeToken = null;
      this.startAtOperationTime = settings.startAtOperationTime;

      // Seed the token from the caller's options; startAfter wins over resumeAfter.
      const initialToken = settings.startAfter || settings.resumeAfter;
      if (initialToken) {
        this.resumeToken = initialToken;
      }
    }

    /** The most recently cached resume token, or null if none has been stored. */
    get resumeToken() {
      return this._resumeToken;
    }

    /** Stores a new resume token and announces it with a `resumeTokenChanged` event. */
    set resumeToken(token) {
      this._resumeToken = token;
      this.emit('resumeTokenChanged', token);
    }

    /**
     * Options for a replacement cursor: every truthy CURSOR_OPTIONS entry of this
     * cursor, with the starting-point options rewritten from the current state.
     */
    get resumeOptions() {
      const resumeOptions = {};
      CURSOR_OPTIONS.forEach(name => {
        if (this.options[name]) resumeOptions[name] = this.options[name];
      });

      const canRestart = this.resumeToken || this.startAtOperationTime;
      if (!canRestart) {
        return resumeOptions;
      }

      // Drop whatever starting point was originally requested; at most one is re-added.
      delete resumeOptions.resumeAfter;
      delete resumeOptions.startAfter;
      delete resumeOptions.startAtOperationTime;

      if (this.resumeToken) {
        // Keep using startAfter only while no change has been received through this cursor.
        const keepStartAfter = this.options.startAfter && !this.hasReceived;
        resumeOptions[keepStartAfter ? 'startAfter' : 'resumeAfter'] = this.resumeToken;
      } else if (maxWireVersion(this.server) >= 7) {
        // No token yet: fall back to the operation time when the wire version is 7 or newer.
        resumeOptions.startAtOperationTime = this.startAtOperationTime;
      }

      return resumeOptions;
    }

    /**
     * Caches the token to resume from after a change has been handed out.
     * @param {ResumeToken} resumeToken The `_id` of the change just returned
     */
    cacheResumeToken(resumeToken) {
      // Once the buffered batch is exhausted the batch-level token, if any, takes precedence.
      const usePostBatchToken =
        this.bufferedCount() === 0 && this.cursorState.postBatchResumeToken;
      this.resumeToken = usePostBatchToken ? this.cursorState.postBatchResumeToken : resumeToken;
      this.hasReceived = true;
    }

    /**
     * Records the postBatchResumeToken of a server response, if it has one.
     * @param {string} batchName Name of the batch field in the response (`firstBatch` or `nextBatch`)
     * @param {object} response The server response holding a `cursor` document
     */
    _processBatch(batchName, response) {
      const batchInfo = response.cursor;
      if (!batchInfo.postBatchResumeToken) {
        return;
      }

      this.cursorState.postBatchResumeToken = batchInfo.postBatchResumeToken;

      // An empty batch yields no change to take a token from, so adopt this one right away.
      if (batchInfo[batchName].length === 0) {
        this.resumeToken = batchInfo.postBatchResumeToken;
      }
    }

    _initializeCursor(callback) {
      super._initializeCursor((err, result) => {
        if (err || result == null) {
          callback(err, result);
          return;
        }

        const response = result.documents[0];

        // NOTE(review): `this.resumeAfter` and `this.startAfter` are not assigned anywhere
        // in this class; unless the Cursor base class defines them they are always
        // undefined here -- confirm whether `this.options.*` was intended.
        const hasStartingPoint =
          this.startAtOperationTime != null || this.resumeAfter != null || this.startAfter != null;
        if (!hasStartingPoint && maxWireVersion(this.server) >= 7) {
          this.startAtOperationTime = response.operationTime;
        }

        this._processBatch('firstBatch', response);

        this.emit('init', result);
        this.emit('response');
        callback(err, result);
      });
    }

    _getMore(callback) {
      super._getMore((err, response) => {
        if (err) {
          callback(err);
          return;
        }

        this._processBatch('nextBatch', response);

        this.emit('more', response);
        this.emit('response');
        callback(err, response);
      });
    }
  }

  /**
   * @event ChangeStreamCursor#response
   * internal event DO NOT USE
   * @ignore
   */
  /**
   * Create a new change stream cursor based on self's configuration and wire its
   * events to the change stream.
   *
   * @param {ChangeStream} self The change stream the cursor belongs to
   * @param {object} options Options for the `$changeStream` stage, the aggregate operation and the cursor
   * @return {ChangeStreamCursor} The newly created cursor
   */
  function createChangeStreamCursor(self, options) {
    // Settings of the `$changeStream` stage; fullDocument falls back to 'default'.
    const stageOptions = applyKnownOptions(
      { fullDocument: options.fullDocument || 'default' },
      options,
      CHANGE_STREAM_OPTIONS
    );
    if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
      stageOptions.allChangesForCluster = true;
    }

    // The `$changeStream` stage goes first, followed by the stream's own pipeline stages.
    const pipeline = [{ $changeStream: stageOptions }].concat(self.pipeline);
    const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);

    const newCursor = new ChangeStreamCursor(
      self.topology,
      new AggregateOperation(self.parent, pipeline, options),
      cursorOptions
    );

    relayEvents(newCursor, self, ['resumeTokenChanged', 'end', 'close']);

    /**
     * Fired for each new matching change in the specified namespace. Attaching a `change`
     * event listener to a Change Stream will switch the stream into flowing mode. Data will
     * then be passed as soon as it is available.
     *
     * @event ChangeStream#change
     * @type {object}
     */
    // Only forward data when somebody is already listening for `change` events.
    if (self.listenerCount('change') > 0) {
      newCursor.on('data', change => {
        processNewChange(self, change);
      });
    }

    /**
     * Change stream close event
     *
     * @event ChangeStream#close
     * @type {null}
     */

    /**
     * Change stream end event
     *
     * @event ChangeStream#end
     * @type {null}
     */

    /**
     * Emitted each time the change stream stores a new resume token.
     *
     * @event ChangeStream#resumeTokenChanged
     * @type {ResumeToken}
     */

    /**
     * Fired when the stream encounters an error.
     *
     * @event ChangeStream#error
     * @type {Error}
     */
    newCursor.on('error', error => {
      processError(self, error);
    });

    // Re-attach every destination registered through ChangeStream#pipe.
    if (self.pipeDestinations) {
      const cursorStream = newCursor.stream(self.streamOptions);
      self.pipeDestinations.forEach(destination => {
        cursorStream.pipe(destination);
      });
    }

    return newCursor;
  }
  /**
   * Copies the listed options from `source` onto `target`, skipping falsy values.
   *
   * @param {object} target Object that receives the options (mutated)
   * @param {object} source Object the options are read from
   * @param {string[]} optionNames Names of the options to copy
   * @return {object} The same `target` object
   */
  function applyKnownOptions(target, source, optionNames) {
    for (const optionName of optionNames) {
      if (source[optionName]) {
        target[optionName] = source[optionName];
      }
    }
    return target;
  }
  // This method performs a basic server selection loop, satisfying the requirements of
  // ChangeStream resumability until the new SDAM layer can be used.
  const SELECTION_TIMEOUT = 30000;

  /**
   * Polls the topology every 500 ms until it reports being connected or the timeout elapses.
   *
   * @param {object} topology Topology to poll; must expose `isConnected({ readPreference })`
   * @param {object} [options] Polling state and settings. The object is reused between polls
   *   and `options.start` is stamped on it at the first poll.
   * @param {*} [options.start] Start time as produced by `now()`; set automatically when absent
   * @param {number} [options.timeout=30000] Maximum time to wait, in milliseconds
   * @param {ReadPreference} [options.readPreference] Read preference passed to `isConnected`
   * @param {function} callback Called with no arguments once connected, or with a MongoError on timeout
   */
  function waitForTopologyConnected(topology, options, callback) {
    // Normalize once so a missing `options` cannot throw inside the timer callback. The
    // same object is handed to every recursive call, which keeps `start` stable across polls.
    const state = options || {};

    setTimeout(() => {
      if (state.start == null) {
        state.start = now();
      }

      const start = state.start || now();
      const timeout = state.timeout || SELECTION_TIMEOUT;
      const readPreference = state.readPreference;

      if (topology.isConnected({ readPreference })) {
        return callback();
      }

      if (calculateDurationInMs(start) > timeout) {
        return callback(new MongoError('Timed out waiting for connection'));
      }

      waitForTopologyConnected(topology, state, callback);
    }, 500); // this is an arbitrary wait time to allow SDAM to transition
  }
  /**
   * Handles a document read from the change stream cursor: caches its resume token and
   * delivers it through `callback`, or as a `change` event when no callback is given.
   *
   * @param {ChangeStream} changeStream The change stream the document belongs to
   * @param {object|null} change The document read from the cursor
   * @param {function} [callback] Receives `(error, change)`; when omitted, events are emitted instead
   */
  function processNewChange(changeStream, change, callback) {
    const activeCursor = changeStream.cursor;

    // a null change means the cursor has been notified, implicitly closing the change stream
    if (change == null) {
      changeStream.closed = true;
    }

    if (changeStream.closed) {
      if (callback) callback(new MongoError('ChangeStream is closed'));
      return;
    }

    // Without an `_id` there is no resume token to cache, so the change is reported as an error.
    if (change && !change._id) {
      const noResumeTokenError = new Error(
        'A change stream document has been received that lacks a resume token (_id).'
      );
      return callback
        ? callback(noResumeTokenError)
        : changeStream.emit('error', noResumeTokenError);
    }

    // cache the resume token
    activeCursor.cacheResumeToken(change._id);

    // wipe the startAtOperationTime if there was one so that there won't be a conflict
    // between resumeToken and startAtOperationTime if we need to reconnect the cursor
    changeStream.options.startAtOperationTime = undefined;

    // Return the change
    return callback ? callback(undefined, change) : changeStream.emit('change', change);
  }
  /**
   * Handles an error raised by the change stream's cursor. Resumable errors trigger an
   * attempt to replace the cursor; any other error is passed to `callback`, or emitted
   * as an `error` event when no callback is given.
   *
   * @param {ChangeStream} changeStream The change stream whose cursor failed
   * @param {Error} error The error that was raised
   * @param {function} [callback] Iterator-mode callback; when omitted the stream is in emitter mode
   */
  function processError(changeStream, error, callback) {
    const topology = changeStream.topology;
    const failedCursor = changeStream.cursor;

    // If the change stream has been closed explicitly, do not process error.
    if (changeStream.closed) {
      if (callback) callback(new MongoError('ChangeStream is closed'));
      return;
    }

    // if the resume succeeds, continue with the new cursor
    const resumeWithCursor = newCursor => {
      changeStream.cursor = newCursor;
      processResumeQueue(changeStream);
    };

    // otherwise, raise an error and close the change stream
    const unresumableError = err => {
      if (!callback) {
        changeStream.emit('error', err);
        changeStream.emit('close');
      }
      // The queue is failed before the stream is flagged as closed.
      processResumeQueue(changeStream, err);
      changeStream.closed = true;
    };

    const canResume =
      failedCursor && isResumableError(error, maxWireVersion(failedCursor.server));
    if (!canResume) {
      if (!callback) return changeStream.emit('error', error);
      return callback(error);
    }

    // While this is unset, getCursor queues its callers instead of answering them.
    changeStream.cursor = undefined;

    // stop listening to all events from old cursor
    for (const eventName of ['data', 'close', 'end', 'error']) {
      failedCursor.removeAllListeners(eventName);
    }

    // close internal cursor, ignore errors
    failedCursor.close();

    const waitOptions = { readPreference: failedCursor.options.readPreference };
    waitForTopologyConnected(topology, waitOptions, waitError => {
      // if the topology can't reconnect, close the stream
      if (waitError) return unresumableError(waitError);

      // create a new cursor, preserving the old cursor's options
      const newCursor = createChangeStreamCursor(changeStream, failedCursor.resumeOptions);

      // attempt to continue in emitter mode
      if (!callback) return resumeWithCursor(newCursor);

      // attempt to continue in iterator mode
      newCursor.hasNext(hasNextError => {
        // if there's an error immediately after resuming, close the stream
        if (hasNextError) return unresumableError(hasNextError);
        resumeWithCursor(newCursor);
      });
    });
  }
  /**
   * Safely provides a cursor across resume attempts
   *
   * @param {ChangeStream} changeStream the parent ChangeStream
   * @param {function} callback gets the cursor or error; when no cursor is currently
   *   available it is queued until the resume queue is processed
   */
  function getCursor(changeStream, callback) {
    if (changeStream.isClosed()) {
      callback(new MongoError('ChangeStream is closed.'));
    } else if (changeStream.cursor) {
      // a cursor exists and it is open, hand it over
      callback(undefined, changeStream.cursor);
    } else {
      // no cursor, queue callback until topology reconnects
      changeStream[kResumeQueue].push(callback);
    }
  }
  /**
   * Drain the resume queue when a new cursor has become available, or when resuming has failed.
   *
   * Requests are answered in the order they were queued, and every queued request is
   * answered exactly once: with `err` if one is given, with a "not open" error if the
   * stream is closed, and with the current cursor otherwise.
   *
   * @param {ChangeStream} changeStream the parent ChangeStream
   * @param {ChangeStreamCursor?} changeStream.cursor the new cursor
   * @param {Error} [err] error getting a new cursor
   */
  function processResumeQueue(changeStream, err) {
    const queue = changeStream[kResumeQueue];

    // The length is re-read each iteration because a request may queue further
    // requests while it runs.
    while (queue.length) {
      // shift() takes the oldest request first
      const request = queue.shift();

      if (changeStream.isClosed() && !err) {
        // Keep draining: returning here would leave the remaining callers waiting forever.
        request(new MongoError('Change Stream is not open.'));
        continue;
      }

      request(err, changeStream.cursor);
    }
  }
  /**
   * The callback format for results
   * @callback ChangeStream~resultCallback
   * @param {MongoError} error An error instance representing the error during the execution.
   * @param {(object|null)} result The result object if the command was executed successfully.
   */
  528. module.exports = ChangeStream;