server.js 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585
  1. const qs = require("querystring");
  2. const parse = require("url").parse;
  3. const base64id = require("base64id");
  4. const transports = require("./transports");
  5. const EventEmitter = require("events").EventEmitter;
  6. const Socket = require("./socket");
  7. const debug = require("debug")("engine");
  8. const cookieMod = require("cookie");
  9. class Server extends EventEmitter {
  10. /**
  11. * Server constructor.
  12. *
  13. * @param {Object} options
  14. * @api public
  15. */
  16. constructor(opts = {}) {
  17. super();
  18. this.clients = {};
  19. this.clientsCount = 0;
  20. this.opts = Object.assign(
  21. {
  22. wsEngine: process.env.EIO_WS_ENGINE || "ws",
  23. pingTimeout: 5000,
  24. pingInterval: 25000,
  25. upgradeTimeout: 10000,
  26. maxHttpBufferSize: 1e6,
  27. transports: Object.keys(transports),
  28. allowUpgrades: true,
  29. httpCompression: {
  30. threshold: 1024
  31. },
  32. cors: false
  33. },
  34. opts
  35. );
  36. if (opts.cookie) {
  37. this.opts.cookie = Object.assign(
  38. {
  39. name: "io",
  40. path: "/",
  41. httpOnly: opts.cookie.path !== false,
  42. sameSite: "lax"
  43. },
  44. opts.cookie
  45. );
  46. }
  47. if (this.opts.cors) {
  48. this.corsMiddleware = require("cors")(this.opts.cors);
  49. }
  50. if (opts.perMessageDeflate) {
  51. this.opts.perMessageDeflate = Object.assign(
  52. {
  53. threshold: 1024
  54. },
  55. opts.perMessageDeflate
  56. );
  57. }
  58. this.init();
  59. }
  60. /**
  61. * Initialize websocket server
  62. *
  63. * @api private
  64. */
  65. init() {
  66. if (!~this.opts.transports.indexOf("websocket")) return;
  67. if (this.ws) this.ws.close();
  68. // add explicit require for bundlers like webpack
  69. const wsModule =
  70. this.opts.wsEngine === "ws" ? require("ws") : require(this.opts.wsEngine);
  71. this.ws = new wsModule.Server({
  72. noServer: true,
  73. clientTracking: false,
  74. perMessageDeflate: this.opts.perMessageDeflate,
  75. maxPayload: this.opts.maxHttpBufferSize
  76. });
  77. }
  78. /**
  79. * Returns a list of available transports for upgrade given a certain transport.
  80. *
  81. * @return {Array}
  82. * @api public
  83. */
  84. upgrades(transport) {
  85. if (!this.opts.allowUpgrades) return [];
  86. return transports[transport].upgradesTo || [];
  87. }
  88. /**
  89. * Verifies a request.
  90. *
  91. * @param {http.IncomingMessage}
  92. * @return {Boolean} whether the request is valid
  93. * @api private
  94. */
  95. verify(req, upgrade, fn) {
  96. // transport check
  97. const transport = req._query.transport;
  98. if (!~this.opts.transports.indexOf(transport)) {
  99. debug('unknown transport "%s"', transport);
  100. return fn(Server.errors.UNKNOWN_TRANSPORT, false);
  101. }
  102. // 'Origin' header check
  103. const isOriginInvalid = checkInvalidHeaderChar(req.headers.origin);
  104. if (isOriginInvalid) {
  105. req.headers.origin = null;
  106. debug("origin header invalid");
  107. return fn(Server.errors.BAD_REQUEST, false);
  108. }
  109. // sid check
  110. const sid = req._query.sid;
  111. if (sid) {
  112. if (!this.clients.hasOwnProperty(sid)) {
  113. debug('unknown sid "%s"', sid);
  114. return fn(Server.errors.UNKNOWN_SID, false);
  115. }
  116. if (!upgrade && this.clients[sid].transport.name !== transport) {
  117. debug("bad request: unexpected transport without upgrade");
  118. return fn(Server.errors.BAD_REQUEST, false);
  119. }
  120. } else {
  121. // handshake is GET only
  122. if ("GET" !== req.method)
  123. return fn(Server.errors.BAD_HANDSHAKE_METHOD, false);
  124. if (!this.opts.allowRequest) return fn(null, true);
  125. return this.opts.allowRequest(req, fn);
  126. }
  127. fn(null, true);
  128. }
  129. /**
  130. * Prepares a request by processing the query string.
  131. *
  132. * @api private
  133. */
  134. prepare(req) {
  135. // try to leverage pre-existing `req._query` (e.g: from connect)
  136. if (!req._query) {
  137. req._query = ~req.url.indexOf("?") ? qs.parse(parse(req.url).query) : {};
  138. }
  139. }
  140. /**
  141. * Closes all clients.
  142. *
  143. * @api public
  144. */
  145. close() {
  146. debug("closing all open clients");
  147. for (let i in this.clients) {
  148. if (this.clients.hasOwnProperty(i)) {
  149. this.clients[i].close(true);
  150. }
  151. }
  152. if (this.ws) {
  153. debug("closing webSocketServer");
  154. this.ws.close();
  155. // don't delete this.ws because it can be used again if the http server starts listening again
  156. }
  157. return this;
  158. }
  159. /**
  160. * Handles an Engine.IO HTTP request.
  161. *
  162. * @param {http.IncomingMessage} request
  163. * @param {http.ServerResponse|http.OutgoingMessage} response
  164. * @api public
  165. */
  166. handleRequest(req, res) {
  167. debug('handling "%s" http request "%s"', req.method, req.url);
  168. this.prepare(req);
  169. req.res = res;
  170. const callback = (err, success) => {
  171. if (!success) {
  172. sendErrorMessage(req, res, err);
  173. return;
  174. }
  175. if (req._query.sid) {
  176. debug("setting new request for existing client");
  177. this.clients[req._query.sid].transport.onRequest(req);
  178. } else {
  179. this.handshake(req._query.transport, req);
  180. }
  181. };
  182. if (this.corsMiddleware) {
  183. this.corsMiddleware.call(null, req, res, () => {
  184. this.verify(req, false, callback);
  185. });
  186. } else {
  187. this.verify(req, false, callback);
  188. }
  189. }
  190. /**
  191. * generate a socket id.
  192. * Overwrite this method to generate your custom socket id
  193. *
  194. * @param {Object} request object
  195. * @api public
  196. */
  197. generateId(req) {
  198. return base64id.generateId();
  199. }
  200. /**
  201. * Handshakes a new client.
  202. *
  203. * @param {String} transport name
  204. * @param {Object} request object
  205. * @api private
  206. */
  207. async handshake(transportName, req) {
  208. let id;
  209. try {
  210. id = await this.generateId(req);
  211. } catch (e) {
  212. debug("error while generating an id");
  213. sendErrorMessage(req, req.res, Server.errors.BAD_REQUEST);
  214. return;
  215. }
  216. debug('handshaking client "%s"', id);
  217. try {
  218. var transport = new transports[transportName](req);
  219. if ("polling" === transportName) {
  220. transport.maxHttpBufferSize = this.opts.maxHttpBufferSize;
  221. transport.httpCompression = this.opts.httpCompression;
  222. } else if ("websocket" === transportName) {
  223. transport.perMessageDeflate = this.opts.perMessageDeflate;
  224. }
  225. if (req._query && req._query.b64) {
  226. transport.supportsBinary = false;
  227. } else {
  228. transport.supportsBinary = true;
  229. }
  230. } catch (e) {
  231. debug('error handshaking to transport "%s"', transportName);
  232. sendErrorMessage(req, req.res, Server.errors.BAD_REQUEST);
  233. return;
  234. }
  235. const socket = new Socket(id, this, transport, req);
  236. const self = this;
  237. if (this.opts.cookie) {
  238. transport.on("headers", headers => {
  239. headers["Set-Cookie"] = cookieMod.serialize(
  240. this.opts.cookie.name,
  241. id,
  242. this.opts.cookie
  243. );
  244. });
  245. }
  246. transport.onRequest(req);
  247. this.clients[id] = socket;
  248. this.clientsCount++;
  249. socket.once("close", function() {
  250. delete self.clients[id];
  251. self.clientsCount--;
  252. });
  253. this.emit("connection", socket);
  254. }
  255. /**
  256. * Handles an Engine.IO HTTP Upgrade.
  257. *
  258. * @api public
  259. */
  260. handleUpgrade(req, socket, upgradeHead) {
  261. this.prepare(req);
  262. const self = this;
  263. this.verify(req, true, function(err, success) {
  264. if (!success) {
  265. abortConnection(socket, err);
  266. return;
  267. }
  268. const head = Buffer.from(upgradeHead); // eslint-disable-line node/no-deprecated-api
  269. upgradeHead = null;
  270. // delegate to ws
  271. self.ws.handleUpgrade(req, socket, head, function(conn) {
  272. self.onWebSocket(req, conn);
  273. });
  274. });
  275. }
  276. /**
  277. * Called upon a ws.io connection.
  278. *
  279. * @param {ws.Socket} websocket
  280. * @api private
  281. */
  282. onWebSocket(req, socket) {
  283. socket.on("error", onUpgradeError);
  284. if (
  285. transports[req._query.transport] !== undefined &&
  286. !transports[req._query.transport].prototype.handlesUpgrades
  287. ) {
  288. debug("transport doesnt handle upgraded requests");
  289. socket.close();
  290. return;
  291. }
  292. // get client id
  293. const id = req._query.sid;
  294. // keep a reference to the ws.Socket
  295. req.websocket = socket;
  296. if (id) {
  297. const client = this.clients[id];
  298. if (!client) {
  299. debug("upgrade attempt for closed client");
  300. socket.close();
  301. } else if (client.upgrading) {
  302. debug("transport has already been trying to upgrade");
  303. socket.close();
  304. } else if (client.upgraded) {
  305. debug("transport had already been upgraded");
  306. socket.close();
  307. } else {
  308. debug("upgrading existing transport");
  309. // transport error handling takes over
  310. socket.removeListener("error", onUpgradeError);
  311. const transport = new transports[req._query.transport](req);
  312. if (req._query && req._query.b64) {
  313. transport.supportsBinary = false;
  314. } else {
  315. transport.supportsBinary = true;
  316. }
  317. transport.perMessageDeflate = this.perMessageDeflate;
  318. client.maybeUpgrade(transport);
  319. }
  320. } else {
  321. // transport error handling takes over
  322. socket.removeListener("error", onUpgradeError);
  323. this.handshake(req._query.transport, req);
  324. }
  325. function onUpgradeError() {
  326. debug("websocket error before upgrade");
  327. // socket.close() not needed
  328. }
  329. }
  330. /**
  331. * Captures upgrade requests for a http.Server.
  332. *
  333. * @param {http.Server} server
  334. * @param {Object} options
  335. * @api public
  336. */
  337. attach(server, options) {
  338. const self = this;
  339. options = options || {};
  340. let path = (options.path || "/engine.io").replace(/\/$/, "");
  341. const destroyUpgradeTimeout = options.destroyUpgradeTimeout || 1000;
  342. // normalize path
  343. path += "/";
  344. function check(req) {
  345. return path === req.url.substr(0, path.length);
  346. }
  347. // cache and clean up listeners
  348. const listeners = server.listeners("request").slice(0);
  349. server.removeAllListeners("request");
  350. server.on("close", self.close.bind(self));
  351. server.on("listening", self.init.bind(self));
  352. // add request handler
  353. server.on("request", function(req, res) {
  354. if (check(req)) {
  355. debug('intercepting request for path "%s"', path);
  356. self.handleRequest(req, res);
  357. } else {
  358. let i = 0;
  359. const l = listeners.length;
  360. for (; i < l; i++) {
  361. listeners[i].call(server, req, res);
  362. }
  363. }
  364. });
  365. if (~self.opts.transports.indexOf("websocket")) {
  366. server.on("upgrade", function(req, socket, head) {
  367. if (check(req)) {
  368. self.handleUpgrade(req, socket, head);
  369. } else if (false !== options.destroyUpgrade) {
  370. // default node behavior is to disconnect when no handlers
  371. // but by adding a handler, we prevent that
  372. // and if no eio thing handles the upgrade
  373. // then the socket needs to die!
  374. setTimeout(function() {
  375. if (socket.writable && socket.bytesWritten <= 0) {
  376. return socket.end();
  377. }
  378. }, destroyUpgradeTimeout);
  379. }
  380. });
  381. }
  382. }
  383. }
  384. /**
  385. * Protocol errors mappings.
  386. */
  387. Server.errors = {
  388. UNKNOWN_TRANSPORT: 0,
  389. UNKNOWN_SID: 1,
  390. BAD_HANDSHAKE_METHOD: 2,
  391. BAD_REQUEST: 3,
  392. FORBIDDEN: 4
  393. };
  394. Server.errorMessages = {
  395. 0: "Transport unknown",
  396. 1: "Session ID unknown",
  397. 2: "Bad handshake method",
  398. 3: "Bad request",
  399. 4: "Forbidden"
  400. };
  401. /**
  402. * Sends an Engine.IO Error Message
  403. *
  404. * @param {http.ServerResponse} response
  405. * @param {code} error code
  406. * @api private
  407. */
  408. function sendErrorMessage(req, res, code) {
  409. const headers = { "Content-Type": "application/json" };
  410. const isForbidden = !Server.errorMessages.hasOwnProperty(code);
  411. if (isForbidden) {
  412. res.writeHead(403, headers);
  413. res.end(
  414. JSON.stringify({
  415. code: Server.errors.FORBIDDEN,
  416. message: code || Server.errorMessages[Server.errors.FORBIDDEN]
  417. })
  418. );
  419. return;
  420. }
  421. if (res !== undefined) {
  422. res.writeHead(400, headers);
  423. res.end(
  424. JSON.stringify({
  425. code: code,
  426. message: Server.errorMessages[code]
  427. })
  428. );
  429. }
  430. }
  431. /**
  432. * Closes the connection
  433. *
  434. * @param {net.Socket} socket
  435. * @param {code} error code
  436. * @api private
  437. */
  438. function abortConnection(socket, code) {
  439. socket.on("error", () => {
  440. debug("ignoring error from closed connection");
  441. });
  442. if (socket.writable) {
  443. const message = Server.errorMessages.hasOwnProperty(code)
  444. ? Server.errorMessages[code]
  445. : String(code || "");
  446. const length = Buffer.byteLength(message);
  447. socket.write(
  448. "HTTP/1.1 400 Bad Request\r\n" +
  449. "Connection: close\r\n" +
  450. "Content-type: text/html\r\n" +
  451. "Content-Length: " +
  452. length +
  453. "\r\n" +
  454. "\r\n" +
  455. message
  456. );
  457. }
  458. socket.destroy();
  459. }
  460. module.exports = Server;
  461. /* eslint-disable */
  462. /**
  463. * From https://github.com/nodejs/node/blob/v8.4.0/lib/_http_common.js#L303-L354
  464. *
  465. * True if val contains an invalid field-vchar
  466. * field-value = *( field-content / obs-fold )
  467. * field-content = field-vchar [ 1*( SP / HTAB ) field-vchar ]
  468. * field-vchar = VCHAR / obs-text
  469. *
  470. * checkInvalidHeaderChar() is currently designed to be inlinable by v8,
  471. * so take care when making changes to the implementation so that the source
  472. * code size does not exceed v8's default max_inlined_source_size setting.
  473. **/
  474. // prettier-ignore
  475. const validHdrChars = [
  476. 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, // 0 - 15
  477. 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 16 - 31
  478. 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 32 - 47
  479. 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 48 - 63
  480. 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 64 - 79
  481. 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 80 - 95
  482. 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 96 - 111
  483. 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, // 112 - 127
  484. 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 128 ...
  485. 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
  486. 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
  487. 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
  488. 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
  489. 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
  490. 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
  491. 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 // ... 255
  492. ]
  493. function checkInvalidHeaderChar(val) {
  494. val += "";
  495. if (val.length < 1) return false;
  496. if (!validHdrChars[val.charCodeAt(0)]) {
  497. debug('invalid header, index 0, char "%s"', val.charCodeAt(0));
  498. return true;
  499. }
  500. if (val.length < 2) return false;
  501. if (!validHdrChars[val.charCodeAt(1)]) {
  502. debug('invalid header, index 1, char "%s"', val.charCodeAt(1));
  503. return true;
  504. }
  505. if (val.length < 3) return false;
  506. if (!validHdrChars[val.charCodeAt(2)]) {
  507. debug('invalid header, index 2, char "%s"', val.charCodeAt(2));
  508. return true;
  509. }
  510. if (val.length < 4) return false;
  511. if (!validHdrChars[val.charCodeAt(3)]) {
  512. debug('invalid header, index 3, char "%s"', val.charCodeAt(3));
  513. return true;
  514. }
  515. for (let i = 4; i < val.length; ++i) {
  516. if (!validHdrChars[val.charCodeAt(i)]) {
  517. debug('invalid header, index "%i", char "%s"', i, val.charCodeAt(i));
  518. return true;
  519. }
  520. }
  521. return false;
  522. }