import { Transport } from "../transport.js";
import yeast from "yeast";
import parseqs from "parseqs";
import { encodePayload, decodePayload } from "engine.io-parser";

/**
 * Polling transport.
 *
 * Implements the polling cycle, payload decoding/encoding and URI generation
 * on top of `Transport`. The actual request primitives `doPoll()` and
 * `doWrite()` are not defined in this class; presumably a subclass supplies
 * them (NOTE(review): confirm against the concrete transports).
 */
export class Polling extends Transport {
  constructor(...args) {
    super(...args);
    // true while a poll has been started and its data has not arrived yet
    this.polling = false;
  }

  /**
   * Transport name.
   */
  get name() {
    return "polling";
  }

  /**
   * Opens the transport by starting the first poll. The transport is
   * considered open once an "open" packet is received (see `onData`).
   *
   * @api private
   */
  doOpen() {
    this.poll();
  }

  /**
   * Pauses polling.
   *
   * Sets `readyState` to "pausing", waits for the in-flight poll (if any) to
   * complete and for the pending write (if any) to drain, then sets
   * `readyState` to "paused" and invokes `onPause`.
   *
   * @param {Function} onPause called once the transport is paused
   * @api private
   */
  pause(onPause) {
    this.readyState = "pausing";

    const markPaused = () => {
      this.readyState = "paused";
      onPause();
    };

    if (!this.polling && this.writable) {
      // nothing in flight: pause right away
      markPaused();
      return;
    }

    // number of outstanding events we still have to wait for
    let pending = 0;
    const onSettled = () => {
      pending -= 1;
      if (pending === 0) {
        markPaused();
      }
    };

    if (this.polling) {
      pending += 1;
      this.once("pollComplete", onSettled);
    }

    if (!this.writable) {
      pending += 1;
      this.once("drain", onSettled);
    }
  }

  /**
   * Starts a polling cycle: flags the transport as polling, delegates the
   * request to `doPoll()` and emits "poll".
   *
   * @api public
   */
  poll() {
    this.polling = true;
    this.doPoll();
    this.emit("poll");
  }

  /**
   * Overloads onData to detect payloads.
   *
   * Decodes `data` into packets and dispatches them in order. Processing
   * stops at the first "close" packet. Unless the transport ended up
   * "closed", the poll is marked complete and, if the transport is "open",
   * the next poll is started.
   *
   * @param data encoded payload handed to `decodePayload`
   * @api private
   */
  onData(data) {
    const packets = decodePayload(data, this.socket.binaryType);

    // A plain loop is used instead of forEach so that a close packet can
    // actually stop the dispatching: forEach ignores the callback's return
    // value, so returning false from it never interrupted the iteration.
    for (const packet of packets) {
      // if it's the first message we consider the transport open
      if ("opening" === this.readyState && packet.type === "open") {
        this.onOpen();
      }

      // if it's a close packet, we close the transport and ignore the rest
      if ("close" === packet.type) {
        this.onClose();
        break;
      }

      // otherwise hand the packet over
      this.onPacket(packet);
    }

    // if an event did not trigger closing
    if ("closed" !== this.readyState) {
      // we got data, so the current poll is over
      this.polling = false;
      this.emit("pollComplete");

      if ("open" === this.readyState) {
        this.poll();
      }
    }
  }

  /**
   * For polling, send a close packet.
   *
   * @api private
   */
  doClose() {
    const sendClose = () => {
      this.write([{ type: "close" }]);
    };

    if ("open" === this.readyState) {
      sendClose();
    } else {
      // in case we're trying to close while
      // handshaking is in progress (GH-164)
      this.once("open", sendClose);
    }
  }

  /**
   * Writes a packets payload.
   *
   * Marks the transport as not writable, encodes the packets and passes the
   * encoded payload to `doWrite()`. When `doWrite()` invokes its callback the
   * transport becomes writable again and "drain" is emitted.
   *
   * @param {Array} packets packets to send
   * @api private
   */
  write(packets) {
    this.writable = false;

    encodePayload(packets, data => {
      this.doWrite(data, () => {
        this.writable = true;
        this.emit("drain");
      });
    });
  }

  /**
   * Generates uri for connection.
   *
   * Note: when `this.query` is set, the timestamp and `b64` parameters are
   * written onto that same object (it is not copied).
   *
   * @return {String} the full request URI
   * @api private
   */
  uri() {
    const query = this.query || {};
    const schema = this.opts.secure ? "https" : "http";
    let port = "";

    // cache busting is forced unless explicitly disabled
    if (false !== this.opts.timestampRequests) {
      query[this.opts.timestampParam] = yeast();
    }

    // request base64 payloads when binary is unsupported and no sid is set yet
    if (!this.supportsBinary && !query.sid) {
      query.b64 = 1;
    }

    // avoid port if default for schema
    if (
      this.opts.port &&
      (("https" === schema && Number(this.opts.port) !== 443) ||
        ("http" === schema && Number(this.opts.port) !== 80))
    ) {
      port = ":" + this.opts.port;
    }

    const encodedQuery = parseqs.encode(query);

    // a hostname containing ":" is treated as an IPv6 literal and bracketed
    const isIpv6 = this.opts.hostname.includes(":");
    const host = isIpv6 ? "[" + this.opts.hostname + "]" : this.opts.hostname;
    const search = encodedQuery.length ? "?" + encodedQuery : "";

    return schema + "://" + host + port + this.opts.path + search;
  }
}