socket.js 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560
  1. import { transports } from "./transports/index.js";
  2. import { installTimerFunctions } from "./util.js";
  3. import parseqs from "parseqs";
  4. import parseuri from "parseuri";
  5. import { Emitter } from "@socket.io/component-emitter";
  6. import { protocol } from "engine.io-parser";
  /**
   * Engine.IO client socket.
   *
   * Owns one active transport at a time, queues outgoing packets in
   * `writeBuffer`, handles the handshake and ping packets coming from the
   * server and probes the upgrades advertised in the handshake.
   */
  export class Socket extends Emitter {
    /**
     * Socket constructor.
     *
     * @param {String|Object} uri - uri to connect to, or the options object
     * @param {Object} opts - options
     * @api public
     */
    constructor(uri, opts = {}) {
      super();
      if (uri && "object" === typeof uri) {
        opts = uri;
        uri = null;
      }
      // work on a shallow copy: the fields derived below (hostname, secure,
      // port, query) must not leak into the caller's object, which may be
      // reused to create other sockets
      opts = Object.assign({}, opts);
      if (uri) {
        const parsed = parseuri(uri);
        opts.hostname = parsed.host;
        opts.secure = parsed.protocol === "https" || parsed.protocol === "wss";
        opts.port = parsed.port;
        if (parsed.query) {
          opts.query = parsed.query;
        }
      } else if (opts.host) {
        opts.hostname = parseuri(opts.host).host;
      }
      // presumably provides `this.setTimeoutFn` / `this.clearTimeoutFn`,
      // which the methods below rely on -- confirm in ./util.js
      installTimerFunctions(this, opts);
      this.secure =
        null != opts.secure
          ? opts.secure
          : typeof location !== "undefined" && "https:" === location.protocol;
      if (opts.hostname && !opts.port) {
        // if no port is specified manually, use the protocol default
        opts.port = this.secure ? "443" : "80";
      }
      this.hostname =
        opts.hostname ||
        (typeof location !== "undefined" ? location.hostname : "localhost");
      this.port =
        opts.port ||
        (typeof location !== "undefined" && location.port
          ? location.port
          : this.secure
            ? "443"
            : "80");
      this.transports = opts.transports || ["polling", "websocket"];
      this.readyState = "";
      this.writeBuffer = [];
      this.prevBufferLen = 0;
      // user supplied options take precedence over the defaults
      this.opts = Object.assign(
        {
          path: "/engine.io",
          agent: false,
          withCredentials: false,
          upgrade: true,
          timestampParam: "t",
          rememberUpgrade: false,
          rejectUnauthorized: true,
          perMessageDeflate: {
            threshold: 1024
          },
          transportOptions: {},
          closeOnBeforeunload: true
        },
        opts
      );
      // normalize the path so that it always ends with exactly one "/"
      this.opts.path = this.opts.path.replace(/\/$/, "") + "/";
      if (typeof this.opts.query === "string") {
        this.opts.query = parseqs.decode(this.opts.query);
      }
      // set on handshake
      this.id = null;
      this.upgrades = null;
      this.pingInterval = null;
      this.pingTimeout = null;
      // set on heartbeat
      this.pingTimeoutTimer = null;
      if (typeof addEventListener === "function") {
        if (this.opts.closeOnBeforeunload) {
          // Firefox closes the connection when the "beforeunload" event is emitted but not Chrome. This event listener
          // ensures every browser behaves the same (no "disconnect" event at the Socket.IO level when the page is
          // closed/reloaded)
          //
          // the listener is kept on the instance so that onClose() can
          // unregister it; otherwise every closed socket would stay reachable
          // from the global event target
          this.beforeunloadEventListener = () => {
            if (this.transport) {
              // silently close the transport
              this.transport.removeAllListeners();
              this.transport.close();
            }
          };
          addEventListener("beforeunload", this.beforeunloadEventListener, false);
        }
        if (this.hostname !== "localhost") {
          this.offlineEventListener = () => {
            this.onClose("transport close");
          };
          addEventListener("offline", this.offlineEventListener, false);
        }
      }
      this.open();
    }
    /**
     * Creates transport of the given type.
     *
     * @param {String} name - transport name
     * @return {Transport}
     * @api private
     */
    createTransport(name) {
      const query = clone(this.opts.query);
      // append engine.io protocol identifier
      query.EIO = protocol;
      // transport name
      query.transport = name;
      // session id if we already have one
      if (this.id) {
        query.sid = this.id;
      }
      // precedence, lowest to highest: per-transport options, socket options,
      // then the values resolved by this socket
      const opts = Object.assign({}, this.opts.transportOptions[name], this.opts, {
        query,
        socket: this,
        hostname: this.hostname,
        secure: this.secure,
        port: this.port
      });
      return new transports[name](opts);
    }
    /**
     * Initializes transport to use and starts probe.
     *
     * When no transport is left, an "error" event is emitted asynchronously.
     *
     * @api private
     */
    open() {
      let name;
      if (
        this.opts.rememberUpgrade &&
        Socket.priorWebsocketSuccess &&
        this.transports.indexOf("websocket") !== -1
      ) {
        name = "websocket";
      } else if (0 === this.transports.length) {
        // Emit error on next tick so it can be listened to
        this.setTimeoutFn(() => {
          this.emitReserved("error", "No transports available");
        }, 0);
        return;
      } else {
        name = this.transports[0];
      }
      this.readyState = "opening";
      let transport;
      // Retry with the next transport if the transport is disabled (jsonp: false)
      try {
        transport = this.createTransport(name);
      } catch (e) {
        // drop the transport that actually failed: it is not necessarily the
        // first one when "websocket" was selected through `rememberUpgrade`
        const failedAt = this.transports.indexOf(name);
        if (failedAt === -1) {
          this.transports.shift();
        } else {
          this.transports.splice(failedAt, 1);
        }
        this.open();
        return;
      }
      transport.open();
      this.setTransport(transport);
    }
    /**
     * Sets the current transport. Disables the existing one (if any).
     *
     * @param {Transport} transport - transport to switch to
     * @api private
     */
    setTransport(transport) {
      if (this.transport) {
        this.transport.removeAllListeners();
      }
      // set up transport
      this.transport = transport;
      // set up transport listeners
      transport
        .on("drain", this.onDrain.bind(this))
        .on("packet", this.onPacket.bind(this))
        .on("error", this.onError.bind(this))
        .on("close", () => {
          this.onClose("transport close");
        });
    }
    /**
     * Probes a transport.
     *
     * Opens a second transport, sends a "probe" ping over it and, when the
     * matching pong comes back, pauses the current transport and switches to
     * the probed one. Failures are reported through "upgradeError".
     *
     * @param {String} name - transport name
     * @api private
     */
    probe(name) {
      let transport = this.createTransport(name);
      let failed = false;
      Socket.priorWebsocketSuccess = false;
      const onTransportOpen = () => {
        if (failed) return;
        transport.send([{ type: "ping", data: "probe" }]);
        transport.once("packet", msg => {
          if (failed) return;
          if ("pong" === msg.type && "probe" === msg.data) {
            this.upgrading = true;
            this.emitReserved("upgrading", transport);
            // an "upgrading" listener may have frozen this probe
            if (!transport) return;
            Socket.priorWebsocketSuccess = "websocket" === transport.name;
            this.transport.pause(() => {
              if (failed) return;
              if ("closed" === this.readyState) return;
              cleanup();
              this.setTransport(transport);
              transport.send([{ type: "upgrade" }]);
              this.emitReserved("upgrade", transport);
              transport = null;
              this.upgrading = false;
              this.flush();
            });
          } else {
            const err = new Error("probe error");
            // @ts-ignore
            err.transport = transport.name;
            this.emitReserved("upgradeError", err);
          }
        });
      };
      function freezeTransport() {
        if (failed) return;
        // Any callback called by transport should be ignored since now
        failed = true;
        cleanup();
        transport.close();
        transport = null;
      }
      // Handle any error that happens while probing
      const onerror = err => {
        const error = new Error("probe error: " + err);
        // read the name before freezeTransport() resets `transport` to null
        // @ts-ignore
        error.transport = transport.name;
        freezeTransport();
        this.emitReserved("upgradeError", error);
      };
      function onTransportClose() {
        onerror("transport closed");
      }
      // When the socket is closed while we're probing
      function onclose() {
        onerror("socket closed");
      }
      // When the socket is upgraded while we're probing
      function onupgrade(to) {
        if (transport && to.name !== transport.name) {
          freezeTransport();
        }
      }
      // Remove all listeners on the transport and on self
      const cleanup = () => {
        transport.removeListener("open", onTransportOpen);
        transport.removeListener("error", onerror);
        transport.removeListener("close", onTransportClose);
        this.off("close", onclose);
        this.off("upgrading", onupgrade);
      };
      transport.once("open", onTransportOpen);
      transport.once("error", onerror);
      transport.once("close", onTransportClose);
      this.once("close", onclose);
      this.once("upgrading", onupgrade);
      transport.open();
    }
    /**
     * Called when connection is deemed open.
     *
     * @api private
     */
    onOpen() {
      this.readyState = "open";
      Socket.priorWebsocketSuccess = "websocket" === this.transport.name;
      this.emitReserved("open");
      this.flush();
      // we check for `readyState` in case an `open`
      // listener already closed the socket
      if (
        "open" === this.readyState &&
        this.opts.upgrade &&
        this.transport.pause
      ) {
        const l = this.upgrades.length;
        for (let i = 0; i < l; i++) {
          this.probe(this.upgrades[i]);
        }
      }
    }
    /**
     * Handles a packet.
     *
     * Packets received while the socket is neither opening, open nor closing
     * are ignored.
     *
     * @param {Object} packet - packet with a `type` and optional `data`
     * @api private
     */
    onPacket(packet) {
      if (
        "opening" !== this.readyState &&
        "open" !== this.readyState &&
        "closing" !== this.readyState
      ) {
        return;
      }
      this.emitReserved("packet", packet);
      // Socket is live - any packet counts
      this.emitReserved("heartbeat");
      switch (packet.type) {
        case "open":
          this.onHandshake(JSON.parse(packet.data));
          break;
        case "ping":
          this.resetPingTimeout();
          this.sendPacket("pong");
          this.emitReserved("ping");
          this.emitReserved("pong");
          break;
        case "error": {
          const err = new Error("server error");
          // @ts-ignore
          err.code = packet.data;
          this.onError(err);
          break;
        }
        case "message":
          this.emitReserved("data", packet.data);
          this.emitReserved("message", packet.data);
          break;
      }
    }
    /**
     * Called upon handshake completion.
     *
     * @param {Object} data - handshake obj
     * @api private
     */
    onHandshake(data) {
      this.emitReserved("handshake", data);
      this.id = data.sid;
      this.transport.query.sid = data.sid;
      this.upgrades = this.filterUpgrades(data.upgrades);
      this.pingInterval = data.pingInterval;
      this.pingTimeout = data.pingTimeout;
      this.onOpen();
      // In case open handler closes socket
      if ("closed" === this.readyState) return;
      this.resetPingTimeout();
    }
    /**
     * Sets and resets ping timeout timer based on server pings.
     *
     * The socket is closed with reason "ping timeout" when the timer is not
     * reset within `pingInterval + pingTimeout`.
     *
     * @api private
     */
    resetPingTimeout() {
      this.clearTimeoutFn(this.pingTimeoutTimer);
      this.pingTimeoutTimer = this.setTimeoutFn(() => {
        this.onClose("ping timeout");
      }, this.pingInterval + this.pingTimeout);
      if (this.opts.autoUnref) {
        this.pingTimeoutTimer.unref();
      }
    }
    /**
     * Called on `drain` event
     *
     * @api private
     */
    onDrain() {
      // drop the packets that were handed to the transport by the last flush
      this.writeBuffer.splice(0, this.prevBufferLen);
      // setting prevBufferLen = 0 is very important
      // for example, when upgrading, upgrade packet is sent over,
      // and a nonzero prevBufferLen could cause problems on `drain`
      this.prevBufferLen = 0;
      if (0 === this.writeBuffer.length) {
        this.emitReserved("drain");
      } else {
        this.flush();
      }
    }
    /**
     * Flush write buffers.
     *
     * @api private
     */
    flush() {
      if (
        "closed" !== this.readyState &&
        this.transport.writable &&
        !this.upgrading &&
        this.writeBuffer.length
      ) {
        this.transport.send(this.writeBuffer);
        // keep track of current length of writeBuffer
        // splice writeBuffer and callbackBuffer on `drain`
        this.prevBufferLen = this.writeBuffer.length;
        this.emitReserved("flush");
      }
    }
    /**
     * Sends a message.
     *
     * @param {String} msg - message.
     * @param {Object} options - options.
     * @param {Function} fn - callback function.
     * @return {Socket} for chaining.
     * @api public
     */
    write(msg, options, fn) {
      this.sendPacket("message", msg, options, fn);
      return this;
    }
    /**
     * Sends a message (same behavior as `write`).
     *
     * @param {String} msg - message.
     * @param {Object} options - options.
     * @param {Function} fn - callback function.
     * @return {Socket} for chaining.
     * @api public
     */
    send(msg, options, fn) {
      this.sendPacket("message", msg, options, fn);
      return this;
    }
    /**
     * Sends a packet.
     *
     * Nothing is queued when the socket is closing or closed.
     *
     * @param {String} type - packet type.
     * @param {String} data - data (may be omitted, the callback then takes its place).
     * @param {Object} options - options (may be omitted, the callback then takes its place).
     * @param {Function} fn - callback function, registered for the next "flush" event.
     * @api private
     */
    sendPacket(type, data, options, fn) {
      if ("function" === typeof data) {
        fn = data;
        data = undefined;
      }
      if ("function" === typeof options) {
        fn = options;
        options = null;
      }
      if ("closing" === this.readyState || "closed" === this.readyState) {
        return;
      }
      options = options || {};
      // compression is on unless explicitly disabled
      options.compress = false !== options.compress;
      const packet = {
        type: type,
        data: data,
        options: options
      };
      this.emitReserved("packetCreate", packet);
      this.writeBuffer.push(packet);
      if (fn) {
        this.once("flush", fn);
      }
      this.flush();
    }
    /**
     * Closes the connection.
     *
     * Pending packets are drained and an in-flight upgrade is awaited before
     * the transport is actually closed.
     *
     * @return {Socket} for chaining.
     * @api public
     */
    close() {
      const close = () => {
        this.onClose("forced close");
        this.transport.close();
      };
      const cleanupAndClose = () => {
        this.off("upgrade", cleanupAndClose);
        this.off("upgradeError", cleanupAndClose);
        close();
      };
      const waitForUpgrade = () => {
        // wait for upgrade to finish since we can't send packets while pausing a transport
        this.once("upgrade", cleanupAndClose);
        this.once("upgradeError", cleanupAndClose);
      };
      if ("opening" === this.readyState || "open" === this.readyState) {
        this.readyState = "closing";
        if (this.writeBuffer.length) {
          this.once("drain", () => {
            if (this.upgrading) {
              waitForUpgrade();
            } else {
              close();
            }
          });
        } else if (this.upgrading) {
          waitForUpgrade();
        } else {
          close();
        }
      }
      return this;
    }
    /**
     * Called upon transport error
     *
     * @param {Error} err - error reported by the transport or the server
     * @api private
     */
    onError(err) {
      Socket.priorWebsocketSuccess = false;
      this.emitReserved("error", err);
      this.onClose("transport error", err);
    }
    /**
     * Called upon transport close.
     *
     * Does nothing when the socket is already closed (or was never opened).
     *
     * @param {String} reason - close reason
     * @param {*} desc - optional description, forwarded to the "close" event
     * @api private
     */
    onClose(reason, desc) {
      if (
        "opening" !== this.readyState &&
        "open" !== this.readyState &&
        "closing" !== this.readyState
      ) {
        return;
      }
      // clear timers
      this.clearTimeoutFn(this.pingTimeoutTimer);
      // stop event from firing again for transport
      this.transport.removeAllListeners("close");
      // ensure transport won't stay open
      this.transport.close();
      // ignore further transport communication
      this.transport.removeAllListeners();
      if (typeof removeEventListener === "function") {
        // unregister the global listeners added by the constructor so that
        // the closed socket can be garbage collected
        removeEventListener("beforeunload", this.beforeunloadEventListener, false);
        removeEventListener("offline", this.offlineEventListener, false);
      }
      // set ready state
      this.readyState = "closed";
      // clear session id
      this.id = null;
      // emit close event
      this.emitReserved("close", reason, desc);
      // clean buffers after, so users can still
      // grab the buffers on `close` event
      this.writeBuffer = [];
      this.prevBufferLen = 0;
    }
    /**
     * Filters upgrades, returning only those matching client transports.
     *
     * @param {Array} upgrades - server upgrades
     * @return {Array} the upgrades that are also listed in `this.transports`
     * @api private
     */
    filterUpgrades(upgrades) {
      const filteredUpgrades = [];
      const count = upgrades.length;
      for (let i = 0; i < count; i++) {
        if (this.transports.indexOf(upgrades[i]) !== -1) {
          filteredUpgrades.push(upgrades[i]);
        }
      }
      return filteredUpgrades;
    }
  }
  // expose the protocol revision imported from the parser on the class
  Socket.protocol = protocol;
  /**
   * Shallow-copies the own enumerable properties of `obj` into a new plain
   * object. Inherited properties are skipped; `null` / `undefined` yield `{}`.
   *
   * @param {Object} obj - object to copy
   * @return {Object} the copy
   */
  function clone(obj) {
    const copy = {};
    for (const key in obj) {
      // go through Object.prototype: `obj` may have no prototype, or may carry
      // its own "hasOwnProperty" key (e.g. coming from a parsed query string)
      if (Object.prototype.hasOwnProperty.call(obj, key)) {
        copy[key] = obj[key];
      }
    }
    return copy;
  }