ChangeStream.js 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. 'use strict';
  2. /*!
  3. * Module dependencies.
  4. */
  5. const EventEmitter = require('events').EventEmitter;
  6. /*!
  7. * ignore
  8. */
  /**
   * Event-emitting wrapper around an underlying driver change stream that may
   * not exist yet at construction time.
   *
   * The driver stream is delivered asynchronously through `changeStreamThunk`;
   * until then `driverChangeStream` is `null`. Once it arrives, a `'ready'`
   * event is emitted. The driver's `'close'`, `'change'`, `'end'` and `'error'`
   * events are re-emitted on this object, but only after the first listener
   * has been registered through `on()` / `once()`.
   */
  class ChangeStream extends EventEmitter {
    /**
     * @param {Function} changeStreamThunk called once with a node-style
     *   callback `(err, driverChangeStream)`
     * @param {*} pipeline stored as-is on `this.pipeline`
     * @param {*} options stored as-is on `this.options`
     */
    constructor(changeStreamThunk, pipeline, options) {
      super();

      this.driverChangeStream = null;
      this.closed = false;
      this.bindedEvents = false;
      this.pipeline = pipeline;
      this.options = options;

      // This wrapper is necessary because of buffering.
      changeStreamThunk((err, driverChangeStream) => {
        if (err != null) {
          this.emit('error', err);
          return;
        }

        // Snapshot before emitting 'ready': a 'ready' listener may itself call
        // close(), which would already close the driver stream directly.
        const closedBeforeReady = this.closed;

        this.driverChangeStream = driverChangeStream;
        this.emit('ready');

        // close() was called while the driver stream was still pending, so it
        // could only set the flag. Close the real stream now instead of
        // leaving it open forever.
        if (closedBeforeReady) {
          driverChangeStream.close();
        }
      });
    }

    /**
     * Start forwarding driver events to this emitter. Runs at most once; if
     * the driver stream is not available yet, forwarding is attached when
     * `'ready'` fires.
     */
    _bindEvents() {
      if (this.bindedEvents) {
        return;
      }
      // Set the flag first: the `once()` call below re-enters `_bindEvents()`.
      this.bindedEvents = true;

      if (this.driverChangeStream == null) {
        this.once('ready', () => this._forwardDriverEvents());
        return;
      }

      this._forwardDriverEvents();
    }

    /**
     * Attach listeners on the driver stream: track its `'close'` in
     * `this.closed` and re-emit its events on this object.
     */
    _forwardDriverEvents() {
      const driverStream = this.driverChangeStream;

      driverStream.on('close', () => {
        this.closed = true;
      });

      for (const eventName of ['close', 'change', 'end', 'error']) {
        driverStream.on(eventName, data => {
          this.emit(eventName, data);
        });
      }
    }

    /**
     * Delegate to the driver stream's `hasNext()`. The driver stream must
     * already be available (i.e. `'ready'` has fired).
     *
     * @param {Function} [cb] passed through to the driver stream
     * @returns {*} whatever the driver stream's `hasNext()` returns
     */
    hasNext(cb) {
      return this.driverChangeStream.hasNext(cb);
    }

    /**
     * Delegate to the driver stream's `next()`. The driver stream must already
     * be available (i.e. `'ready'` has fired).
     *
     * @param {Function} [cb] passed through to the driver stream
     * @returns {*} whatever the driver stream's `next()` returns
     */
    next(cb) {
      return this.driverChangeStream.next(cb);
    }

    /**
     * Register a listener; the first registration also enables forwarding of
     * driver events.
     *
     * @param {string} event
     * @param {Function} handler
     * @returns {ChangeStream} this
     */
    on(event, handler) {
      this._bindEvents();
      return super.on(event, handler);
    }

    /**
     * Register a one-time listener; the first registration also enables
     * forwarding of driver events.
     *
     * @param {string} event
     * @param {Function} handler
     * @returns {ChangeStream} this
     */
    once(event, handler) {
      this._bindEvents();
      return super.once(event, handler);
    }

    /**
     * Run `cb` (with no arguments) the next time `'ready'` is emitted.
     *
     * @param {Function} cb
     */
    _queue(cb) {
      this.once('ready', () => cb());
    }

    /**
     * Mark this stream as closed and close the driver stream. If the driver
     * stream has not been delivered yet, it is closed as soon as it arrives.
     */
    close() {
      this.closed = true;
      if (this.driverChangeStream) {
        this.driverChangeStream.close();
      }
    }
  }
  76. /*!
  77. * ignore
  78. */
  79. module.exports = ChangeStream;