polling.js 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. import { Transport } from "../transport.js";
  2. import debugModule from "debug"; // debug()
  3. import yeast from "yeast";
  4. import parseqs from "parseqs";
  5. import { encodePayload, decodePayload } from "engine.io-parser";
  6. const debug = debugModule("engine.io-client:polling"); // debug()
  7. export class Polling extends Transport {
  8. constructor() {
  9. super(...arguments);
  10. this.polling = false;
  11. }
  12. /**
  13. * Transport name.
  14. */
  15. get name() {
  16. return "polling";
  17. }
  18. /**
  19. * Opens the socket (triggers polling). We write a PING message to determine
  20. * when the transport is open.
  21. *
  22. * @api private
  23. */
  24. doOpen() {
  25. this.poll();
  26. }
  27. /**
  28. * Pauses polling.
  29. *
  30. * @param {Function} callback upon buffers are flushed and transport is paused
  31. * @api private
  32. */
  33. pause(onPause) {
  34. this.readyState = "pausing";
  35. const pause = () => {
  36. debug("paused");
  37. this.readyState = "paused";
  38. onPause();
  39. };
  40. if (this.polling || !this.writable) {
  41. let total = 0;
  42. if (this.polling) {
  43. debug("we are currently polling - waiting to pause");
  44. total++;
  45. this.once("pollComplete", function () {
  46. debug("pre-pause polling complete");
  47. --total || pause();
  48. });
  49. }
  50. if (!this.writable) {
  51. debug("we are currently writing - waiting to pause");
  52. total++;
  53. this.once("drain", function () {
  54. debug("pre-pause writing complete");
  55. --total || pause();
  56. });
  57. }
  58. }
  59. else {
  60. pause();
  61. }
  62. }
  63. /**
  64. * Starts polling cycle.
  65. *
  66. * @api public
  67. */
  68. poll() {
  69. debug("polling");
  70. this.polling = true;
  71. this.doPoll();
  72. this.emit("poll");
  73. }
  74. /**
  75. * Overloads onData to detect payloads.
  76. *
  77. * @api private
  78. */
  79. onData(data) {
  80. debug("polling got data %s", data);
  81. const callback = packet => {
  82. // if its the first message we consider the transport open
  83. if ("opening" === this.readyState && packet.type === "open") {
  84. this.onOpen();
  85. }
  86. // if its a close packet, we close the ongoing requests
  87. if ("close" === packet.type) {
  88. this.onClose();
  89. return false;
  90. }
  91. // otherwise bypass onData and handle the message
  92. this.onPacket(packet);
  93. };
  94. // decode payload
  95. decodePayload(data, this.socket.binaryType).forEach(callback);
  96. // if an event did not trigger closing
  97. if ("closed" !== this.readyState) {
  98. // if we got data we're not polling
  99. this.polling = false;
  100. this.emit("pollComplete");
  101. if ("open" === this.readyState) {
  102. this.poll();
  103. }
  104. else {
  105. debug('ignoring poll - transport state "%s"', this.readyState);
  106. }
  107. }
  108. }
  109. /**
  110. * For polling, send a close packet.
  111. *
  112. * @api private
  113. */
  114. doClose() {
  115. const close = () => {
  116. debug("writing close packet");
  117. this.write([{ type: "close" }]);
  118. };
  119. if ("open" === this.readyState) {
  120. debug("transport open - closing");
  121. close();
  122. }
  123. else {
  124. // in case we're trying to close while
  125. // handshaking is in progress (GH-164)
  126. debug("transport not open - deferring close");
  127. this.once("open", close);
  128. }
  129. }
  130. /**
  131. * Writes a packets payload.
  132. *
  133. * @param {Array} data packets
  134. * @param {Function} drain callback
  135. * @api private
  136. */
  137. write(packets) {
  138. this.writable = false;
  139. encodePayload(packets, data => {
  140. this.doWrite(data, () => {
  141. this.writable = true;
  142. this.emit("drain");
  143. });
  144. });
  145. }
  146. /**
  147. * Generates uri for connection.
  148. *
  149. * @api private
  150. */
  151. uri() {
  152. let query = this.query || {};
  153. const schema = this.opts.secure ? "https" : "http";
  154. let port = "";
  155. // cache busting is forced
  156. if (false !== this.opts.timestampRequests) {
  157. query[this.opts.timestampParam] = yeast();
  158. }
  159. if (!this.supportsBinary && !query.sid) {
  160. query.b64 = 1;
  161. }
  162. // avoid port if default for schema
  163. if (this.opts.port &&
  164. (("https" === schema && Number(this.opts.port) !== 443) ||
  165. ("http" === schema && Number(this.opts.port) !== 80))) {
  166. port = ":" + this.opts.port;
  167. }
  168. const encodedQuery = parseqs.encode(query);
  169. const ipv6 = this.opts.hostname.indexOf(":") !== -1;
  170. return (schema +
  171. "://" +
  172. (ipv6 ? "[" + this.opts.hostname + "]" : this.opts.hostname) +
  173. port +
  174. this.opts.path +
  175. (encodedQuery.length ? "?" + encodedQuery : ""));
  176. }
  177. }