// socket.js (12 KB)
  1. const EventEmitter = require("events");
  2. const debug = require("debug")("engine:socket");
  /**
   * Server-side state for a single client connection.
   *
   * A Socket owns the currently active transport, buffers outgoing packets
   * until the transport is writable, runs the ping/pong heartbeat and handles
   * upgrading from one transport to another.
   *
   * Events emitted by this class: `open`, `packet`, `heartbeat`, `data`,
   * `message`, `upgrading`, `upgrade`, `packetCreate`, `flush`, `drain`
   * and `close`.
   *
   * @api private
   */
  class Socket extends EventEmitter {
    /**
     * Creates the socket, attaches it to `transport` and immediately opens it
     * (an `open` packet is written before the constructor returns).
     *
     * @param {String} id - session id, sent to the client in the `open` packet
     * @param {Object} server - owning server; `server.opts`, `server.upgrades()`
     *   and `server.emit()` are used by this class
     * @param {Transport} transport - initial transport
     * @param {Object} req - request that created the connection
     * @api private
     */
    constructor(id, server, transport, req) {
      super();
      this.id = id;
      this.server = server;
      this.upgrading = false;
      this.upgraded = false;
      this.readyState = "opening";
      // packets waiting for the transport to become writable
      this.writeBuffer = [];
      // callbacks attached to the packets currently in `writeBuffer`
      this.packetsFn = [];
      // callbacks of flushed packets, executed on the transport's next `drain`
      this.sentCallbackFn = [];
      // functions detaching the listeners bound to the current transport
      this.cleanupFn = [];
      this.request = req;

      // Cache IP since it might not be in the req later
      if (req.websocket && req.websocket._socket) {
        this.remoteAddress = req.websocket._socket.remoteAddress;
      } else {
        this.remoteAddress = req.connection.remoteAddress;
      }

      this.checkIntervalTimer = null;
      this.upgradeTimeoutTimer = null;
      this.pingTimeoutTimer = null;
      this.pingIntervalTimer = null;

      this.setTransport(transport);
      this.onOpen();
    }

    /**
     * Called upon transport considered open.
     *
     * Sends the `open` handshake packet (session id, available upgrades and
     * heartbeat settings), then the optional `initialPacket`, emits `open` and
     * starts the ping cycle.
     *
     * @api private
     */
    onOpen() {
      this.readyState = "open";

      // sends an `open` packet
      this.transport.sid = this.id;
      this.sendPacket(
        "open",
        JSON.stringify({
          sid: this.id,
          upgrades: this.getAvailableUpgrades(),
          pingInterval: this.server.opts.pingInterval,
          pingTimeout: this.server.opts.pingTimeout
        })
      );

      if (this.server.opts.initialPacket) {
        this.sendPacket("message", this.server.opts.initialPacket);
      }

      this.emit("open");
      this.schedulePing();
    }

    /**
     * Called upon transport packet.
     *
     * Packets are ignored unless the socket is `open`. Every accepted packet
     * is re-emitted as `packet` and pushes the ping timeout back.
     *
     * @param {Object} packet
     * @api private
     */
    onPacket(packet) {
      if ("open" !== this.readyState) {
        debug("packet received with closed socket");
        return;
      }

      // export packet event
      debug("packet");
      this.emit("packet", packet);

      // Reset ping timeout on any packet, incoming data is a good sign of
      // other side's liveness
      this.resetPingTimeout(
        this.server.opts.pingInterval + this.server.opts.pingTimeout
      );

      switch (packet.type) {
        case "pong":
          debug("got pong");
          this.schedulePing();
          this.emit("heartbeat");
          break;

        case "error":
          this.onClose("parse error");
          break;

        case "message":
          this.emit("data", packet.data);
          this.emit("message", packet.data);
          break;
      }
    }

    /**
     * Called upon transport error; closes the socket with reason
     * `transport error`.
     *
     * @param {Error} err - error object, forwarded as the close description
     * @api private
     */
    onError(err) {
      debug("transport error");
      this.onClose("transport error", err);
    }

    /**
     * Schedules the next ping: after `server.opts.pingInterval` ms a `ping`
     * packet is sent and the client gets `server.opts.pingTimeout` ms to
     * answer before the connection is closed.
     *
     * Does nothing but clear the pending timer once the socket is closed, so
     * that no timer outlives the connection.
     *
     * @api private
     */
    schedulePing() {
      clearTimeout(this.pingIntervalTimer);
      this.pingIntervalTimer = null;
      if ("closed" === this.readyState) return;

      this.pingIntervalTimer = setTimeout(() => {
        debug(
          "writing ping packet - expecting pong within %sms",
          this.server.opts.pingTimeout
        );
        this.sendPacket("ping");
        this.resetPingTimeout(this.server.opts.pingTimeout);
      }, this.server.opts.pingInterval);
    }

    /**
     * Resets ping timeout: the socket is closed with reason `ping timeout`
     * unless this method is called again within `timeout` ms.
     *
     * Does nothing but clear the pending timer once the socket is closed.
     *
     * @param {Number} timeout - delay in milliseconds
     * @api private
     */
    resetPingTimeout(timeout) {
      clearTimeout(this.pingTimeoutTimer);
      this.pingTimeoutTimer = null;
      if ("closed" === this.readyState) return;

      this.pingTimeoutTimer = setTimeout(() => {
        if (this.readyState === "closed") return;
        this.onClose("ping timeout");
      }, timeout);
    }

    /**
     * Attaches handlers for the given transport and makes it the current one.
     * A matching detach function is queued in `cleanupFn`.
     *
     * @param {Transport} transport
     * @api private
     */
    setTransport(transport) {
      const onError = this.onError.bind(this);
      const onPacket = this.onPacket.bind(this);
      const flush = this.flush.bind(this);
      const onClose = this.onClose.bind(this, "transport close");

      this.transport = transport;
      this.transport.once("error", onError);
      this.transport.on("packet", onPacket);
      this.transport.on("drain", flush);
      this.transport.once("close", onClose);
      // this function will manage packet events (also message callbacks)
      this.setupSendCallback();

      this.cleanupFn.push(function() {
        transport.removeListener("error", onError);
        transport.removeListener("packet", onPacket);
        transport.removeListener("drain", flush);
        transport.removeListener("close", onClose);
      });
    }

    /**
     * Upgrades socket to the given transport.
     *
     * The candidate transport is probed first: a `ping`/`probe` packet is
     * answered with `pong`/`probe`, and an `upgrade` packet completes the
     * switch. Any other packet, a transport error/close, the socket closing or
     * the `server.opts.upgradeTimeout` timer aborts the attempt and closes the
     * candidate transport.
     *
     * @param {Transport} transport
     * @api private
     */
    maybeUpgrade(transport) {
      debug(
        'might upgrade socket transport from "%s" to "%s"',
        this.transport.name,
        transport.name
      );

      this.upgrading = true;

      const self = this;

      // set transport upgrade timer
      self.upgradeTimeoutTimer = setTimeout(function() {
        debug("client did not complete upgrade - closing transport");
        cleanup();
        if ("open" === transport.readyState) {
          transport.close();
        }
      }, this.server.opts.upgradeTimeout);

      function onPacket(packet) {
        if ("ping" === packet.type && "probe" === packet.data) {
          transport.send([{ type: "pong", data: "probe" }]);
          self.emit("upgrading", transport);
          clearInterval(self.checkIntervalTimer);
          self.checkIntervalTimer = setInterval(check, 100);
        } else if ("upgrade" === packet.type && self.readyState !== "closed") {
          debug("got upgrade packet - upgrading");
          cleanup();
          self.transport.discard();
          self.upgraded = true;
          self.clearTransport();
          self.setTransport(transport);
          self.emit("upgrade", transport);
          self.schedulePing();
          self.flush();
          // close() was requested while the upgrade was in flight
          if (self.readyState === "closing") {
            transport.close(function() {
              self.onClose("forced close");
            });
          }
        } else {
          cleanup();
          transport.close();
        }
      }

      // we force a polling cycle to ensure a fast upgrade
      function check() {
        if ("polling" === self.transport.name && self.transport.writable) {
          debug("writing a noop packet to polling for fast upgrade");
          self.transport.send([{ type: "noop" }]);
        }
      }

      // stops the upgrade timers and detaches every listener added below
      function cleanup() {
        self.upgrading = false;

        clearInterval(self.checkIntervalTimer);
        self.checkIntervalTimer = null;

        clearTimeout(self.upgradeTimeoutTimer);
        self.upgradeTimeoutTimer = null;

        transport.removeListener("packet", onPacket);
        transport.removeListener("close", onTransportClose);
        transport.removeListener("error", onError);
        self.removeListener("close", onClose);
      }

      function onError(err) {
        debug("client did not complete upgrade - %s", err);
        cleanup();
        transport.close();
        transport = null;
      }

      function onTransportClose() {
        onError("transport closed");
      }

      function onClose() {
        onError("socket closed");
      }

      transport.on("packet", onPacket);
      transport.once("close", onTransportClose);
      transport.once("error", onError);

      self.once("close", onClose);
    }

    /**
     * Clears listeners and timers associated with current transport, then
     * closes that transport.
     *
     * @api private
     */
    clearTransport() {
      const toCleanUp = this.cleanupFn.length;
      for (let i = 0; i < toCleanUp; i++) {
        const cleanup = this.cleanupFn.shift();
        cleanup();
      }

      // silence further transport errors and prevent uncaught exceptions
      this.transport.on("error", function() {
        debug("error triggered by discarded transport");
      });

      // ensure transport won't stay open
      this.transport.close();

      clearTimeout(this.pingTimeoutTimer);
      this.pingTimeoutTimer = null;
    }

    /**
     * Called upon transport considered closed.
     * Possible reasons: `ping timeout`, `client error`, `parse error`,
     * `transport error`, `server close`, `transport close`
     *
     * Only the first call has an effect: it stops every timer, drops pending
     * callbacks, detaches the transport and emits `close`.
     *
     * @param {String} reason
     * @param {*} description - optional extra detail (e.g. the transport error)
     * @api private
     */
    onClose(reason, description) {
      if ("closed" === this.readyState) return;

      this.readyState = "closed";

      // clear timers
      clearTimeout(this.pingIntervalTimer);
      this.pingIntervalTimer = null;
      clearTimeout(this.pingTimeoutTimer);
      this.pingTimeoutTimer = null;
      clearInterval(this.checkIntervalTimer);
      this.checkIntervalTimer = null;
      clearTimeout(this.upgradeTimeoutTimer);
      this.upgradeTimeoutTimer = null;

      // clean writeBuffer in next tick, so developers can still
      // grab the writeBuffer on 'close' event
      process.nextTick(() => {
        this.writeBuffer = [];
      });

      this.packetsFn = [];
      this.sentCallbackFn = [];
      this.clearTransport();
      this.emit("close", reason, description);
    }

    /**
     * Setup and manage send callback: on every `drain` of the current
     * transport the oldest entry of `sentCallbackFn` (a function, or an array
     * of functions) is executed with the current transport as argument.
     *
     * @api private
     */
    setupSendCallback() {
      // Remember which transport the listener is attached to, so the cleanup
      // detaches from that exact transport even if `this.transport` changes.
      const transport = this.transport;

      // the message was sent successfully, execute the callback
      const onDrain = () => {
        if (this.sentCallbackFn.length === 0) return;

        const seqFn = this.sentCallbackFn.shift();
        if ("function" === typeof seqFn) {
          debug("executing send callback");
          seqFn(this.transport);
        } else if (Array.isArray(seqFn)) {
          debug("executing batch send callback");
          const l = seqFn.length;
          for (let i = 0; i < l; i++) {
            if ("function" === typeof seqFn[i]) {
              seqFn[i](this.transport);
            }
          }
        }
      };

      transport.on("drain", onDrain);

      this.cleanupFn.push(function() {
        transport.removeListener("drain", onDrain);
      });
    }

    /**
     * Sends a message packet.
     *
     * @param {String} data - message payload
     * @param {Object} options
     * @param {Function} callback
     * @return {Socket} for chaining
     * @api public
     */
    send(data, options, callback) {
      this.sendPacket("message", data, options, callback);
      return this;
    }

    /**
     * Sends a message packet; same behaviour as `send()`.
     *
     * @param {String} data - message payload
     * @param {Object} options
     * @param {Function} callback
     * @return {Socket} for chaining
     * @api public
     */
    write(data, options, callback) {
      this.sendPacket("message", data, options, callback);
      return this;
    }

    /**
     * Sends a packet.
     *
     * The packet is dropped silently when the socket is `closing` or `closed`.
     * `options.compress` defaults to `true`; the caller's `options` object is
     * copied, never modified.
     *
     * @param {String} type - packet type
     * @param {String} data - optional, data (attached only when truthy)
     * @param {Object} options - may be omitted, `callback` taking its place
     * @param {Function} callback - queued to run once the packet is flushed
     *   and the transport drains
     * @api private
     */
    sendPacket(type, data, options, callback) {
      if ("function" === typeof options) {
        callback = options;
        options = null;
      }

      // shallow copy: never write into an object owned by the caller
      options = Object.assign({}, options);
      options.compress = false !== options.compress;

      if ("closing" === this.readyState || "closed" === this.readyState) {
        return;
      }

      debug('sending packet "%s" (%s)', type, data);

      const packet = {
        type: type,
        options: options
      };
      if (data) packet.data = data;

      // exports packetCreate event
      this.emit("packetCreate", packet);

      this.writeBuffer.push(packet);

      // add send callback to object, if defined
      if (callback) this.packetsFn.push(callback);

      this.flush();
    }

    /**
     * Attempts to flush the packets buffer.
     *
     * Nothing happens when the socket is closed, the transport is not
     * writable or the buffer is empty. Otherwise the whole buffer is handed to
     * the transport and `flush` / `drain` are emitted on both the socket and
     * the server.
     *
     * @api private
     */
    flush() {
      if (
        "closed" === this.readyState ||
        !this.transport.writable ||
        !this.writeBuffer.length
      ) {
        return;
      }

      debug("flushing buffer to transport");
      this.emit("flush", this.writeBuffer);
      this.server.emit("flush", this, this.writeBuffer);

      const wbuf = this.writeBuffer;
      this.writeBuffer = [];

      if (!this.transport.supportsFraming) {
        // the callbacks of this flush are queued as one batch
        this.sentCallbackFn.push(this.packetsFn);
      } else {
        // the callbacks of this flush are queued individually
        this.sentCallbackFn.push.apply(this.sentCallbackFn, this.packetsFn);
      }
      this.packetsFn = [];

      this.transport.send(wbuf);
      this.emit("drain");
      this.server.emit("drain", this);
    }

    /**
     * Get available upgrades for this socket: the upgrades the server reports
     * for the current transport, restricted to `server.opts.transports`.
     *
     * @return {Array} transport names
     * @api private
     */
    getAvailableUpgrades() {
      const availableUpgrades = [];
      const allUpgrades = this.server.upgrades(this.transport.name);
      const l = allUpgrades.length;

      for (let i = 0; i < l; ++i) {
        const upg = allUpgrades[i];
        if (this.server.opts.transports.indexOf(upg) !== -1) {
          availableUpgrades.push(upg);
        }
      }

      return availableUpgrades;
    }

    /**
     * Closes the socket and underlying transport.
     *
     * Has no effect unless the socket is `open`. When packets are still
     * buffered, the transport is closed after the socket's next `drain`.
     *
     * @param {Boolean} discard - optional, discard the transport before closing
     * @return {Socket} for chaining
     * @api public
     */
    close(discard) {
      if ("open" !== this.readyState) return this;

      this.readyState = "closing";

      if (this.writeBuffer.length) {
        this.once("drain", this.closeTransport.bind(this, discard));
        return this;
      }

      this.closeTransport(discard);
      return this;
    }

    /**
     * Closes the underlying transport; the socket itself is closed with
     * reason `forced close` from the transport's close callback.
     *
     * @param {Boolean} discard - discard the transport before closing it
     * @api private
     */
    closeTransport(discard) {
      if (discard) this.transport.discard();
      this.transport.close(this.onClose.bind(this, "forced close"));
    }
  }
  435. module.exports = Socket;