receiver.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612
  1. 'use strict';
  2. const { Writable } = require('stream');
  3. const PerMessageDeflate = require('./permessage-deflate');
  4. const {
  5. BINARY_TYPES,
  6. EMPTY_BUFFER,
  7. kStatusCode,
  8. kWebSocket
  9. } = require('./constants');
  10. const { concat, toArrayBuffer, unmask } = require('./buffer-util');
  11. const { isValidStatusCode, isValidUTF8 } = require('./validation');
  //
  // Parser states. `Receiver.prototype.startLoop()` switches on the current
  // state to decide which part of a frame is read next.
  //
  const GET_INFO = 0; // Read the first two bytes of a frame.
  const GET_PAYLOAD_LENGTH_16 = 1; // Read the 16-bit extended payload length.
  const GET_PAYLOAD_LENGTH_64 = 2; // Read the 64-bit extended payload length.
  const GET_MASK = 3; // Read the 4-byte masking key.
  const GET_DATA = 4; // Read the payload bytes.
  const INFLATING = 5; // Parsing is paused while a payload is decompressed.
  /**
   * HyBi Receiver implementation.
   *
   * A `Writable` stream that is fed raw bytes and parses them as WebSocket
   * frames. Parsed frames are reported through events:
   *
   * - `'message'` with `(data, isBinary)` for complete text/binary messages;
   * - `'ping'` and `'pong'` with the control frame payload;
   * - `'conclude'` with `(code, reason)` when a close frame is received.
   *
   * Protocol violations are passed to the `_write()` callback as errors that
   * carry an `err.code` string and a status code under `err[kStatusCode]`.
   *
   * @extends Writable
   */
  class Receiver extends Writable {
    /**
     * Creates a Receiver instance.
     *
     * @param {Object} [options] Options object
     * @param {String} [options.binaryType=nodebuffer] The type for binary data
     * @param {Object} [options.extensions] An object containing the negotiated
     *     extensions
     * @param {Boolean} [options.isServer=false] Specifies whether to operate in
     *     client or server mode
     * @param {Number} [options.maxPayload=0] The maximum allowed message
     *     length. Values that are not positive numbers disable the limit
     * @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or
     *     not to skip UTF-8 validation for text and close messages
     */
    constructor(options = {}) {
      super();

      this._binaryType = options.binaryType || BINARY_TYPES[0];
      this._extensions = options.extensions || {};
      this._isServer = !!options.isServer;

      //
      // A bitwise `| 0` would coerce the limit to a signed 32-bit integer, so a
      // limit of 2 GiB or more would wrap to zero or a negative number and
      // silently disable the check. Truncate without the 32-bit coercion and
      // store 0 ("no limit") for anything that is not a positive number.
      //
      const maxPayload = Math.trunc(Number(options.maxPayload));
      this._maxPayload = maxPayload > 0 ? maxPayload : 0;

      this._skipUTF8Validation = !!options.skipUTF8Validation;
      this[kWebSocket] = undefined;

      // Chunks received but not yet consumed, and their total size in bytes.
      this._bufferedBytes = 0;
      this._buffers = [];

      // State of the frame currently being parsed.
      this._compressed = false;
      this._payloadLength = 0;
      this._mask = undefined;
      this._fragmented = 0;
      this._masked = false;
      this._fin = false;
      this._opcode = 0;

      // State of the message currently being assembled.
      this._totalPayloadLength = 0;
      this._messageLength = 0;
      this._fragments = [];

      this._state = GET_INFO;
      this._loop = false;
    }

    /**
     * Implements `Writable.prototype._write()`.
     *
     * @param {Buffer} chunk The chunk of data to write
     * @param {String} encoding The character encoding of `chunk`
     * @param {Function} cb Callback
     * @private
     */
    _write(chunk, encoding, cb) {
      // Bytes that arrive after a close frame has been handled are discarded.
      if (this._opcode === 0x08 && this._state === GET_INFO) return cb();

      this._bufferedBytes += chunk.length;
      this._buffers.push(chunk);
      this.startLoop(cb);
    }

    /**
     * Consumes `n` bytes from the buffered data.
     *
     * Callers must make sure that at least `n` bytes are buffered. When the
     * bytes are available in the first buffered chunk the result is a view on
     * that chunk, otherwise the bytes are copied into a new buffer.
     *
     * @param {Number} n The number of bytes to consume
     * @return {Buffer} The consumed bytes
     * @private
     */
    consume(n) {
      this._bufferedBytes -= n;

      const head = this._buffers[0];

      if (n === head.length) return this._buffers.shift();

      if (n < head.length) {
        this._buffers[0] = head.subarray(n);
        return head.subarray(0, n);
      }

      // The requested range spans several chunks: gather them into one buffer.
      const dst = Buffer.allocUnsafe(n);
      let remaining = n;

      do {
        const buf = this._buffers[0];
        const offset = dst.length - remaining;

        if (remaining >= buf.length) {
          dst.set(this._buffers.shift(), offset);
        } else {
          dst.set(
            new Uint8Array(buf.buffer, buf.byteOffset, remaining),
            offset
          );
          this._buffers[0] = buf.subarray(remaining);
        }

        remaining -= buf.length;
      } while (remaining > 0);

      return dst;
    }

    /**
     * Starts the parsing loop.
     *
     * Runs the handler for the current state until one of them clears
     * `this._loop`, then calls `cb` with the error, if any. While a payload is
     * being inflated the loop returns without calling `cb`; the callback is
     * invoked from `decompress()` instead.
     *
     * @param {Function} cb Callback
     * @private
     */
    startLoop(cb) {
      let err;
      this._loop = true;

      do {
        switch (this._state) {
          case GET_INFO:
            err = this.getInfo();
            break;
          case GET_PAYLOAD_LENGTH_16:
            err = this.getPayloadLength16();
            break;
          case GET_PAYLOAD_LENGTH_64:
            err = this.getPayloadLength64();
            break;
          case GET_MASK:
            this.getMask();
            break;
          case GET_DATA:
            err = this.getData(cb);
            break;
          default:
            // `INFLATING`
            this._loop = false;
            return;
        }
      } while (this._loop);

      cb(err);
    }

    /**
     * Reads the first two bytes of a frame.
     *
     * @return {(RangeError|undefined)} A possible error
     * @private
     */
    getInfo() {
      if (this._bufferedBytes < 2) {
        this._loop = false;
        return;
      }

      // Every violation detected here stops the loop and is reported as a
      // prefixed `RangeError` with status code 1002.
      const fail = (message, errorCode) => {
        this._loop = false;
        return error(RangeError, message, true, 1002, errorCode);
      };

      const buf = this.consume(2);
      const first = buf[0];
      const second = buf[1];

      if ((first & 0x30) !== 0x00) {
        return fail('RSV2 and RSV3 must be clear', 'WS_ERR_UNEXPECTED_RSV_2_3');
      }

      const compressed = (first & 0x40) === 0x40;

      if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
        return fail('RSV1 must be clear', 'WS_ERR_UNEXPECTED_RSV_1');
      }

      this._fin = (first & 0x80) === 0x80;
      this._opcode = first & 0x0f;
      this._payloadLength = second & 0x7f;

      if (this._opcode === 0x00) {
        // Continuation frame: only valid while a fragmented message is open.
        if (compressed) {
          return fail('RSV1 must be clear', 'WS_ERR_UNEXPECTED_RSV_1');
        }

        if (!this._fragmented) {
          return fail('invalid opcode 0', 'WS_ERR_INVALID_OPCODE');
        }

        this._opcode = this._fragmented;
      } else if (this._opcode === 0x01 || this._opcode === 0x02) {
        // Text or binary frame: must not interrupt a fragmented message.
        if (this._fragmented) {
          return fail(
            `invalid opcode ${this._opcode}`,
            'WS_ERR_INVALID_OPCODE'
          );
        }

        this._compressed = compressed;
      } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
        // Control frame (close, ping, pong).
        if (!this._fin) {
          return fail('FIN must be set', 'WS_ERR_EXPECTED_FIN');
        }

        if (compressed) {
          return fail('RSV1 must be clear', 'WS_ERR_UNEXPECTED_RSV_1');
        }

        if (this._payloadLength > 0x7d) {
          return fail(
            `invalid payload length ${this._payloadLength}`,
            'WS_ERR_INVALID_CONTROL_PAYLOAD_LENGTH'
          );
        }
      } else {
        return fail(`invalid opcode ${this._opcode}`, 'WS_ERR_INVALID_OPCODE');
      }

      // Remember the opcode of a message whose first frame is not final.
      if (!this._fin && !this._fragmented) this._fragmented = this._opcode;

      this._masked = (second & 0x80) === 0x80;

      if (this._isServer) {
        if (!this._masked) {
          return fail('MASK must be set', 'WS_ERR_EXPECTED_MASK');
        }
      } else if (this._masked) {
        return fail('MASK must be clear', 'WS_ERR_UNEXPECTED_MASK');
      }

      if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
      else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
      else return this.haveLength();
    }

    /**
     * Gets extended payload length (7+16).
     *
     * @return {(RangeError|undefined)} A possible error
     * @private
     */
    getPayloadLength16() {
      if (this._bufferedBytes < 2) {
        this._loop = false;
        return;
      }

      this._payloadLength = this.consume(2).readUInt16BE(0);
      return this.haveLength();
    }

    /**
     * Gets extended payload length (7+64).
     *
     * @return {(RangeError|undefined)} A possible error
     * @private
     */
    getPayloadLength64() {
      if (this._bufferedBytes < 8) {
        this._loop = false;
        return;
      }

      const buf = this.consume(8);
      const high = buf.readUInt32BE(0);

      //
      // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
      // if payload length is greater than this number, that is when the upper
      // 32 bits do not fit in 21 bits.
      //
      if (high > Math.pow(2, 53 - 32) - 1) {
        this._loop = false;
        return error(
          RangeError,
          'Unsupported WebSocket frame: payload length > 2^53 - 1',
          false,
          1009,
          'WS_ERR_UNSUPPORTED_DATA_PAYLOAD_LENGTH'
        );
      }

      this._payloadLength = high * Math.pow(2, 32) + buf.readUInt32BE(4);
      return this.haveLength();
    }

    /**
     * Payload length has been read.
     *
     * Adds the length of data frames to the running message total, enforces
     * the `maxPayload` limit and selects the next state.
     *
     * @return {(RangeError|undefined)} A possible error
     * @private
     */
    haveLength() {
      if (this._payloadLength && this._opcode < 0x08) {
        this._totalPayloadLength += this._payloadLength;

        if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
          this._loop = false;
          return error(
            RangeError,
            'Max payload size exceeded',
            false,
            1009,
            'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
          );
        }
      }

      if (this._masked) this._state = GET_MASK;
      else this._state = GET_DATA;
    }

    /**
     * Reads mask bytes.
     *
     * @private
     */
    getMask() {
      if (this._bufferedBytes < 4) {
        this._loop = false;
        return;
      }

      this._mask = this.consume(4);
      this._state = GET_DATA;
    }

    /**
     * Reads data bytes.
     *
     * @param {Function} cb Callback
     * @return {(Error|RangeError|undefined)} A possible error
     * @private
     */
    getData(cb) {
      let data = EMPTY_BUFFER;

      if (this._payloadLength) {
        if (this._bufferedBytes < this._payloadLength) {
          this._loop = false;
          return;
        }

        data = this.consume(this._payloadLength);
        if (this._masked) unmask(data, this._mask);
      }

      if (this._opcode > 0x07) return this.controlMessage(data);

      if (this._compressed) {
        // The loop is resumed by `decompress()` once the data is inflated.
        this._state = INFLATING;
        this.decompress(data, cb);
        return;
      }

      if (data.length) {
        //
        // This message is not compressed so its length is the sum of the payload
        // length of all fragments.
        //
        this._messageLength = this._totalPayloadLength;
        this._fragments.push(data);
      }

      return this.dataMessage();
    }

    /**
     * Decompresses data.
     *
     * On success the inflated fragment is stored, the message is handled and
     * the parsing loop is restarted. On failure `cb` is called with the error.
     *
     * @param {Buffer} data Compressed data
     * @param {Function} cb Callback
     * @private
     */
    decompress(data, cb) {
      const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];

      perMessageDeflate.decompress(data, this._fin, (err, buf) => {
        if (err) return cb(err);

        if (buf.length) {
          this._messageLength += buf.length;

          if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
            return cb(
              error(
                RangeError,
                'Max payload size exceeded',
                false,
                1009,
                'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
              )
            );
          }

          this._fragments.push(buf);
        }

        const er = this.dataMessage();
        if (er) return cb(er);

        this.startLoop(cb);
      });
    }

    /**
     * Handles a data message.
     *
     * When the current frame is final, the collected fragments are emitted as
     * a `'message'` event and the per-message state is reset.
     *
     * @return {(Error|undefined)} A possible error
     * @private
     */
    dataMessage() {
      if (this._fin) {
        const messageLength = this._messageLength;
        const fragments = this._fragments;

        this._totalPayloadLength = 0;
        this._messageLength = 0;
        this._fragmented = 0;
        this._fragments = [];

        if (this._opcode === 2) {
          let data;

          if (this._binaryType === 'nodebuffer') {
            data = concat(fragments, messageLength);
          } else if (this._binaryType === 'arraybuffer') {
            data = toArrayBuffer(concat(fragments, messageLength));
          } else {
            data = fragments;
          }

          this.emit('message', data, true);
        } else {
          const buf = concat(fragments, messageLength);

          if (!this._skipUTF8Validation && !isValidUTF8(buf)) {
            this._loop = false;
            return error(
              Error,
              'invalid UTF-8 sequence',
              true,
              1007,
              'WS_ERR_INVALID_UTF8'
            );
          }

          this.emit('message', buf, false);
        }
      }

      this._state = GET_INFO;
    }

    /**
     * Handles a control message.
     *
     * A close frame stops the parsing loop and, when valid, emits `'conclude'`
     * and ends the stream. Ping and pong frames are emitted as `'ping'` and
     * `'pong'`.
     *
     * @param {Buffer} data Data to handle
     * @return {(Error|RangeError|undefined)} A possible error
     * @private
     */
    controlMessage(data) {
      if (this._opcode === 0x08) {
        this._loop = false;

        if (data.length === 0) {
          this.emit('conclude', 1005, EMPTY_BUFFER);
          this.end();
        } else if (data.length === 1) {
          return error(
            RangeError,
            'invalid payload length 1',
            true,
            1002,
            'WS_ERR_INVALID_CONTROL_PAYLOAD_LENGTH'
          );
        } else {
          const code = data.readUInt16BE(0);

          if (!isValidStatusCode(code)) {
            return error(
              RangeError,
              `invalid status code ${code}`,
              true,
              1002,
              'WS_ERR_INVALID_CLOSE_CODE'
            );
          }

          // Everything after the two status code bytes is the close reason.
          const reason = data.subarray(2);

          if (!this._skipUTF8Validation && !isValidUTF8(reason)) {
            return error(
              Error,
              'invalid UTF-8 sequence',
              true,
              1007,
              'WS_ERR_INVALID_UTF8'
            );
          }

          this.emit('conclude', code, reason);
          this.end();
        }
      } else if (this._opcode === 0x09) {
        this.emit('ping', data);
      } else {
        this.emit('pong', data);
      }

      this._state = GET_INFO;
    }
  }
  513. module.exports = Receiver;
  /**
   * Builds an error object.
   *
   * The returned error has its stack trace captured without the frame of this
   * helper, exposes `errorCode` as `err.code` and stores `statusCode` under
   * the `kStatusCode` symbol.
   *
   * @param {function(new:Error|RangeError)} ErrorCtor The error constructor
   * @param {String} message The error message
   * @param {Boolean} prefix Specifies whether or not to add a default prefix to
   *     `message`
   * @param {Number} statusCode The status code
   * @param {String} errorCode The exposed error code
   * @return {(Error|RangeError)} The error
   * @private
   */
  function error(ErrorCtor, message, prefix, statusCode, errorCode) {
    const text = prefix ? `Invalid WebSocket frame: ${message}` : message;
    const err = new ErrorCtor(text);

    Error.captureStackTrace(err, error);
    err.code = errorCode;
    err[kStatusCode] = statusCode;

    return err;
  }