handle.js 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. var assert = require('assert');
  2. var util = require('util');
  3. var EventEmitter = require('events').EventEmitter;
  4. var Buffer = require('buffer').Buffer;
  5. var Queue = require('./queue');
  // Compatibility mode derived from the running Node.js version:
  // v0.8.x -> 'rusty', v0.9.x / v0.10.x -> 'old', everything else -> 'modern'.
  var mode = 'modern';
  if (/^v0\.8\./.test(process.version))
    mode = 'rusty';
  else if (/^v0\.(9|10)\./.test(process.version))
    mode = 'old';
  // Handle: an EventEmitter wrapping a stream behind the handle-style methods
  // defined on its prototype in this file (readStart/readStop, write*,
  // shutdown, close).
  //
  // `stream`  - stream to wrap; may be omitted and attached later with
  //             `setStream()`.
  // `options` - optional hooks; this file reads `options.close` and
  //             `options.getPeerName`.
  function Handle(stream, options) {
    EventEmitter.call(this);

    // Store a missing stream as `null`, never `undefined`: `setStream()`
    // asserts `this._stream === null`, so an omitted argument must not make a
    // later `setStream()` call fail as if a stream were already attached.
    this._stream = stream || null;
    this._flowing = false;
    this._reading = false;
    this._options = options || {};

    this.onread = null;

    // Pending requests
    this.pending = new Queue();

    // Start handle once `onread` is set
    if (mode === 'rusty') {
      var self = this;
      Object.defineProperty(this, 'onread', {
        set: function(value) {
          // Replace the one-shot setter with the assigned value, then start
          // reading on the next tick.
          Object.defineProperty(self, 'onread', {
            value: value
          });
          process.nextTick(function() {
            self.readStart();
          });
        }
      });
    }

    // NOTE: v0.8 has some odd .pause()/.resume() semantics in http.js
    if (mode === 'rusty')
      this.writeQueueSize = 0;
    else if (mode !== 'modern')
      this.writeQueueSize = 1;

    // v0.8 needs its `close` listener installed as soon as a stream exists.
    if (mode === 'rusty') {
      if (this._stream)
        this._rustyInit();
      else
        this.once('stream', this._rustyInit);
    }
  }
  // Handle inherits from EventEmitter. It is the module's export and carries
  // the detected compatibility mode as `Handle.mode`.
  util.inherits(Handle, EventEmitter);

  Handle.mode = mode;
  module.exports = Handle;
  // Factory equivalent of `new Handle(stream, options)`.
  Handle.create = function create(stream, options) {
    var handle = new Handle(stream, options);
    return handle;
  };
  // Append `req`, tagged with `type`, to the pending queue and return
  // whatever the queue's `append()` returns for it.
  Handle.prototype._queueReq = function _queueReq(type, req) {
    var entry = this.pending.append(type, req);
    return entry;
  };
  // Drain the pending queue: dequeue entries from the front until it reports
  // empty, and return the dequeued values in that order.
  Handle.prototype._pendingList = function _pendingList() {
    var drained = [];

    for (;;) {
      if (this.pending.isEmpty())
        break;

      var head = this.pending.first();
      drained.push(head.dequeue());
    }

    return drained;
  };
  // Attach the underlying stream and emit `'stream'` so that operations
  // deferred with `once('stream', ...)` can run.
  //
  // Throws (assertion) when a stream is already attached.
  Handle.prototype.setStream = function setStream(stream) {
    // Accept both `null` and `undefined` as "no stream yet": the constructor
    // stores whatever it was given, so an omitted argument leaves `undefined`.
    assert(!this._stream, 'Can\'t set stream two times');

    this._stream = stream;
    this.emit('stream', stream);
  };
  // Mark the handle as reading. With a stream attached, install the flow
  // listeners on first use and resume the stream; without one, retry when
  // `'stream'` is emitted. Always returns 0.
  Handle.prototype.readStart = function readStart() {
    this._reading = true;

    if (this._stream) {
      if (!this._flowing) {
        this._flowing = true;
        this._flow();
      }
      this._stream.resume();
    } else {
      this.once('stream', this.readStart);
    }

    return 0;
  };
  // Clear the reading flag and pause the stream; without a stream, retry when
  // `'stream'` is emitted. Always returns 0.
  Handle.prototype.readStop = function readStop() {
    this._reading = false;

    if (this._stream)
      this._stream.pause();
    else
      this.once('stream', this.readStop);

    return 0;
  };
  if (mode === 'modern') {
    var uv = process.binding('uv');

    // `new Buffer(size)` is deprecated in current Node.js releases, so build
    // the zero-length buffer with `Buffer.alloc()` when the runtime has it and
    // fall back to the constructor only on releases that predate it.
    var emptyBuffer = function emptyBuffer() {
      if (typeof Buffer.alloc === 'function')
        return Buffer.alloc(0);
      return new Buffer(0);
    };

    // Relay stream events to `onread(nread, buffer)`: data chunks with their
    // length, `end` as `UV_EOF`, and `close` as `UV_ECONNRESET` (one
    // `setImmediate` later, and only while the handle is still reading).
    Handle.prototype._flow = function flow() {
      var self = this;

      this._stream.on('data', function(chunk) {
        self.onread(chunk.length, chunk);
      });

      this._stream.on('end', function() {
        self.onread(uv.UV_EOF, emptyBuffer());
      });

      this._stream.on('close', function() {
        setImmediate(function() {
          if (self._reading)
            self.onread(uv.UV_ECONNRESET, emptyBuffer());
        });
      });
    };

    // Complete every pending request with `UV_ECANCELED` on a later
    // `setImmediate` turn, and stop reading right away.
    Handle.prototype._close = function _close() {
      var list = this._pendingList();

      var self = this;
      setImmediate(function() {
        for (var i = 0; i < list.length; i++) {
          var req = list[i];
          req.oncomplete(uv.UV_ECANCELED, self, req);
        }
      });

      this.readStop();
    };
  } else if (mode === 'old') {
    // v0.9 / v0.10: `onread(buffer, offset, length)`. EOF and reset are
    // signalled by `onread(null, 0, 0)` while `process._errno` holds the code;
    // the previous value is restored unless the callback changed it.
    Handle.prototype._flow = function flow() {
      var self = this;

      this._stream.on('data', function(chunk) {
        self.onread(chunk, 0, chunk.length);
      });

      this._stream.on('end', function() {
        var errno = process._errno;
        process._errno = 'EOF';
        self.onread(null, 0, 0);
        if (process._errno === 'EOF')
          process._errno = errno;
      });

      this._stream.on('close', function() {
        setImmediate(function() {
          if (!self._reading)
            return;

          var errno = process._errno;
          process._errno = 'ECONNRESET';
          self.onread(null, 0, 0);
          if (process._errno === 'ECONNRESET')
            process._errno = errno;
        });
      });
    };

    // Complete every pending request with status -1 and `process._errno` set
    // to 'CANCELED' on a later `setImmediate` turn, and stop reading right
    // away.
    Handle.prototype._close = function _close() {
      var list = this._pendingList();

      var self = this;
      setImmediate(function() {
        for (var i = 0; i < list.length; i++) {
          process._errno = 'CANCELED';
          var req = list[i];
          req.oncomplete(-1, self, req);
        }
      });

      this.readStop();
    };
  } else {
    // v0.8: the `close` listener is installed separately from `_flow()` (the
    // constructor calls this as soon as a stream is available). Errors are
    // reported through `global.errno`.
    Handle.prototype._rustyInit = function _rustyInit() {
      var self = this;

      this._stream.on('close', function() {
        process.nextTick(function() {
          if (!self._reading)
            return;

          var errno = global.errno;
          global.errno = 'ECONNRESET';
          self.onread(null, 0, 0);
          if (global.errno === 'ECONNRESET')
            global.errno = errno;
        });
      });
    };

    // Relay `data` as `onread(buffer, offset, length)` and `end` as
    // `onread(null, 0, 0)` with `global.errno` temporarily set to 'EOF'.
    Handle.prototype._flow = function flow() {
      var self = this;

      this._stream.on('data', function(chunk) {
        self.onread(chunk, 0, chunk.length);
      });

      this._stream.on('end', function() {
        var errno = global.errno;
        global.errno = 'EOF';
        self.onread(null, 0, 0);
        if (global.errno === 'EOF')
          global.errno = errno;
      });
    };

    // Complete every pending request with status -1 and `global.errno` set to
    // 'CANCELED' on the next tick, and stop reading right away.
    Handle.prototype._close = function _close() {
      var list = this._pendingList();

      var self = this;
      process.nextTick(function() {
        for (var i = 0; i < list.length; i++) {
          var req = list[i];
          global.errno = 'CANCELED';
          req.oncomplete(-1, self, req);
        }
      });

      this.readStop();
    };
  }
  if (mode !== 'modern') {
    // v0.8 / v0.10: queue a shutdown request and return the request object
    // itself, creating one when the caller passed none. The stream is ended
    // now, or once `'stream'` is emitted.
    Handle.prototype.shutdown = function shutdown(req) {
      var request = req || {};
      var wrap = this._queueReq('shutdown', request);

      if (this._stream) {
        this._shutdown(wrap);
      } else {
        this.once('stream', function() {
          this._shutdown(wrap);
        });
      }

      return request;
    };

    // End the stream; when it finishes, complete the queued request with
    // status 0 unless it has already been dequeued.
    Handle.prototype._shutdown = function _shutdown(wrap) {
      var handle = this;

      function onEnd() {
        var req = wrap.dequeue();
        if (req)
          req.oncomplete(0, handle, req);
      }

      this._stream.end(onEnd);
    };
  } else {
    // Modern: queue a shutdown request and return 0. The stream is ended now,
    // or once `'stream'` is emitted.
    Handle.prototype.shutdown = function shutdown(req) {
      var wrap = this._queueReq('shutdown', req);

      if (this._stream)
        return this._shutdown(wrap);

      this.once('stream', function() {
        this._shutdown(wrap);
      });
      return 0;
    };

    // End the stream; when it finishes, complete the queued request with
    // status 0 unless it has already been dequeued. Returns 0.
    Handle.prototype._shutdown = function _shutdown(wrap) {
      var handle = this;

      function onEnd() {
        var req = wrap.dequeue();
        if (req)
          req.oncomplete(0, handle, req);
      }

      this._stream.end(onEnd);
      return 0;
    };
  }
  if (mode !== 'rusty') {
    // Cancel pending requests and stop reading (`_close()`), then report
    // completion through `options.close(callback)` when that hook exists, or
    // by scheduling `callback` on the next tick. Without a stream the close is
    // repeated once `'stream'` is emitted. Always returns 0.
    //
    // `callback` is optional.
    Handle.prototype.close = function close(callback) {
      // Fall back to a no-op: `process.nextTick()` must be given a function,
      // and the v0.8 variant below likewise hands `options.close` a no-op.
      var done = typeof callback === 'function' ? callback : function() {};

      this._close();

      if (!this._stream) {
        this.once('stream', function() {
          this.close(done);
        });
        return 0;
      }

      if (this._options.close)
        this._options.close(done);
      else
        process.nextTick(done);

      return 0;
    };
  } else {
    // v0.8: no completion callback. Cancel pending requests, stop reading,
    // and invoke `options.close` (with a no-op callback) once a stream
    // exists. Always returns 0.
    Handle.prototype.close = function close() {
      this._close();

      if (!this._stream)
        this.once('stream', this.close);
      else if (this._options.close)
        this._options.close(function() {});

      return 0;
    };
  }
  // Size of `data` in bytes as it will be written. For strings this is the
  // encoded byte length, not `data.length`, which counts characters and
  // under-reports multi-byte text. Anything else keeps its own `length`.
  function writeByteLength(data, enc) {
    if (typeof data === 'string')
      return Buffer.byteLength(data, enc || 'utf8');
    return data.length;
  }

  if (mode === 'modern') {
    // Queue a write request and write `data` (Buffer when `enc` is null,
    // otherwise a string in encoding `enc`) now, or once `'stream'` is
    // emitted. Returns 0.
    Handle.prototype.writeEnc = function writeEnc(req, data, enc) {
      var wrap = this._queueReq('write', req);

      if (!this._stream) {
        this.once('stream', function() {
          this._writeEnc(wrap, req, data, enc);
        });
        return 0;
      }

      return this._writeEnc(wrap, req, data, enc);
    };

    // Mark `req` as asynchronous, record its size in bytes, and write to the
    // stream unless the queued entry reports empty. On completion the request
    // gets `oncomplete(0, handle, req)`. Returns 0.
    Handle.prototype._writeEnc = function _writeEnc(wrap, req, data, enc) {
      var self = this;

      req.async = true;
      req.bytes = writeByteLength(data, enc);

      if (wrap.isEmpty())
        return 0;

      this._stream.write(data, enc, function() {
        var req = wrap.dequeue();
        if (!req)
          return;
        req.oncomplete(0, self, req);
      });

      return 0;
    };
  } else {
    // v0.8 / v0.10 argument order: `(data, ignored, enc, req)`. Creates the
    // request, with its size in bytes, when none is passed, and returns the
    // request object. The write happens now, or once `'stream'` is emitted.
    Handle.prototype.writeEnc = function writeEnc(data, ignored, enc, req) {
      if (!req)
        req = { bytes: writeByteLength(data, enc) };

      var wrap = this._queueReq('write', req);

      if (!this._stream) {
        this.once('stream', function() {
          this._writeEnc(data, ignored, enc, wrap);
        });
        return req;
      }

      this._writeEnc(data, ignored, enc, wrap);
      return req;
    };

    // Convert `data` to a Buffer and write it unless the queued entry reports
    // empty. On completion the request gets `oncomplete(0, handle, req)`.
    Handle.prototype._writeEnc = function _writeEnc(data, ignored, enc, wrap) {
      var self = this;
      // The `Buffer` constructor is kept on purpose: this branch only runs on
      // v0.8 - v0.10.
      var buffer = new Buffer(data, enc);

      if (wrap.isEmpty())
        return;

      this._stream.write(buffer, function() {
        var req = wrap.dequeue();
        if (!req)
          return;
        req.oncomplete(0, self, req);
      });
    };
  }
  // Forward to `writeEnc()` with a `null` encoding. Arguments are passed
  // through positionally, so they follow whichever `writeEnc` signature is
  // active for the current mode.
  Handle.prototype.writeBuffer = function writeBuffer(req, data) {
    var encoding = null;
    return this.writeEnc(req, data, encoding);
  };
  // Forward to `writeEnc()` with the 'ascii' encoding.
  Handle.prototype.writeAsciiString = function writeAsciiString(req, data) {
    var encoding = 'ascii';
    return this.writeEnc(req, data, encoding);
  };
  // Forward to `writeEnc()` with the 'utf8' encoding.
  Handle.prototype.writeUtf8String = function writeUtf8String(req, data) {
    var encoding = 'utf8';
    return this.writeEnc(req, data, encoding);
  };
  // Forward to `writeEnc()` with the 'ucs2' encoding.
  Handle.prototype.writeUcs2String = function writeUcs2String(req, data) {
    var encoding = 'ucs2';
    return this.writeEnc(req, data, encoding);
  };
  // Forward to `writeEnc()` with the 'binary' encoding.
  Handle.prototype.writeBinaryString = function writeBinaryString(req, data) {
    var encoding = 'binary';
    return this.writeEnc(req, data, encoding);
  };
  // Forward to `writeEnc()` using the 'binary' encoding name, the same one
  // `writeBinaryString()` passes.
  Handle.prototype.writeLatin1String = function writeLatin1String(req, data) {
    var encoding = 'binary';
    return this.writeEnc(req, data, encoding);
  };
  // v0.8
  // Return the result of the `options.getPeerName()` hook, or `null` when the
  // hook is not configured.
  Handle.prototype.getsockname = function getsockname() {
    var opts = this._options;
    return opts.getPeerName ? opts.getPeerName() : null;
  };
  if (mode !== 'modern') {
    // v0.10
    // Return the `getsockname()` result directly.
    Handle.prototype.getpeername = function getpeername() {
      var peer = this.getsockname();
      return peer;
    };
  } else {
    // Copy every own enumerable property of the `getsockname()` result onto
    // `out`. Returns 0 on success, -1 when there is nothing to report.
    Handle.prototype.getpeername = function getpeername(out) {
      var res = this.getsockname();
      if (!res)
        return -1;

      var keys = Object.keys(res);
      for (var i = 0; i < keys.length; i++) {
        var key = keys[i];
        out[key] = res[key];
      }

      return 0;
    };
  }