ChangeStream.js 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. 'use strict';
  2. /*!
  3. * Module dependencies.
  4. */
  5. const EventEmitter = require('events').EventEmitter;
  6. /*!
  7. * ignore
  8. */
  9. class ChangeStream extends EventEmitter {
  10. constructor(changeStreamThunk, pipeline, options) {
  11. super();
  12. this.driverChangeStream = null;
  13. this.closed = false;
  14. this.pipeline = pipeline;
  15. this.options = options;
  16. // This wrapper is necessary because of buffering.
  17. changeStreamThunk((err, driverChangeStream) => {
  18. if (err != null) {
  19. this.emit('error', err);
  20. return;
  21. }
  22. this.driverChangeStream = driverChangeStream;
  23. this._bindEvents();
  24. this.emit('ready');
  25. });
  26. }
  27. _bindEvents() {
  28. this.driverChangeStream.on('close', () => {
  29. this.closed = true;
  30. });
  31. ['close', 'change', 'end', 'error'].forEach(ev => {
  32. this.driverChangeStream.on(ev, data => this.emit(ev, data));
  33. });
  34. }
  35. _queue(cb) {
  36. this.once('ready', () => cb());
  37. }
  38. close() {
  39. this.closed = true;
  40. if (this.driverChangeStream) {
  41. this.driverChangeStream.close();
  42. }
  43. }
  44. }
  45. /*!
  46. * ignore
  47. */
  48. module.exports = ChangeStream;