change_stream.js 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.ChangeStreamCursor = exports.ChangeStream = void 0;
  4. const Denque = require("denque");
  5. const collection_1 = require("./collection");
  6. const abstract_cursor_1 = require("./cursor/abstract_cursor");
  7. const db_1 = require("./db");
  8. const error_1 = require("./error");
  9. const mongo_client_1 = require("./mongo_client");
  10. const mongo_types_1 = require("./mongo_types");
  11. const aggregate_1 = require("./operations/aggregate");
  12. const execute_operation_1 = require("./operations/execute_operation");
  13. const utils_1 = require("./utils");
  14. /** @internal */
  15. const kResumeQueue = Symbol('resumeQueue');
  16. /** @internal */
  17. const kCursorStream = Symbol('cursorStream');
  18. /** @internal */
  19. const kClosed = Symbol('closed');
  20. /** @internal */
  21. const kMode = Symbol('mode');
  22. const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
  23. const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(CHANGE_STREAM_OPTIONS);
  24. const CHANGE_DOMAIN_TYPES = {
  25. COLLECTION: Symbol('Collection'),
  26. DATABASE: Symbol('Database'),
  27. CLUSTER: Symbol('Cluster')
  28. };
  29. const NO_RESUME_TOKEN_ERROR = 'A change stream document has been received that lacks a resume token (_id).';
  30. const NO_CURSOR_ERROR = 'ChangeStream has no cursor';
  31. const CHANGESTREAM_CLOSED_ERROR = 'ChangeStream is closed';
  32. /**
  33. * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
  34. * @public
  35. */
  36. class ChangeStream extends mongo_types_1.TypedEventEmitter {
  37. /**
  38. * @internal
  39. *
  40. * @param parent - The parent object that created this change stream
  41. * @param pipeline - An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents
  42. */
  43. constructor(parent, pipeline = [], options = {}) {
  44. super();
  45. this.pipeline = pipeline;
  46. this.options = options;
  47. if (parent instanceof collection_1.Collection) {
  48. this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
  49. }
  50. else if (parent instanceof db_1.Db) {
  51. this.type = CHANGE_DOMAIN_TYPES.DATABASE;
  52. }
  53. else if (parent instanceof mongo_client_1.MongoClient) {
  54. this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
  55. }
  56. else {
  57. throw new error_1.MongoChangeStreamError('Parent provided to ChangeStream constructor must be an instance of Collection, Db, or MongoClient');
  58. }
  59. this.parent = parent;
  60. this.namespace = parent.s.namespace;
  61. if (!this.options.readPreference && parent.readPreference) {
  62. this.options.readPreference = parent.readPreference;
  63. }
  64. this[kResumeQueue] = new Denque();
  65. // Create contained Change Stream cursor
  66. this.cursor = createChangeStreamCursor(this, options);
  67. this[kClosed] = false;
  68. this[kMode] = false;
  69. // Listen for any `change` listeners being added to ChangeStream
  70. this.on('newListener', eventName => {
  71. if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
  72. streamEvents(this, this.cursor);
  73. }
  74. });
  75. this.on('removeListener', eventName => {
  76. var _a;
  77. if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) {
  78. (_a = this[kCursorStream]) === null || _a === void 0 ? void 0 : _a.removeAllListeners('data');
  79. }
  80. });
  81. }
  82. /** @internal */
  83. get cursorStream() {
  84. return this[kCursorStream];
  85. }
  86. /** The cached resume token that is used to resume after the most recently returned change. */
  87. get resumeToken() {
  88. var _a;
  89. return (_a = this.cursor) === null || _a === void 0 ? void 0 : _a.resumeToken;
  90. }
  91. hasNext(callback) {
  92. setIsIterator(this);
  93. return (0, utils_1.maybePromise)(callback, cb => {
  94. getCursor(this, (err, cursor) => {
  95. if (err || !cursor)
  96. return cb(err); // failed to resume, raise an error
  97. cursor.hasNext(cb);
  98. });
  99. });
  100. }
  101. next(callback) {
  102. setIsIterator(this);
  103. return (0, utils_1.maybePromise)(callback, cb => {
  104. getCursor(this, (err, cursor) => {
  105. if (err || !cursor)
  106. return cb(err); // failed to resume, raise an error
  107. cursor.next((error, change) => {
  108. if (error) {
  109. this[kResumeQueue].push(() => this.next(cb));
  110. processError(this, error, cb);
  111. return;
  112. }
  113. processNewChange(this, change, cb);
  114. });
  115. });
  116. });
  117. }
  118. /** Is the cursor closed */
  119. get closed() {
  120. var _a, _b;
  121. return this[kClosed] || ((_b = (_a = this.cursor) === null || _a === void 0 ? void 0 : _a.closed) !== null && _b !== void 0 ? _b : false);
  122. }
  123. /** Close the Change Stream */
  124. close(callback) {
  125. this[kClosed] = true;
  126. return (0, utils_1.maybePromise)(callback, cb => {
  127. if (!this.cursor) {
  128. return cb();
  129. }
  130. const cursor = this.cursor;
  131. return cursor.close(err => {
  132. endStream(this);
  133. this.cursor = undefined;
  134. return cb(err);
  135. });
  136. });
  137. }
  138. /**
  139. * Return a modified Readable stream including a possible transform method.
  140. * @throws MongoDriverError if this.cursor is undefined
  141. */
  142. stream(options) {
  143. this.streamOptions = options;
  144. if (!this.cursor)
  145. throw new error_1.MongoChangeStreamError(NO_CURSOR_ERROR);
  146. return this.cursor.stream(options);
  147. }
  148. tryNext(callback) {
  149. setIsIterator(this);
  150. return (0, utils_1.maybePromise)(callback, cb => {
  151. getCursor(this, (err, cursor) => {
  152. if (err || !cursor)
  153. return cb(err); // failed to resume, raise an error
  154. return cursor.tryNext(cb);
  155. });
  156. });
  157. }
  158. }
  159. exports.ChangeStream = ChangeStream;
  160. /** @event */
  161. ChangeStream.RESPONSE = 'response';
  162. /** @event */
  163. ChangeStream.MORE = 'more';
  164. /** @event */
  165. ChangeStream.INIT = 'init';
  166. /** @event */
  167. ChangeStream.CLOSE = 'close';
  168. /**
  169. * Fired for each new matching change in the specified namespace. Attaching a `change`
  170. * event listener to a Change Stream will switch the stream into flowing mode. Data will
  171. * then be passed as soon as it is available.
  172. * @event
  173. */
  174. ChangeStream.CHANGE = 'change';
  175. /** @event */
  176. ChangeStream.END = 'end';
  177. /** @event */
  178. ChangeStream.ERROR = 'error';
  179. /**
  180. * Emitted each time the change stream stores a new resume token.
  181. * @event
  182. */
  183. ChangeStream.RESUME_TOKEN_CHANGED = 'resumeTokenChanged';
  184. /** @internal */
  185. class ChangeStreamCursor extends abstract_cursor_1.AbstractCursor {
  186. constructor(topology, namespace, pipeline = [], options = {}) {
  187. super(topology, namespace, options);
  188. this.pipeline = pipeline;
  189. this.options = options;
  190. this._resumeToken = null;
  191. this.startAtOperationTime = options.startAtOperationTime;
  192. if (options.startAfter) {
  193. this.resumeToken = options.startAfter;
  194. }
  195. else if (options.resumeAfter) {
  196. this.resumeToken = options.resumeAfter;
  197. }
  198. }
  199. set resumeToken(token) {
  200. this._resumeToken = token;
  201. this.emit(ChangeStream.RESUME_TOKEN_CHANGED, token);
  202. }
  203. get resumeToken() {
  204. return this._resumeToken;
  205. }
  206. get resumeOptions() {
  207. const result = {};
  208. for (const optionName of CURSOR_OPTIONS) {
  209. if (Reflect.has(this.options, optionName)) {
  210. Reflect.set(result, optionName, Reflect.get(this.options, optionName));
  211. }
  212. }
  213. if (this.resumeToken || this.startAtOperationTime) {
  214. ['resumeAfter', 'startAfter', 'startAtOperationTime'].forEach(key => Reflect.deleteProperty(result, key));
  215. if (this.resumeToken) {
  216. const resumeKey = this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter';
  217. Reflect.set(result, resumeKey, this.resumeToken);
  218. }
  219. else if (this.startAtOperationTime && (0, utils_1.maxWireVersion)(this.server) >= 7) {
  220. result.startAtOperationTime = this.startAtOperationTime;
  221. }
  222. }
  223. return result;
  224. }
  225. cacheResumeToken(resumeToken) {
  226. if (this.bufferedCount() === 0 && this.postBatchResumeToken) {
  227. this.resumeToken = this.postBatchResumeToken;
  228. }
  229. else {
  230. this.resumeToken = resumeToken;
  231. }
  232. this.hasReceived = true;
  233. }
  234. _processBatch(batchName, response) {
  235. const cursor = (response === null || response === void 0 ? void 0 : response.cursor) || {};
  236. if (cursor.postBatchResumeToken) {
  237. this.postBatchResumeToken = cursor.postBatchResumeToken;
  238. if (cursor[batchName].length === 0) {
  239. this.resumeToken = cursor.postBatchResumeToken;
  240. }
  241. }
  242. }
  243. clone() {
  244. return new ChangeStreamCursor(this.topology, this.namespace, this.pipeline, {
  245. ...this.cursorOptions
  246. });
  247. }
  248. _initialize(session, callback) {
  249. const aggregateOperation = new aggregate_1.AggregateOperation(this.namespace, this.pipeline, {
  250. ...this.cursorOptions,
  251. ...this.options,
  252. session
  253. });
  254. (0, execute_operation_1.executeOperation)(this.topology, aggregateOperation, (err, response) => {
  255. if (err || response == null) {
  256. return callback(err);
  257. }
  258. const server = aggregateOperation.server;
  259. if (this.startAtOperationTime == null &&
  260. this.resumeAfter == null &&
  261. this.startAfter == null &&
  262. (0, utils_1.maxWireVersion)(server) >= 7) {
  263. this.startAtOperationTime = response.operationTime;
  264. }
  265. this._processBatch('firstBatch', response);
  266. this.emit(ChangeStream.INIT, response);
  267. this.emit(ChangeStream.RESPONSE);
  268. // TODO: NODE-2882
  269. callback(undefined, { server, session, response });
  270. });
  271. }
  272. _getMore(batchSize, callback) {
  273. super._getMore(batchSize, (err, response) => {
  274. if (err) {
  275. return callback(err);
  276. }
  277. this._processBatch('nextBatch', response);
  278. this.emit(ChangeStream.MORE, response);
  279. this.emit(ChangeStream.RESPONSE);
  280. callback(err, response);
  281. });
  282. }
  283. }
  284. exports.ChangeStreamCursor = ChangeStreamCursor;
  285. const CHANGE_STREAM_EVENTS = [
  286. ChangeStream.RESUME_TOKEN_CHANGED,
  287. ChangeStream.END,
  288. ChangeStream.CLOSE
  289. ];
  290. function setIsEmitter(changeStream) {
  291. if (changeStream[kMode] === 'iterator') {
  292. // TODO(NODE-3485): Replace with MongoChangeStreamModeError
  293. throw new error_1.MongoAPIError('ChangeStream cannot be used as an EventEmitter after being used as an iterator');
  294. }
  295. changeStream[kMode] = 'emitter';
  296. }
  297. function setIsIterator(changeStream) {
  298. if (changeStream[kMode] === 'emitter') {
  299. // TODO(NODE-3485): Replace with MongoChangeStreamModeError
  300. throw new error_1.MongoAPIError('ChangeStream cannot be used as an iterator after being used as an EventEmitter');
  301. }
  302. changeStream[kMode] = 'iterator';
  303. }
  304. /**
  305. * Create a new change stream cursor based on self's configuration
  306. * @internal
  307. */
  308. function createChangeStreamCursor(changeStream, options) {
  309. const changeStreamStageOptions = { fullDocument: options.fullDocument || 'default' };
  310. applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS);
  311. if (changeStream.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
  312. changeStreamStageOptions.allChangesForCluster = true;
  313. }
  314. const pipeline = [{ $changeStream: changeStreamStageOptions }].concat(changeStream.pipeline);
  315. const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);
  316. const changeStreamCursor = new ChangeStreamCursor((0, utils_1.getTopology)(changeStream.parent), changeStream.namespace, pipeline, cursorOptions);
  317. for (const event of CHANGE_STREAM_EVENTS) {
  318. changeStreamCursor.on(event, e => changeStream.emit(event, e));
  319. }
  320. if (changeStream.listenerCount(ChangeStream.CHANGE) > 0) {
  321. streamEvents(changeStream, changeStreamCursor);
  322. }
  323. return changeStreamCursor;
  324. }
  325. function applyKnownOptions(target, source, optionNames) {
  326. optionNames.forEach(name => {
  327. if (source[name]) {
  328. target[name] = source[name];
  329. }
  330. });
  331. return target;
  332. }
  333. // This method performs a basic server selection loop, satisfying the requirements of
  334. // ChangeStream resumability until the new SDAM layer can be used.
  335. const SELECTION_TIMEOUT = 30000;
  336. function waitForTopologyConnected(topology, options, callback) {
  337. setTimeout(() => {
  338. if (options && options.start == null) {
  339. options.start = (0, utils_1.now)();
  340. }
  341. const start = options.start || (0, utils_1.now)();
  342. const timeout = options.timeout || SELECTION_TIMEOUT;
  343. if (topology.isConnected()) {
  344. return callback();
  345. }
  346. if ((0, utils_1.calculateDurationInMs)(start) > timeout) {
  347. // TODO(NODE-3497): Replace with MongoNetworkTimeoutError
  348. return callback(new error_1.MongoRuntimeError('Timed out waiting for connection'));
  349. }
  350. waitForTopologyConnected(topology, options, callback);
  351. }, 500); // this is an arbitrary wait time to allow SDAM to transition
  352. }
  353. function closeWithError(changeStream, error, callback) {
  354. if (!callback) {
  355. changeStream.emit(ChangeStream.ERROR, error);
  356. }
  357. changeStream.close(() => callback && callback(error));
  358. }
  359. function streamEvents(changeStream, cursor) {
  360. setIsEmitter(changeStream);
  361. const stream = changeStream[kCursorStream] || cursor.stream();
  362. changeStream[kCursorStream] = stream;
  363. stream.on('data', change => processNewChange(changeStream, change));
  364. stream.on('error', error => processError(changeStream, error));
  365. }
  366. function endStream(changeStream) {
  367. const cursorStream = changeStream[kCursorStream];
  368. if (cursorStream) {
  369. ['data', 'close', 'end', 'error'].forEach(event => cursorStream.removeAllListeners(event));
  370. cursorStream.destroy();
  371. }
  372. changeStream[kCursorStream] = undefined;
  373. }
  374. function processNewChange(changeStream, change, callback) {
  375. var _a;
  376. if (changeStream[kClosed]) {
  377. // TODO(NODE-3485): Replace with MongoChangeStreamClosedError
  378. if (callback)
  379. callback(new error_1.MongoAPIError(CHANGESTREAM_CLOSED_ERROR));
  380. return;
  381. }
  382. // a null change means the cursor has been notified, implicitly closing the change stream
  383. if (change == null) {
  384. // TODO(NODE-3485): Replace with MongoChangeStreamClosedError
  385. return closeWithError(changeStream, new error_1.MongoRuntimeError(CHANGESTREAM_CLOSED_ERROR), callback);
  386. }
  387. if (change && !change._id) {
  388. return closeWithError(changeStream, new error_1.MongoChangeStreamError(NO_RESUME_TOKEN_ERROR), callback);
  389. }
  390. // cache the resume token
  391. (_a = changeStream.cursor) === null || _a === void 0 ? void 0 : _a.cacheResumeToken(change._id);
  392. // wipe the startAtOperationTime if there was one so that there won't be a conflict
  393. // between resumeToken and startAtOperationTime if we need to reconnect the cursor
  394. changeStream.options.startAtOperationTime = undefined;
  395. // Return the change
  396. if (!callback)
  397. return changeStream.emit(ChangeStream.CHANGE, change);
  398. return callback(undefined, change);
  399. }
  400. function processError(changeStream, error, callback) {
  401. const cursor = changeStream.cursor;
  402. // If the change stream has been closed explicitly, do not process error.
  403. if (changeStream[kClosed]) {
  404. // TODO(NODE-3485): Replace with MongoChangeStreamClosedError
  405. if (callback)
  406. callback(new error_1.MongoAPIError(CHANGESTREAM_CLOSED_ERROR));
  407. return;
  408. }
  409. // if the resume succeeds, continue with the new cursor
  410. function resumeWithCursor(newCursor) {
  411. changeStream.cursor = newCursor;
  412. processResumeQueue(changeStream);
  413. }
  414. // otherwise, raise an error and close the change stream
  415. function unresumableError(err) {
  416. if (!callback) {
  417. changeStream.emit(ChangeStream.ERROR, err);
  418. }
  419. changeStream.close(() => processResumeQueue(changeStream, err));
  420. }
  421. if (cursor && (0, error_1.isResumableError)(error, (0, utils_1.maxWireVersion)(cursor.server))) {
  422. changeStream.cursor = undefined;
  423. // stop listening to all events from old cursor
  424. endStream(changeStream);
  425. // close internal cursor, ignore errors
  426. cursor.close();
  427. const topology = (0, utils_1.getTopology)(changeStream.parent);
  428. waitForTopologyConnected(topology, { readPreference: cursor.readPreference }, err => {
  429. // if the topology can't reconnect, close the stream
  430. if (err)
  431. return unresumableError(err);
  432. // create a new cursor, preserving the old cursor's options
  433. const newCursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
  434. // attempt to continue in emitter mode
  435. if (!callback)
  436. return resumeWithCursor(newCursor);
  437. // attempt to continue in iterator mode
  438. newCursor.hasNext(err => {
  439. // if there's an error immediately after resuming, close the stream
  440. if (err)
  441. return unresumableError(err);
  442. resumeWithCursor(newCursor);
  443. });
  444. });
  445. return;
  446. }
  447. // if initial error wasn't resumable, raise an error and close the change stream
  448. return closeWithError(changeStream, error, callback);
  449. }
  450. /**
  451. * Safely provides a cursor across resume attempts
  452. *
  453. * @param changeStream - the parent ChangeStream
  454. */
  455. function getCursor(changeStream, callback) {
  456. if (changeStream[kClosed]) {
  457. // TODO(NODE-3485): Replace with MongoChangeStreamClosedError
  458. callback(new error_1.MongoAPIError(CHANGESTREAM_CLOSED_ERROR));
  459. return;
  460. }
  461. // if a cursor exists and it is open, return it
  462. if (changeStream.cursor) {
  463. callback(undefined, changeStream.cursor);
  464. return;
  465. }
  466. // no cursor, queue callback until topology reconnects
  467. changeStream[kResumeQueue].push(callback);
  468. }
  469. /**
  470. * Drain the resume queue when a new has become available
  471. *
  472. * @param changeStream - the parent ChangeStream
  473. * @param err - error getting a new cursor
  474. */
  475. function processResumeQueue(changeStream, err) {
  476. while (changeStream[kResumeQueue].length) {
  477. const request = changeStream[kResumeQueue].pop();
  478. if (!request)
  479. break; // Should never occur but TS can't use the length check in the while condition
  480. if (!err) {
  481. if (changeStream[kClosed]) {
  482. // TODO(NODE-3485): Replace with MongoChangeStreamClosedError
  483. request(new error_1.MongoAPIError(CHANGESTREAM_CLOSED_ERROR));
  484. return;
  485. }
  486. if (!changeStream.cursor) {
  487. request(new error_1.MongoChangeStreamError(NO_CURSOR_ERROR));
  488. return;
  489. }
  490. }
  491. request(err, changeStream.cursor);
  492. }
  493. }
  494. //# sourceMappingURL=change_stream.js.map