binlog_dump.js 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. 'use strict';
  2. const Command = require('./command');
  3. const Packets = require('../packets');
  4. const eventParsers = [];
  5. class BinlogEventHeader {
  6. constructor(packet) {
  7. this.timestamp = packet.readInt32();
  8. this.eventType = packet.readInt8();
  9. this.serverId = packet.readInt32();
  10. this.eventSize = packet.readInt32();
  11. this.logPos = packet.readInt32();
  12. this.flags = packet.readInt16();
  13. }
  14. }
  15. class BinlogDump extends Command {
  16. constructor(opts) {
  17. super();
  18. // this.onResult = callback;
  19. this.opts = opts;
  20. }
  21. start(packet, connection) {
  22. const newPacket = new Packets.BinlogDump(this.opts);
  23. connection.writePacket(newPacket.toPacket(1));
  24. return BinlogDump.prototype.binlogData;
  25. }
  26. binlogData(packet) {
  27. // ok - continue consuming events
  28. // error - error
  29. // eof - end of binlog
  30. if (packet.isEOF()) {
  31. this.emit('eof');
  32. return null;
  33. }
  34. // binlog event header
  35. packet.readInt8();
  36. const header = new BinlogEventHeader(packet);
  37. const EventParser = eventParsers[header.eventType];
  38. let event;
  39. if (EventParser) {
  40. event = new EventParser(packet);
  41. } else {
  42. event = {
  43. name: 'UNKNOWN'
  44. };
  45. }
  46. event.header = header;
  47. this.emit('event', event);
  48. return BinlogDump.prototype.binlogData;
  49. }
  50. }
  51. class RotateEvent {
  52. constructor(packet) {
  53. this.pposition = packet.readInt32();
  54. // TODO: read uint64 here
  55. packet.readInt32(); // positionDword2
  56. this.nextBinlog = packet.readString();
  57. this.name = 'RotateEvent';
  58. }
  59. }
  60. class FormatDescriptionEvent {
  61. constructor(packet) {
  62. this.binlogVersion = packet.readInt16();
  63. this.serverVersion = packet.readString(50).replace(/\u0000.*/, ''); // eslint-disable-line no-control-regex
  64. this.createTimestamp = packet.readInt32();
  65. this.eventHeaderLength = packet.readInt8(); // should be 19
  66. this.eventsLength = packet.readBuffer();
  67. this.name = 'FormatDescriptionEvent';
  68. }
  69. }
  70. class QueryEvent {
  71. constructor(packet) {
  72. const parseStatusVars = require('../packets/binlog_query_statusvars.js');
  73. this.slaveProxyId = packet.readInt32();
  74. this.executionTime = packet.readInt32();
  75. const schemaLength = packet.readInt8();
  76. this.errorCode = packet.readInt16();
  77. const statusVarsLength = packet.readInt16();
  78. const statusVars = packet.readBuffer(statusVarsLength);
  79. this.schema = packet.readString(schemaLength);
  80. packet.readInt8(); // should be zero
  81. this.statusVars = parseStatusVars(statusVars);
  82. this.query = packet.readString();
  83. this.name = 'QueryEvent';
  84. }
  85. }
  86. class XidEvent {
  87. constructor(packet) {
  88. this.binlogVersion = packet.readInt16();
  89. this.xid = packet.readInt64();
  90. this.name = 'XidEvent';
  91. }
  92. }
  93. eventParsers[2] = QueryEvent;
  94. eventParsers[4] = RotateEvent;
  95. eventParsers[15] = FormatDescriptionEvent;
  96. eventParsers[16] = XidEvent;
  97. module.exports = BinlogDump;