change_stream.js 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.ChangeStreamCursor = exports.ChangeStream = void 0;
  4. const Denque = require("denque");
  5. const timers_1 = require("timers");
  6. const collection_1 = require("./collection");
  7. const constants_1 = require("./constants");
  8. const abstract_cursor_1 = require("./cursor/abstract_cursor");
  9. const db_1 = require("./db");
  10. const error_1 = require("./error");
  11. const mongo_client_1 = require("./mongo_client");
  12. const mongo_types_1 = require("./mongo_types");
  13. const aggregate_1 = require("./operations/aggregate");
  14. const execute_operation_1 = require("./operations/execute_operation");
  15. const utils_1 = require("./utils");
  // ---------------------------------------------------------------------------
  // Private symbol keys used to keep ChangeStream bookkeeping off the public API.
  // ---------------------------------------------------------------------------
  /** @internal Key of the queue holding requests made while no cursor is available. */
  const kResumeQueue = Symbol('resumeQueue');
  /** @internal Key of the Readable stream created from the cursor in emitter mode. */
  const kCursorStream = Symbol('cursorStream');
  /** @internal Key of the flag set once `close()` has been called. */
  const kClosed = Symbol('closed');
  /** @internal Key of the usage mode: `false` until first use, then 'emitter' or 'iterator'. */
  const kMode = Symbol('mode');

  // Option names that are copied into the `$changeStream` pipeline stage.
  const CHANGE_STREAM_OPTIONS = [
    'resumeAfter',
    'startAfter',
    'startAtOperationTime',
    'fullDocument',
    'fullDocumentBeforeChange',
    'showExpandedEvents'
  ];

  // Identifies which kind of parent (collection, database or client) a change stream watches.
  const CHANGE_DOMAIN_TYPES = {
    COLLECTION: Symbol('Collection'),
    DATABASE: Symbol('Database'),
    CLUSTER: Symbol('Cluster')
  };

  // Default upper bound, in milliseconds, on waiting for the topology to reconnect.
  const SELECTION_TIMEOUT = 30000;

  // Cursor events that the ChangeStream re-emits under the same name.
  const CHANGE_STREAM_EVENTS = [
    constants_1.RESUME_TOKEN_CHANGED,
    constants_1.END,
    constants_1.CLOSE
  ];

  // Error messages shared by several code paths.
  const NO_RESUME_TOKEN_ERROR = 'A change stream document has been received that lacks a resume token (_id).';
  const NO_CURSOR_ERROR = 'ChangeStream has no cursor';
  const CHANGESTREAM_CLOSED_ERROR = 'ChangeStream is closed';
  /**
   * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
   *
   * A change stream can be consumed either as an EventEmitter (by attaching `change` listeners)
   * or as an iterator (`hasNext` / `next` / `tryNext`). The first style used is recorded and
   * using the other style afterwards throws a MongoAPIError.
   * @public
   */
  class ChangeStream extends mongo_types_1.TypedEventEmitter {
    /**
     * @internal
     *
     * @param parent - The parent object that created this change stream
     * @param pipeline - An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents
     * @param options - Options for the stream; the names listed in CHANGE_STREAM_OPTIONS are copied
     * into the `$changeStream` stage and the whole object is handed to the underlying cursor.
     * NOTE: the object is stored by reference and is mutated (`readPreference`, `startAtOperationTime`).
     * @throws MongoChangeStreamError if `parent` is not a Collection, Db or MongoClient
     */
    constructor(parent, pipeline = [], options = {}) {
      super();
      this.pipeline = pipeline;
      this.options = options;
      if (parent instanceof collection_1.Collection) {
        this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
      } else if (parent instanceof db_1.Db) {
        this.type = CHANGE_DOMAIN_TYPES.DATABASE;
      } else if (parent instanceof mongo_client_1.MongoClient) {
        this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
      } else {
        throw new error_1.MongoChangeStreamError('Parent provided to ChangeStream constructor must be an instance of Collection, Db, or MongoClient');
      }
      this.parent = parent;
      this.namespace = parent.s.namespace;
      // Inherit the parent's read preference unless one was given explicitly.
      if (!this.options.readPreference && parent.readPreference) {
        this.options.readPreference = parent.readPreference;
      }
      this[kResumeQueue] = new Denque();
      // Create contained Change Stream cursor
      this.cursor = this._createChangeStreamCursor(options);
      this[kClosed] = false;
      this[kMode] = false;
      // Start streaming when the first `change` listener is being added.
      // ('newListener' fires before the listener is registered, hence the `=== 0` check.)
      this.on('newListener', eventName => {
        if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
          this._streamEvents(this.cursor);
        }
      });
      // Stop forwarding data once the last `change` listener has been removed.
      this.on('removeListener', eventName => {
        if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) {
          const stream = this[kCursorStream];
          if (stream != null) {
            stream.removeAllListeners('data');
          }
        }
      });
    }

    /** @internal */
    get cursorStream() {
      return this[kCursorStream];
    }

    /** The cached resume token that is used to resume after the most recently returned change. */
    get resumeToken() {
      return this.cursor == null ? undefined : this.cursor.resumeToken;
    }

    /**
     * Check whether the underlying cursor has another change available (iterator mode).
     * @param callback - Optional callback; the result of `maybePromise` is returned, so a
     * Promise is presumably returned when the callback is omitted.
     * @throws MongoAPIError if the stream has already been used as an EventEmitter
     */
    hasNext(callback) {
      this._setIsIterator();
      return utils_1.maybePromise(callback, cb => {
        this._getCursor((err, cursor) => {
          if (err || !cursor) {
            return cb(err); // failed to resume, raise an error
          }
          cursor.hasNext(cb);
        });
      });
    }

    /**
     * Get the next available change (iterator mode). A cursor error is routed through
     * `_processError`, which attempts a resume when the error is resumable; the request is
     * queued beforehand so it is retried once a new cursor exists.
     * @param callback - Optional callback (see `hasNext`)
     * @throws MongoAPIError if the stream has already been used as an EventEmitter
     */
    next(callback) {
      this._setIsIterator();
      return utils_1.maybePromise(callback, cb => {
        this._getCursor((err, cursor) => {
          if (err || !cursor) {
            return cb(err); // failed to resume, raise an error
          }
          cursor.next((error, change) => {
            if (error) {
              // queue a retry first so the resume path can replay this request
              this[kResumeQueue].push(() => this.next(cb));
              this._processError(error, cb);
              return;
            }
            this._processNewChange(change == null ? null : change, cb);
          });
        });
      });
    }

    /** Is the cursor closed */
    get closed() {
      const cursor = this.cursor;
      const cursorClosed = cursor == null || cursor.closed == null ? false : cursor.closed;
      return this[kClosed] || cursorClosed;
    }

    /**
     * Close the Change Stream: marks it closed, closes the cursor (if any), tears down the
     * internal stream and drops the cursor reference.
     * @param callback - Optional callback (see `hasNext`)
     */
    close(callback) {
      this[kClosed] = true;
      return utils_1.maybePromise(callback, cb => {
        const cursor = this.cursor;
        if (!cursor) {
          return cb();
        }
        return cursor.close(err => {
          this._endStream();
          this.cursor = undefined;
          return cb(err);
        });
      });
    }

    /**
     * Return a modified Readable stream including a possible transform method.
     * @param options - Stream options; remembered on `streamOptions` and passed to the cursor
     * @throws MongoChangeStreamError if this.cursor is undefined
     */
    stream(options) {
      this.streamOptions = options;
      if (!this.cursor) {
        throw new error_1.MongoChangeStreamError(NO_CURSOR_ERROR);
      }
      return this.cursor.stream(options);
    }

    /**
     * Delegate to the cursor's `tryNext` (iterator mode).
     * @param callback - Optional callback (see `hasNext`)
     * @throws MongoAPIError if the stream has already been used as an EventEmitter
     */
    tryNext(callback) {
      this._setIsIterator();
      return utils_1.maybePromise(callback, cb => {
        this._getCursor((err, cursor) => {
          if (err || !cursor) {
            return cb(err); // failed to resume, raise an error
          }
          return cursor.tryNext(cb);
        });
      });
    }

    /**
     * Record that the stream is used as an EventEmitter.
     * @internal
     * @throws MongoAPIError if it was already used as an iterator
     */
    _setIsEmitter() {
      if (this[kMode] === 'iterator') {
        // TODO(NODE-3485): Replace with MongoChangeStreamModeError
        throw new error_1.MongoAPIError('ChangeStream cannot be used as an EventEmitter after being used as an iterator');
      }
      this[kMode] = 'emitter';
    }

    /**
     * Record that the stream is used as an iterator.
     * @internal
     * @throws MongoAPIError if it was already used as an EventEmitter
     */
    _setIsIterator() {
      if (this[kMode] === 'emitter') {
        // TODO(NODE-3485): Replace with MongoChangeStreamModeError
        throw new error_1.MongoAPIError('ChangeStream cannot be used as an iterator after being used as an EventEmitter');
      }
      this[kMode] = 'iterator';
    }

    /**
     * Create a new change stream cursor based on self's configuration
     * @internal
     *
     * @param options - Options for the new cursor; the CHANGE_STREAM_OPTIONS subset also goes
     * into the `$changeStream` stage that is prepended to the user pipeline.
     */
    _createChangeStreamCursor(options) {
      const changeStreamStageOptions = utils_1.filterOptions(options, CHANGE_STREAM_OPTIONS);
      if (this.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
        changeStreamStageOptions.allChangesForCluster = true;
      }
      const pipeline = [{ $changeStream: changeStreamStageOptions }, ...this.pipeline];

      // Locate the MongoClient from whichever kind of parent created this stream.
      let client = null;
      if (this.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
        client = this.parent;
      } else if (this.type === CHANGE_DOMAIN_TYPES.DATABASE) {
        client = this.parent.s.client;
      } else if (this.type === CHANGE_DOMAIN_TYPES.COLLECTION) {
        client = this.parent.s.db.s.client;
      }
      if (client == null) {
        // This should never happen because of the assertion in the constructor
        throw new error_1.MongoRuntimeError(`Changestream type should only be one of cluster, database, collection. Found ${this.type.toString()}`);
      }

      const changeStreamCursor = new ChangeStreamCursor(client, this.namespace, pipeline, options);
      // Re-emit selected cursor events on the change stream itself.
      for (const event of CHANGE_STREAM_EVENTS) {
        changeStreamCursor.on(event, e => this.emit(event, e));
      }
      // When `change` listeners already exist (i.e. on resume), start streaming right away.
      if (this.listenerCount(ChangeStream.CHANGE) > 0) {
        this._streamEvents(changeStreamCursor);
      }
      return changeStreamCursor;
    }

    /**
     * This method performs a basic server selection loop, satisfying the requirements of
     * ChangeStream resumability until the new SDAM layer can be used.
     * @internal
     *
     * @param topology - Object polled through `isConnected()`
     * @param options - May carry `timeout` (ms, defaults to SELECTION_TIMEOUT); `start` is
     * written onto it on the first poll so that the deadline spans all polls
     * @param callback - Called with no arguments once connected, or with an error on timeout
     */
    _waitForTopologyConnected(topology, options, callback) {
      // Normalise once so a missing options object can neither throw nor reset the
      // start time on every poll; a caller-provided object is still mutated in place.
      const pollOptions = options || {};
      timers_1.setTimeout(() => {
        if (pollOptions.start == null) {
          pollOptions.start = utils_1.now();
        }
        const start = pollOptions.start || utils_1.now();
        const timeout = pollOptions.timeout || SELECTION_TIMEOUT;
        if (topology.isConnected()) {
          return callback();
        }
        if (utils_1.calculateDurationInMs(start) > timeout) {
          // TODO(NODE-3497): Replace with MongoNetworkTimeoutError
          return callback(new error_1.MongoRuntimeError('Timed out waiting for connection'));
        }
        this._waitForTopologyConnected(topology, pollOptions, callback);
      }, 500); // this is an arbitrary wait time to allow SDAM to transition
    }

    /**
     * Close the stream because of `error`. In emitter mode (no callback) the error is emitted
     * as an `error` event; in iterator mode it is passed to `callback` after closing.
     * @internal
     */
    _closeWithError(error, callback) {
      if (!callback) {
        this.emit(ChangeStream.ERROR, error);
      }
      this.close(() => callback && callback(error));
    }

    /**
     * Switch to emitter mode and forward the cursor stream's documents and errors.
     * @internal
     *
     * @param cursor - Cursor to create the stream from when none exists yet
     * @throws MongoAPIError if the stream has already been used as an iterator
     */
    _streamEvents(cursor) {
      this._setIsEmitter();
      let stream = this[kCursorStream];
      if (stream == null) {
        stream = cursor.stream();
        this[kCursorStream] = stream;
        // Attach the error handler once per stream. This method runs again whenever a
        // `change` listener is re-added after all were removed, and only 'data' listeners
        // are stripped in that case; re-attaching here would process each error repeatedly.
        stream.on('error', error => this._processError(error));
      }
      stream.on('data', change => this._processNewChange(change));
    }

    /**
     * Detach from and destroy the internal cursor stream, if there is one.
     * @internal
     */
    _endStream() {
      const cursorStream = this[kCursorStream];
      if (cursorStream) {
        for (const event of ['data', 'close', 'end', 'error']) {
          cursorStream.removeAllListeners(event);
        }
        cursorStream.destroy();
      }
      this[kCursorStream] = undefined;
    }

    /**
     * Validate a change document, cache its resume token and deliver it.
     * @internal
     *
     * @param change - Change document, or null/undefined when the cursor yielded nothing
     * @param callback - Iterator-mode callback; when omitted the change is emitted as an event
     */
    _processNewChange(change, callback) {
      if (this[kClosed]) {
        // TODO(NODE-3485): Replace with MongoChangeStreamClosedError
        if (callback) {
          callback(new error_1.MongoAPIError(CHANGESTREAM_CLOSED_ERROR));
        }
        return;
      }
      // a null change is treated as the end of the cursor, which closes the change stream
      if (change == null) {
        // TODO(NODE-3485): Replace with MongoChangeStreamClosedError
        return this._closeWithError(new error_1.MongoRuntimeError(CHANGESTREAM_CLOSED_ERROR), callback);
      }
      // every change document must carry its resume token in `_id`
      if (!change._id) {
        return this._closeWithError(new error_1.MongoChangeStreamError(NO_RESUME_TOKEN_ERROR), callback);
      }
      // cache the resume token
      if (this.cursor != null) {
        this.cursor.cacheResumeToken(change._id);
      }
      // wipe the startAtOperationTime if there was one so that there won't be a conflict
      // between resumeToken and startAtOperationTime if we need to reconnect the cursor
      this.options.startAtOperationTime = undefined;
      // Return the change
      if (!callback) {
        return this.emit(ChangeStream.CHANGE, change);
      }
      return callback(undefined, change);
    }

    /**
     * Handle a cursor error: resume with a new cursor when the error is resumable,
     * otherwise surface the error and close the stream.
     * @internal
     *
     * @param error - The error raised by the cursor or its stream
     * @param callback - Iterator-mode callback; omitted in emitter mode
     */
    _processError(error, callback) {
      const cursor = this.cursor;
      // If the change stream has been closed explicitly, do not process error.
      if (this[kClosed]) {
        // TODO(NODE-3485): Replace with MongoChangeStreamClosedError
        if (callback) {
          callback(new error_1.MongoAPIError(CHANGESTREAM_CLOSED_ERROR));
        }
        return;
      }
      // if the resume succeeds, continue with the new cursor
      const resumeWithCursor = newCursor => {
        this.cursor = newCursor;
        this._processResumeQueue();
      };
      // otherwise, raise an error and close the change stream
      const unresumableError = err => {
        if (!callback) {
          this.emit(ChangeStream.ERROR, err);
        }
        this.close(() => this._processResumeQueue(err));
      };
      if (cursor && error_1.isResumableError(error, utils_1.maxWireVersion(cursor.server))) {
        // requests arriving from now on are queued until a new cursor exists
        this.cursor = undefined;
        // stop listening to all events from old cursor
        this._endStream();
        // close internal cursor, ignore errors
        cursor.close();
        const topology = utils_1.getTopology(this.parent);
        this._waitForTopologyConnected(topology, { readPreference: cursor.readPreference }, err => {
          // if the topology can't reconnect, close the stream
          if (err) {
            return unresumableError(err);
          }
          // create a new cursor, preserving the old cursor's options
          const newCursor = this._createChangeStreamCursor(cursor.resumeOptions);
          // attempt to continue in emitter mode
          if (!callback) {
            return resumeWithCursor(newCursor);
          }
          // attempt to continue in iterator mode
          newCursor.hasNext(err => {
            // if there's an error immediately after resuming, close the stream
            if (err) {
              return unresumableError(err);
            }
            resumeWithCursor(newCursor);
          });
        });
        return;
      }
      // if initial error wasn't resumable, raise an error and close the change stream
      return this._closeWithError(error, callback);
    }

    /**
     * Hand the current cursor to `callback`, or queue the callback until a resume completes.
     * @internal
     *
     * @param callback - Receives `(error)` when the stream is closed, else `(undefined, cursor)`
     */
    _getCursor(callback) {
      if (this[kClosed]) {
        // TODO(NODE-3485): Replace with MongoChangeStreamClosedError
        callback(new error_1.MongoAPIError(CHANGESTREAM_CLOSED_ERROR));
        return;
      }
      // if a cursor exists and it is open, return it
      if (this.cursor) {
        callback(undefined, this.cursor);
        return;
      }
      // no cursor, queue callback until topology reconnects
      this[kResumeQueue].push(callback);
    }

    /**
     * Drain the resume queue when a new cursor has become available
     * @internal
     *
     * @param error - error getting a new cursor
     */
    _processResumeQueue(error) {
      while (this[kResumeQueue].length) {
        // Requests are appended with push(), so take them from the front to serve them in
        // the order they were made.
        const request = this[kResumeQueue].shift();
        if (!request) {
          break; // Should never occur but TS can't use the length check in the while condition
        }
        if (!error) {
          // NOTE: these branches stop draining on purpose. A queued retry (see `next`) can
          // put itself back on the queue, so continuing the loop here could spin forever.
          if (this[kClosed]) {
            // TODO(NODE-3485): Replace with MongoChangeStreamClosedError
            request(new error_1.MongoAPIError(CHANGESTREAM_CLOSED_ERROR));
            return;
          }
          if (!this.cursor) {
            request(new error_1.MongoChangeStreamError(NO_CURSOR_ERROR));
            return;
          }
        }
        request(error, this.cursor == null ? undefined : this.cursor);
      }
    }
  }
  exports.ChangeStream = ChangeStream;
  // Event names exposed as static properties of ChangeStream, assigned in one pass.
  Object.assign(ChangeStream, {
    /** @event */
    RESPONSE: constants_1.RESPONSE,
    /** @event */
    MORE: constants_1.MORE,
    /** @event */
    INIT: constants_1.INIT,
    /** @event */
    CLOSE: constants_1.CLOSE,
    /**
     * Fired for each new matching change in the specified namespace. Attaching a `change`
     * event listener to a Change Stream will switch the stream into flowing mode. Data will
     * then be passed as soon as it is available.
     * @event
     */
    CHANGE: constants_1.CHANGE,
    /** @event */
    END: constants_1.END,
    /** @event */
    ERROR: constants_1.ERROR,
    /**
     * Emitted each time the change stream stores a new resume token.
     * @event
     */
    RESUME_TOKEN_CHANGED: constants_1.RESUME_TOKEN_CHANGED
  });
  /**
   * Cursor that runs the change stream aggregation and keeps track of the resume token
   * (and, failing that, the operation time) needed to re-create itself after an error.
   * @internal
   */
  class ChangeStreamCursor extends abstract_cursor_1.AbstractCursor {
    /**
     * @param client - Client passed through to AbstractCursor
     * @param namespace - Namespace the aggregation runs against
     * @param pipeline - Full aggregation pipeline (the caller supplies the `$changeStream` stage)
     * @param options - Cursor / change stream options; `startAfter` takes precedence over
     * `resumeAfter` as the initial resume token
     */
    constructor(client, namespace, pipeline = [], options = {}) {
      super(client, namespace, options);
      this.pipeline = pipeline;
      this.options = options;
      this._resumeToken = null;
      this.startAtOperationTime = options.startAtOperationTime;
      if (options.startAfter) {
        this.resumeToken = options.startAfter;
      } else if (options.resumeAfter) {
        this.resumeToken = options.resumeAfter;
      }
    }

    /** Store a new resume token and announce it through RESUME_TOKEN_CHANGED. */
    set resumeToken(token) {
      this._resumeToken = token;
      this.emit(ChangeStream.RESUME_TOKEN_CHANGED, token);
    }

    /** The most recently cached resume token, or null when none has been seen. */
    get resumeToken() {
      return this._resumeToken;
    }

    /**
     * Options for a replacement cursor: a copy of this cursor's options in which the
     * start position is replaced by the best one currently known.
     */
    get resumeOptions() {
      const options = { ...this.options };
      // drop whatever start position the original options carried
      for (const key of ['resumeAfter', 'startAfter', 'startAtOperationTime']) {
        delete options[key];
      }
      if (this.resumeToken != null) {
        // keep `startAfter` semantics only while no change has been received yet
        if (this.options.startAfter && !this.hasReceived) {
          options.startAfter = this.resumeToken;
        } else {
          options.resumeAfter = this.resumeToken;
        }
      } else if (this.startAtOperationTime != null && utils_1.maxWireVersion(this.server) >= 7) {
        options.startAtOperationTime = this.startAtOperationTime;
      }
      return options;
    }

    /**
     * Cache the token to resume from after a change has been handed out. Once the current
     * batch is exhausted, the batch-level `postBatchResumeToken` is preferred over the
     * document's own token.
     * @param resumeToken - The `_id` of the change document just returned
     */
    cacheResumeToken(resumeToken) {
      if (this.bufferedCount() === 0 && this.postBatchResumeToken) {
        this.resumeToken = this.postBatchResumeToken;
      } else {
        this.resumeToken = resumeToken;
      }
      this.hasReceived = true;
    }

    /**
     * Record the `postBatchResumeToken` of a server response; for an empty batch it
     * immediately becomes the resume token.
     * @param response - Server reply containing a `cursor` sub-document
     */
    _processBatch(response) {
      const cursor = response.cursor;
      if (!cursor.postBatchResumeToken) {
        return;
      }
      this.postBatchResumeToken = cursor.postBatchResumeToken;
      const batch = 'firstBatch' in cursor ? cursor.firstBatch : cursor.nextBatch;
      if (batch.length === 0) {
        this.resumeToken = cursor.postBatchResumeToken;
      }
    }

    /** Create a new cursor over the same client, namespace and pipeline using `cursorOptions`. */
    clone() {
      return new ChangeStreamCursor(this.client, this.namespace, this.pipeline, {
        ...this.cursorOptions
      });
    }

    /**
     * Run the initial aggregate command.
     * @param session - Session to run the operation with
     * @param callback - Receives `(err)` or `(undefined, { server, session, response })`
     */
    _initialize(session, callback) {
      const aggregateOperation = new aggregate_1.AggregateOperation(this.namespace, this.pipeline, {
        ...this.cursorOptions,
        ...this.options,
        session
      });
      execute_operation_1.executeOperation(session.client, aggregateOperation, (err, response) => {
        if (err || response == null) {
          return callback(err);
        }
        const server = aggregateOperation.server;
        // Remember the operation time as a fallback start position only when the caller gave
        // no start position at all. The resume tokens live on `this.options`; they are never
        // assigned as properties of the cursor itself.
        if (this.startAtOperationTime == null &&
          this.options.resumeAfter == null &&
          this.options.startAfter == null &&
          utils_1.maxWireVersion(server) >= 7) {
          this.startAtOperationTime = response.operationTime;
        }
        this._processBatch(response);
        this.emit(ChangeStream.INIT, response);
        this.emit(ChangeStream.RESPONSE);
        // TODO: NODE-2882
        callback(undefined, { server, session, response });
      });
    }

    /**
     * Fetch the next batch through AbstractCursor and update the resume bookkeeping.
     * @param batchSize - Forwarded to the parent implementation
     * @param callback - Receives `(err)` or `(err, response)` with a falsy `err`
     */
    _getMore(batchSize, callback) {
      super._getMore(batchSize, (err, response) => {
        if (err) {
          return callback(err);
        }
        this._processBatch(response);
        this.emit(ChangeStream.MORE, response);
        this.emit(ChangeStream.RESPONSE);
        callback(err, response);
      });
    }
  }
  exports.ChangeStreamCursor = ChangeStreamCursor;
  509. //# sourceMappingURL=change_stream.js.map