import { EventEmitter } from "events";

/**
 * Normalizes a room collection taken from broadcast options.
 *
 * `null`/`undefined` becomes an empty Set and an Array is converted to a Set,
 * so callers may omit the option or pass a plain list. Any other value
 * (normally a Set) is returned untouched.
 *
 * @param {Set|Array|null|undefined} rooms
 * @returns {Set}
 */
function toRoomSet(rooms) {
  if (rooms == null) {
    return new Set();
  }
  return Array.isArray(rooms) ? new Set(rooms) : rooms;
}

/**
 * In-memory adapter: tracks which socket ids are in which rooms and delivers
 * packets to the sockets of its namespace.
 *
 * Emits "create-room", "join-room", "leave-room" and "delete-room" as the
 * room membership changes.
 */
export class Adapter extends EventEmitter {
  /**
   * In-memory adapter constructor.
   *
   * @param {Namespace} nsp the namespace this adapter serves; its
   *   `server.encoder` is used to encode broadcast packets
   */
  constructor(nsp) {
    super();
    this.nsp = nsp;
    // room name -> Set of socket ids currently in that room
    this.rooms = new Map();
    // socket id -> Set of room names that socket has joined
    this.sids = new Map();
    this.encoder = nsp.server.encoder;
  }

  /**
   * To be overridden
   */
  init() {}

  /**
   * To be overridden
   */
  close() {}

  /**
   * Adds a socket to a list of rooms.
   *
   * Emits "create-room" the first time a room is seen and "join-room" when
   * the socket was not already a member of the room.
   *
   * @param {SocketId} id the socket id
   * @param {Set} rooms a set of rooms
   * @public
   */
  addAll(id, rooms) {
    let joined = this.sids.get(id);
    if (!joined) {
      joined = new Set();
      this.sids.set(id, joined);
    }

    for (const room of rooms) {
      joined.add(room);

      let members = this.rooms.get(room);
      if (!members) {
        members = new Set();
        this.rooms.set(room, members);
        this.emit("create-room", room);
      }

      if (!members.has(id)) {
        members.add(id);
        this.emit("join-room", room, id);
      }
    }
  }

  /**
   * Removes a socket from a room.
   *
   * @param {SocketId} id the socket id
   * @param {Room} room the room name
   */
  del(id, room) {
    const joined = this.sids.get(id);
    if (joined) {
      joined.delete(room);
    }
    this._del(room, id);
  }

  /**
   * Removes `id` from the member set of `room`, emitting "leave-room" when
   * the id was actually present and "delete-room" when the room ends up
   * empty and is dropped. Does not touch `this.sids`.
   *
   * @param {Room} room the room name
   * @param {SocketId} id the socket id
   */
  _del(room, id) {
    const members = this.rooms.get(room);
    if (members == null) {
      return;
    }

    if (members.delete(id)) {
      this.emit("leave-room", room, id);
    }

    if (members.size === 0 && this.rooms.delete(room)) {
      this.emit("delete-room", room);
    }
  }

  /**
   * Removes a socket from all rooms it's joined.
   *
   * @param {SocketId} id the socket id
   */
  delAll(id) {
    const joined = this.sids.get(id);
    if (!joined) {
      return;
    }

    for (const room of joined) {
      this._del(room, id);
    }
    this.sids.delete(id);
  }

  /**
   * Broadcasts a packet.
   *
   * Options:
   *  - `flags` {Object} flags for this packet (`volatile`, `compress`)
   *  - `except` {Set|Array} rooms whose members should be excluded
   *  - `rooms` {Set|Array} rooms to broadcast to; when empty or omitted the
   *    packet goes to every socket known to this adapter
   *
   * The packet is encoded once and the encoded form is written to each
   * matching socket. Note that `packet.nsp` is set on the given object.
   *
   * @param {Object} packet the packet object
   * @param {Object} opts the options
   * @public
   */
  broadcast(packet, opts = {}) {
    const flags = opts.flags || {};
    const basePacketOpts = {
      preEncoded: true,
      volatile: flags.volatile,
      compress: flags.compress
    };

    packet.nsp = this.nsp.name;
    const encodedPackets = this.encoder.encode(packet);

    const packetOpts = encodedPackets.map(encodedPacket => {
      if (typeof encodedPacket !== "string") {
        return basePacketOpts;
      }
      return {
        ...basePacketOpts,
        // "4" being the "message" packet type in Engine.IO
        wsPreEncoded: `4${encodedPacket}`
      };
    });

    this.apply(opts, socket => {
      encodedPackets.forEach((encodedPacket, i) => {
        socket.client.writeToEngine(encodedPacket, packetOpts[i]);
      });
    });
  }

