// socket.js (19 KB)
  1. import { transports } from "./transports/index.js";
  2. import { installTimerFunctions } from "./util.js";
  3. import parseqs from "parseqs";
  4. import parseuri from "parseuri";
  5. import debugModule from "debug"; // debug()
  6. import { Emitter } from "@socket.io/component-emitter";
  7. import { protocol } from "engine.io-parser";
  8. const debug = debugModule("engine.io-client:socket"); // debug()
  /**
   * Engine.IO client socket.
   *
   * Owns the currently active transport, performs the handshake, buffers
   * outgoing packets in `writeBuffer` until the transport is writable and
   * probes the upgrades advertised by the server.
   *
   * Events emitted through `emitReserved`: "open", "handshake", "packet",
   * "heartbeat", "ping", "pong", "data", "message", "packetCreate", "flush",
   * "drain", "upgrading", "upgrade", "upgradeError", "error" and "close".
   *
   * `readyState` moves through "" -> "opening" -> "open" -> "closing" -> "closed".
   */
  export class Socket extends Emitter {
    /**
     * Socket constructor. Opens the connection immediately.
     *
     * Note: the options object is written to (`hostname`, `secure`, `port`
     * and `query` are stored on it when they are derived from the uri).
     *
     * @param {String|Object} uri - uri or options
     * @param {Object} opts - options
     * @api public
     */
    constructor(uri, opts = {}) {
      super();
      // `new Socket(opts)`: a lone object argument is the options bag
      if (uri && "object" === typeof uri) {
        opts = uri;
        uri = null;
      }
      if (uri) {
        uri = parseuri(uri);
        opts.hostname = uri.host;
        opts.secure = uri.protocol === "https" || uri.protocol === "wss";
        opts.port = uri.port;
        if (uri.query) {
          opts.query = uri.query;
        }
      }
      else if (opts.host) {
        opts.hostname = parseuri(opts.host).host;
      }
      // sets up the timer helpers (`setTimeoutFn` / `clearTimeoutFn`) used
      // below; see util.js for the details
      installTimerFunctions(this, opts);
      // an explicit `secure` option wins, otherwise follow the page protocol
      this.secure =
        null != opts.secure
          ? opts.secure
          : typeof location !== "undefined" && "https:" === location.protocol;
      if (opts.hostname && !opts.port) {
        // if no port is specified manually, use the protocol default
        opts.port = this.secure ? "443" : "80";
      }
      this.hostname =
        opts.hostname ||
        (typeof location !== "undefined" ? location.hostname : "localhost");
      this.port =
        opts.port ||
        (typeof location !== "undefined" && location.port
          ? location.port
          : this.secure
            ? "443"
            : "80");
      this.transports = opts.transports || ["polling", "websocket"];
      this.readyState = "";
      this.writeBuffer = [];
      this.prevBufferLen = 0;
      // defaults first, user-supplied options override them
      this.opts = Object.assign({
        path: "/engine.io",
        agent: false,
        withCredentials: false,
        upgrade: true,
        timestampParam: "t",
        rememberUpgrade: false,
        rejectUnauthorized: true,
        perMessageDeflate: {
          threshold: 1024
        },
        transportOptions: {},
        closeOnBeforeunload: true
      }, opts);
      // strip one trailing slash (if any) and append one
      this.opts.path = this.opts.path.replace(/\/$/, "") + "/";
      if (typeof this.opts.query === "string") {
        this.opts.query = parseqs.decode(this.opts.query);
      }
      // set on handshake
      this.id = null;
      this.upgrades = null;
      this.pingInterval = null;
      this.pingTimeout = null;
      // set on heartbeat
      this.pingTimeoutTimer = null;
      if (typeof addEventListener === "function") {
        if (this.opts.closeOnBeforeunload) {
          // Firefox closes the connection when the "beforeunload" event is emitted but not Chrome. This event listener
          // ensures every browser behaves the same (no "disconnect" event at the Socket.IO level when the page is
          // closed/reloaded)
          //
          // The listener is kept on the instance so that onClose() can
          // unregister it; an anonymous listener would keep every closed
          // socket reachable from the global event target.
          this.beforeunloadEventListener = () => {
            if (this.transport) {
              // silently close the transport
              this.transport.removeAllListeners();
              this.transport.close();
            }
          };
          addEventListener("beforeunload", this.beforeunloadEventListener, false);
        }
        if (this.hostname !== "localhost") {
          this.offlineEventListener = () => {
            this.onClose("transport close");
          };
          addEventListener("offline", this.offlineEventListener, false);
        }
      }
      this.open();
    }
    /**
     * Creates transport of the given type.
     *
     * The transport receives a copy of the socket query extended with the
     * protocol version (`EIO`), the transport name and, once known, the
     * session id (`sid`).
     *
     * @param {String} name - transport name
     * @return {Transport}
     * @api private
     */
    createTransport(name) {
      debug('creating transport "%s"', name);
      const query = clone(this.opts.query);
      // append engine.io protocol identifier
      query.EIO = protocol;
      // transport name
      query.transport = name;
      // session id if we already have one
      if (this.id) {
        query.sid = this.id;
      }
      // per-transport options are overridden by the socket options, which are
      // in turn overridden by the values resolved in the constructor
      const opts = Object.assign({}, this.opts.transportOptions[name], this.opts, {
        query,
        socket: this,
        hostname: this.hostname,
        secure: this.secure,
        port: this.port
      });
      debug("options: %j", opts);
      return new transports[name](opts);
    }
    /**
     * Initializes transport to use and starts probe.
     *
     * Picks "websocket" directly when `rememberUpgrade` is set and a previous
     * websocket connection succeeded, otherwise the first configured
     * transport. Emits an asynchronous "error" when no transport is left.
     *
     * @api private
     */
    open() {
      let transport;
      if (this.opts.rememberUpgrade &&
        Socket.priorWebsocketSuccess &&
        this.transports.indexOf("websocket") !== -1) {
        transport = "websocket";
      }
      else if (0 === this.transports.length) {
        // Emit error on next tick so it can be listened to
        this.setTimeoutFn(() => {
          this.emitReserved("error", "No transports available");
        }, 0);
        return;
      }
      else {
        transport = this.transports[0];
      }
      this.readyState = "opening";
      // Retry with the next transport if the transport is disabled (jsonp: false)
      try {
        transport = this.createTransport(transport);
      }
      catch (e) {
        debug("error while creating transport: %s", e);
        this.transports.shift();
        this.open();
        return;
      }
      transport.open();
      this.setTransport(transport);
    }
    /**
     * Sets the current transport. Disables the existing one (if any).
     *
     * @param {Transport} transport - transport that becomes the active one
     * @api private
     */
    setTransport(transport) {
      debug("setting transport %s", transport.name);
      if (this.transport) {
        debug("clearing existing transport %s", this.transport.name);
        this.transport.removeAllListeners();
      }
      // set up transport
      this.transport = transport;
      // set up transport listeners
      transport
        .on("drain", this.onDrain.bind(this))
        .on("packet", this.onPacket.bind(this))
        .on("error", this.onError.bind(this))
        .on("close", () => {
          this.onClose("transport close");
        });
    }
    /**
     * Probes a transport.
     *
     * Opens a second transport, sends a `ping`/"probe" packet over it and, if
     * the matching `pong` comes back, pauses the current transport and
     * switches to the probed one. Any failure emits "upgradeError".
     *
     * @param {String} name - transport name
     * @api private
     */
    probe(name) {
      debug('probing transport "%s"', name);
      let transport = this.createTransport(name);
      let failed = false;
      Socket.priorWebsocketSuccess = false;
      const onTransportOpen = () => {
        if (failed)
          return;
        debug('probe transport "%s" opened', name);
        transport.send([{ type: "ping", data: "probe" }]);
        transport.once("packet", msg => {
          if (failed)
            return;
          if ("pong" === msg.type && "probe" === msg.data) {
            debug('probe transport "%s" pong', name);
            this.upgrading = true;
            this.emitReserved("upgrading", transport);
            // an "upgrading" listener may have aborted this probe
            // synchronously (freezeTransport() resets `transport`)
            if (!transport)
              return;
            Socket.priorWebsocketSuccess = "websocket" === transport.name;
            debug('pausing current transport "%s"', this.transport.name);
            this.transport.pause(() => {
              if (failed)
                return;
              if ("closed" === this.readyState)
                return;
              debug("changing transport and sending upgrade packet");
              cleanup();
              this.setTransport(transport);
              transport.send([{ type: "upgrade" }]);
              this.emitReserved("upgrade", transport);
              transport = null;
              this.upgrading = false;
              this.flush();
            });
          }
          else {
            debug('probe transport "%s" failed', name);
            const err = new Error("probe error");
            // @ts-ignore
            err.transport = transport.name;
            this.emitReserved("upgradeError", err);
          }
        });
      };
      // Aborts the probe: detaches every listener and closes the transport
      function freezeTransport() {
        if (failed)
          return;
        // Any callback called by transport should be ignored since now
        failed = true;
        cleanup();
        transport.close();
        transport = null;
      }
      // Handle any error that happens while probing
      const onerror = err => {
        const error = new Error("probe error: " + err);
        // @ts-ignore
        error.transport = transport.name;
        freezeTransport();
        debug('probe transport "%s" failed because of error: %s', name, err);
        this.emitReserved("upgradeError", error);
      };
      function onTransportClose() {
        onerror("transport closed");
      }
      // When the socket is closed while we're probing
      function onclose() {
        onerror("socket closed");
      }
      // When the socket is upgraded while we're probing
      function onupgrade(to) {
        if (transport && to.name !== transport.name) {
          debug('"%s" works - aborting "%s"', to.name, transport.name);
          freezeTransport();
        }
      }
      // Remove all listeners on the transport and on self
      const cleanup = () => {
        transport.removeListener("open", onTransportOpen);
        transport.removeListener("error", onerror);
        transport.removeListener("close", onTransportClose);
        this.off("close", onclose);
        this.off("upgrading", onupgrade);
      };
      transport.once("open", onTransportOpen);
      transport.once("error", onerror);
      transport.once("close", onTransportClose);
      this.once("close", onclose);
      this.once("upgrading", onupgrade);
      transport.open();
    }
    /**
     * Called when connection is deemed open.
     *
     * Emits "open", flushes the buffered packets and, when upgrades are
     * enabled and the transport can be paused, probes every upgrade kept by
     * `filterUpgrades`.
     *
     * @api private
     */
    onOpen() {
      debug("socket open");
      this.readyState = "open";
      Socket.priorWebsocketSuccess = "websocket" === this.transport.name;
      this.emitReserved("open");
      this.flush();
      // we check for `readyState` in case an `open`
      // listener already closed the socket
      if ("open" === this.readyState &&
        this.opts.upgrade &&
        this.transport.pause) {
        debug("starting upgrade probes");
        for (let i = 0, l = this.upgrades.length; i < l; i++) {
          this.probe(this.upgrades[i]);
        }
      }
    }
    /**
     * Handles a packet.
     *
     * Packets are ignored (only logged) unless the socket is "opening",
     * "open" or "closing".
     *
     * @param {Object} packet - decoded packet with `type` and `data`
     * @api private
     */
    onPacket(packet) {
      if ("opening" === this.readyState ||
        "open" === this.readyState ||
        "closing" === this.readyState) {
        debug('socket receive: type "%s", data "%s"', packet.type, packet.data);
        this.emitReserved("packet", packet);
        // Socket is live - any packet counts
        this.emitReserved("heartbeat");
        switch (packet.type) {
          case "open":
            this.onHandshake(JSON.parse(packet.data));
            break;
          case "ping":
            // heartbeat from the server: restart the timeout and answer
            this.resetPingTimeout();
            this.sendPacket("pong");
            this.emitReserved("ping");
            this.emitReserved("pong");
            break;
          case "error": {
            // braces keep `err` scoped to this case
            const err = new Error("server error");
            // @ts-ignore
            err.code = packet.data;
            this.onError(err);
            break;
          }
          case "message":
            this.emitReserved("data", packet.data);
            this.emitReserved("message", packet.data);
            break;
        }
      }
      else {
        debug('packet received with socket readyState "%s"', this.readyState);
      }
    }
    /**
     * Called upon handshake completion.
     *
     * Stores the session id, the usable upgrades and the heartbeat settings
     * sent by the server, then opens the socket.
     *
     * @param {Object} data - handshake obj
     * @api private
     */
    onHandshake(data) {
      this.emitReserved("handshake", data);
      this.id = data.sid;
      this.transport.query.sid = data.sid;
      this.upgrades = this.filterUpgrades(data.upgrades);
      this.pingInterval = data.pingInterval;
      this.pingTimeout = data.pingTimeout;
      this.onOpen();
      // In case open handler closes socket
      if ("closed" === this.readyState)
        return;
      this.resetPingTimeout();
    }
    /**
     * Sets and resets ping timeout timer based on server pings.
     *
     * The socket is closed with reason "ping timeout" when the timer fires,
     * i.e. after `pingInterval + pingTimeout` without a reset.
     *
     * @api private
     */
    resetPingTimeout() {
      this.clearTimeoutFn(this.pingTimeoutTimer);
      this.pingTimeoutTimer = this.setTimeoutFn(() => {
        this.onClose("ping timeout");
      }, this.pingInterval + this.pingTimeout);
      // timer handles are not objects with `unref()` on every platform,
      // so only call it where it exists
      if (this.opts.autoUnref &&
        typeof this.pingTimeoutTimer.unref === "function") {
        this.pingTimeoutTimer.unref();
      }
    }
    /**
     * Called on `drain` event
     *
     * Drops the packets handed to the transport by the last flush and either
     * emits "drain" (buffer empty) or flushes what was queued meanwhile.
     *
     * @api private
     */
    onDrain() {
      this.writeBuffer.splice(0, this.prevBufferLen);
      // setting prevBufferLen = 0 is very important
      // for example, when upgrading, upgrade packet is sent over,
      // and a nonzero prevBufferLen could cause problems on `drain`
      this.prevBufferLen = 0;
      if (0 === this.writeBuffer.length) {
        this.emitReserved("drain");
      }
      else {
        this.flush();
      }
    }
    /**
     * Flush write buffers.
     *
     * Does nothing while the socket is closed, has no transport yet, the
     * transport is not writable, an upgrade is in progress or the buffer is
     * empty.
     *
     * @api private
     */
    flush() {
      if ("closed" !== this.readyState &&
        // no transport exists when none could be created
        this.transport &&
        this.transport.writable &&
        !this.upgrading &&
        this.writeBuffer.length) {
        debug("flushing %d packets in socket", this.writeBuffer.length);
        this.transport.send(this.writeBuffer);
        // keep track of current length of writeBuffer
        // splice writeBuffer and callbackBuffer on `drain`
        this.prevBufferLen = this.writeBuffer.length;
        this.emitReserved("flush");
      }
    }
    /**
     * Sends a message.
     *
     * @param {String} msg - message.
     * @param {Object} options - options.
     * @param {Function} fn - callback function.
     * @return {Socket} for chaining.
     * @api public
     */
    write(msg, options, fn) {
      this.sendPacket("message", msg, options, fn);
      return this;
    }
    /**
     * Sends a message. Same behaviour as `write`.
     *
     * @param {String} msg - message.
     * @param {Object} options - options.
     * @param {Function} fn - callback function.
     * @return {Socket} for chaining.
     * @api public
     */
    send(msg, options, fn) {
      this.sendPacket("message", msg, options, fn);
      return this;
    }
    /**
     * Sends a packet.
     *
     * `data` and `options` may be omitted, in which case the callback can be
     * passed in their place. Packets are dropped silently once the socket is
     * "closing" or "closed".
     *
     * @param {String} type - packet type.
     * @param {String} data - data.
     * @param {Object} options - options.
     * @param {Function} fn - callback function, invoked on the next "flush".
     * @api private
     */
    sendPacket(type, data, options, fn) {
      if ("function" === typeof data) {
        fn = data;
        data = undefined;
      }
      if ("function" === typeof options) {
        fn = options;
        options = null;
      }
      if ("closing" === this.readyState || "closed" === this.readyState) {
        return;
      }
      options = options || {};
      // compression is on unless explicitly disabled
      options.compress = false !== options.compress;
      const packet = {
        type: type,
        data: data,
        options: options
      };
      this.emitReserved("packetCreate", packet);
      this.writeBuffer.push(packet);
      if (fn)
        this.once("flush", fn);
      this.flush();
    }
    /**
     * Closes the connection.
     *
     * Pending packets are drained first and an upgrade in progress is waited
     * for before the transport is actually closed.
     *
     * @return {Socket} for chaining.
     * @api public
     */
    close() {
      const close = () => {
        this.onClose("forced close");
        debug("socket closing - telling transport to close");
        // no transport exists when none could be created
        if (this.transport) {
          this.transport.close();
        }
      };
      const cleanupAndClose = () => {
        this.off("upgrade", cleanupAndClose);
        this.off("upgradeError", cleanupAndClose);
        close();
      };
      const waitForUpgrade = () => {
        // wait for upgrade to finish since we can't send packets while pausing a transport
        this.once("upgrade", cleanupAndClose);
        this.once("upgradeError", cleanupAndClose);
      };
      if ("opening" === this.readyState || "open" === this.readyState) {
        this.readyState = "closing";
        if (this.writeBuffer.length) {
          this.once("drain", () => {
            if (this.upgrading) {
              waitForUpgrade();
            }
            else {
              close();
            }
          });
        }
        else if (this.upgrading) {
          waitForUpgrade();
        }
        else {
          close();
        }
      }
      return this;
    }
    /**
     * Called upon transport error
     *
     * @param {Error} err - error reported by the transport or the server
     * @api private
     */
    onError(err) {
      debug("socket error %j", err);
      Socket.priorWebsocketSuccess = false;
      this.emitReserved("error", err);
      this.onClose("transport error", err);
    }
    /**
     * Called upon transport close.
     *
     * Only acts while the socket is "opening", "open" or "closing", so it is
     * safe to call more than once.
     *
     * @param {String} reason - close reason passed to the "close" event
     * @param {*} desc - optional extra detail passed to the "close" event
     * @api private
     */
    onClose(reason, desc) {
      if ("opening" === this.readyState ||
        "open" === this.readyState ||
        "closing" === this.readyState) {
        debug('socket close with reason: "%s"', reason);
        // clear timers
        this.clearTimeoutFn(this.pingTimeoutTimer);
        // no transport exists when none could be created
        if (this.transport) {
          // stop event from firing again for transport
          this.transport.removeAllListeners("close");
          // ensure transport won't stay open
          this.transport.close();
          // ignore further transport communication
          this.transport.removeAllListeners();
        }
        if (typeof removeEventListener === "function") {
          // release the global listeners registered by the constructor
          removeEventListener("beforeunload", this.beforeunloadEventListener, false);
          removeEventListener("offline", this.offlineEventListener, false);
        }
        // set ready state
        this.readyState = "closed";
        // clear session id
        this.id = null;
        // emit close event
        this.emitReserved("close", reason, desc);
        // clean buffers after, so users can still
        // grab the buffers on `close` event
        this.writeBuffer = [];
        this.prevBufferLen = 0;
      }
    }
    /**
     * Filters upgrades, returning only those matching client transports.
     *
     * @param {Array} upgrades - server upgrades
     * @return {Array} upgrades that are also listed in `this.transports`
     * @api private
     *
     */
    filterUpgrades(upgrades) {
      return upgrades.filter(name => this.transports.indexOf(name) !== -1);
    }
  }
  // protocol revision taken from engine.io-parser, exposed on the class
  Socket.protocol = protocol;
  /**
   * Returns a shallow copy of the own enumerable string-keyed properties of
   * `obj` as a plain object.
   *
   * `null` / `undefined` yield an empty object. The ownership check goes
   * through `Object.prototype` so that prototype-less objects
   * (`Object.create(null)`) and objects with their own `hasOwnProperty` key
   * are copied correctly instead of throwing.
   *
   * @param {Object} obj - object to copy
   * @return {Object} new plain object
   */
  function clone(obj) {
    const copy = {};
    for (const key in obj) {
      if (Object.prototype.hasOwnProperty.call(obj, key)) {
        copy[key] = obj[key];
      }
    }
    return copy;
  }