  /**
   * Gets a list of sockets by sid.
   *
   * @param {Set|Array} rooms the explicit set of rooms to check; when empty
   *   or omitted every socket is matched
   * @returns {Promise<Set>} resolves to the set of matching socket ids
   */
  sockets(rooms) {
    const sids = new Set();
    this.apply({ rooms }, socket => {
      sids.add(socket.id);
    });
    return Promise.resolve(sids);
  }

  /**
   * Gets the list of rooms a given socket has joined.
   *
   * @param {SocketId} id the socket id
   * @returns {Set|undefined} the adapter's own Set for that socket (not a
   *   copy), or `undefined` when the id is unknown
   */
  socketRooms(id) {
    return this.sids.get(id);
  }

  /**
   * Returns the matching socket instances
   *
   * @param opts - the filters to apply
   * @returns {Promise<Array>} resolves to the matching sockets
   */
  fetchSockets(opts) {
    const sockets = [];
    this.apply(opts, socket => {
      sockets.push(socket);
    });
    return Promise.resolve(sockets);
  }

  /**
   * Makes the matching socket instances join the specified rooms
   *
   * @param opts - the filters to apply
   * @param rooms - the rooms to join
   */
  addSockets(opts, rooms) {
    this.apply(opts, socket => {
      socket.join(rooms);
    });
  }

  /**
   * Makes the matching socket instances leave the specified rooms
   *
   * @param opts - the filters to apply
   * @param rooms - the rooms to leave
   */
  delSockets(opts, rooms) {
    this.apply(opts, socket => {
      rooms.forEach(room => socket.leave(room));
    });
  }

  /**
   * Makes the matching socket instances disconnect
   *
   * @param opts - the filters to apply
   * @param close - whether to close the underlying connection
   */
  disconnectSockets(opts, close) {
    this.apply(opts, socket => {
      socket.disconnect(close);
    });
  }

  /**
   * Invokes `callback` once for every socket of the namespace that matches
   * the filters.
   *
   * With a non-empty `opts.rooms`, the members of those rooms are visited
   * (each socket at most once); otherwise every socket id known to the
   * adapter is visited. Ids belonging to any `opts.except` room are skipped,
   * as are ids with no socket instance in `nsp.sockets`.
   *
   * @param {Object} opts the filters (`rooms`, `except`)
   * @param {Function} callback called with each matching socket
   */
  apply(opts = {}, callback) {
    const rooms = toRoomSet(opts.rooms);
    const except = this.computeExceptSids(opts.except);

    if (rooms.size) {
      // a socket can be in several of the targeted rooms; visit it once
      const ids = new Set();
      for (const room of rooms) {
        const members = this.rooms.get(room);
        if (!members) {
          continue;
        }
        for (const id of members) {
          if (ids.has(id) || except.has(id)) {
            continue;
          }
          const socket = this.nsp.sockets.get(id);
          if (socket) {
            callback(socket);
            ids.add(id);
          }
        }
      }
      return;
    }

    for (const id of this.sids.keys()) {
      if (except.has(id)) {
        continue;
      }
      const socket = this.nsp.sockets.get(id);
      if (socket) {
        callback(socket);
      }
    }
  }

  /**
   * Collects the ids of all sockets that are members of the given rooms.
   *
   * @param {Set|Array} [exceptRooms] rooms whose members must be excluded
   * @returns {Set} the socket ids to exclude (empty when nothing is given)
   */
  computeExceptSids(exceptRooms) {
    const exceptSids = new Set();
    const excluded = toRoomSet(exceptRooms);

    if (excluded.size > 0) {
      for (const room of excluded) {
        const members = this.rooms.get(room);
        if (members) {
          members.forEach(sid => exceptSids.add(sid));
        }
      }
    }
    return exceptSids;
  }

  /**
   * Send a packet to the other Socket.IO servers in the cluster
   *
   * Not supported by this adapter: always throws.
   *
   * @param packet - an array of arguments, which may include an acknowledgement callback at the end
   * @throws {Error} always
   */
  serverSideEmit(packet) {
    throw new Error("this adapter does not support the serverSideEmit() functionality");
  }
}