// _stream_writable.js
  // Copyright Joyent, Inc. and other Node contributors.
  //
  // Permission is hereby granted, free of charge, to any person obtaining a
  // copy of this software and associated documentation files (the
  // "Software"), to deal in the Software without restriction, including
  // without limitation the rights to use, copy, modify, merge, publish,
  // distribute, sublicense, and/or sell copies of the Software, and to permit
  // persons to whom the Software is furnished to do so, subject to the
  // following conditions:
  //
  // The above copyright notice and this permission notice shall be included
  // in all copies or substantial portions of the Software.
  //
  // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
  // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
  // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
  // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
  // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
  // USE OR OTHER DEALINGS IN THE SOFTWARE.
  // Writable stream base class. It is a bit simpler than the readable side:
  // implement an async ._write(chunk, encoding, cb) and this module takes care
  // of buffering and of emitting 'drain'.

  // Public surface: the constructor itself, with its state class attached.
  module.exports = Writable;
  Writable.WritableState = WritableState;

  /*<replacement>*/
  var Buffer = require('buffer').Buffer;
  /*</replacement>*/

  /*<replacement>*/
  var util = require('core-util-is');
  util.inherits = require('inherits');
  /*</replacement>*/

  var Stream = require('stream');

  // Must run before any Writable.prototype.* assignment further down.
  util.inherits(Writable, Stream);
  // One queued write: the chunk, the encoding it was written with, and the
  // callback to invoke once that chunk has been handed to _write().
  function WriteReq(chunk, encoding, cb) {
    var req = this;
    req.chunk = chunk;
    req.encoding = encoding;
    req.callback = cb;
  }
  // Bookkeeping record for a single Writable stream.
  //
  // options (all optional):
  //   highWaterMark   - level at which write() starts returning false;
  //                     defaults to 16 * 1024 and is truncated to an integer.
  //   objectMode      - coerced to a boolean.
  //   decodeStrings   - only an explicit `false` turns string decoding off.
  //   defaultEncoding - falls back to 'utf8'.
  // stream: the owning stream; it is passed to onwrite() whenever the
  //         callback given to _write() fires.
  function WritableState(options, stream) {
    var opts = options || {};

    // 0 is a legitimate high-water mark: it means write() returns false
    // unless the whole buffer is flushed immediately.
    var requestedMark = opts.highWaterMark;
    var hasMark = requestedMark === 0 || !!requestedMark;
    this.highWaterMark = ~~(hasMark ? requestedMark : 16 * 1024);

    // Whether the stream carries arbitrary objects instead of buffers.
    this.objectMode = !!opts.objectMode;

    this.needDrain = false;

    // Lifecycle flags: `ending` is set at the start of end(), `ended` once
    // end() has returned, `finished` when 'finish' has been emitted.
    this.ending = false;
    this.ended = false;
    this.finished = false;

    // Should strings be turned into buffers before reaching _write()?
    // Some node-core streams opt out to handle strings at a lower level.
    this.decodeStrings = opts.decodeStrings !== false;

    // Crypto historically defaults to 'binary', so this has to be
    // configurable; everything else uses 'utf8'.
    this.defaultEncoding = opts.defaultEncoding || 'utf8';

    // Not a real buffer: a measurement of how much is still waiting to be
    // pushed to the underlying socket or file.
    this.length = 0;

    // True while a _write() call is outstanding.
    this.writing = false;

    // Tells whether the onwrite callback fired immediately or on a later
    // tick. Starts out true because anything that must wait until "later"
    // should generally also not happen before the first write call.
    this.sync = true;

    // True while previously buffered items are being processed; their
    // _write() callbacks may fire in the same tick, and this flag prevents
    // an overlapped onwrite situation.
    this.bufferProcessing = false;

    // The callback handed to _write().
    this.onwrite = function(er) {
      onwrite(stream, er);
    };

    // The user callback of the write in flight, and the amount being written.
    this.writecb = null;
    this.writelen = 0;

    // Queue of pending WriteReq entries.
    this.buffer = [];

    // True once the error was emitted, so it is not thrown again.
    this.errorEmitted = false;
  }
  // Writable constructor. Works with or without `new`.
  //
  // The constructor is also applied to Duplex instances, which are
  // instanceof Readable rather than instanceof Writable, so both are accepted
  // as an already-constructed receiver.
  function Writable(options) {
    // Loaded at call time rather than at module load.
    // NOTE(review): presumably to avoid a circular require with
    // _stream_duplex -- confirm.
    var Duplex = require('./_stream_duplex');

    var isStreamInstance = this instanceof Writable || this instanceof Duplex;
    if (!isStreamInstance) {
      return new Writable(options);
    }

    this._writableState = new WritableState(options, this);

    // legacy flag
    this.writable = true;

    Stream.call(this);
  }
  // A Writable is not a data source, so piping *from* it is reported as an
  // 'error' event; without this override people could pipe Writable streams.
  Writable.prototype.pipe = function() {
    var er = new Error('Cannot pipe. Not readable.');
    this.emit('error', er);
  };
  // Report a write() that arrived after end(): the 'error' event is emitted
  // right away, while the write callback receives the same error on the next
  // tick.
  function writeAfterEnd(stream, state, cb) {
    var failure = new Error('write after end');
    function notify() {
      cb(failure);
    }
    // TODO: defer error events consistently everywhere, not just the cb
    stream.emit('error', failure);
    process.nextTick(notify);
  }
  // Decide whether `chunk` may be written to `stream`.
  //
  // In object mode every value is accepted: chunks all count as length 1 and
  // the watermarks limit how many objects are buffered rather than how many
  // bytes or characters.
  //
  // Outside object mode only Buffers and strings are accepted. null and
  // undefined are rejected as well: the write path reads `chunk.length` for
  // non-object-mode chunks, so letting them through would throw an unrelated
  // TypeError out of write() instead of reporting a stream error.
  //
  // On rejection a TypeError is emitted as an 'error' event immediately and
  // passed to `cb` on the next tick.
  //
  // Returns true when the chunk is acceptable, false when it was rejected.
  function validChunk(stream, state, chunk, cb) {
    if (state.objectMode ||
        Buffer.isBuffer(chunk) ||
        typeof chunk === 'string') {
      return true;
    }

    var er = new TypeError('Invalid non-string/buffer chunk');
    stream.emit('error', er);
    process.nextTick(function() {
      cb(er);
    });
    return false;
  }
  // Queue `chunk` for writing.
  //
  // `encoding` may be omitted, in which case the second argument is taken as
  // the callback. Buffers are always tagged with the 'buffer' encoding; other
  // chunks without an encoding use the stream's default encoding.
  //
  // Returns false when the write was rejected (stream already ended, or the
  // chunk is invalid); otherwise returns whatever writeOrBuffer() reports.
  Writable.prototype.write = function(chunk, encoding, cb) {
    var state = this._writableState;

    if (typeof encoding === 'function') {
      cb = encoding;
      encoding = null;
    }

    if (Buffer.isBuffer(chunk)) {
      encoding = 'buffer';
    } else if (!encoding) {
      encoding = state.defaultEncoding;
    }

    if (typeof cb !== 'function') {
      cb = function() {};
    }

    if (state.ended) {
      writeAfterEnd(this, state, cb);
      return false;
    }

    if (!validChunk(this, state, chunk, cb)) {
      return false;
    }

    return writeOrBuffer(this, state, chunk, encoding, cb);
  };
  // Convert a string chunk into a Buffer using `encoding`, unless the stream
  // is in object mode or string decoding was switched off
  // (state.decodeStrings === false). Any other chunk is returned untouched.
  //
  // Returns the (possibly converted) chunk.
  function decodeChunk(state, chunk, encoding) {
    if (!state.objectMode &&
        state.decodeStrings !== false &&
        typeof chunk === 'string') {
      // `new Buffer(string, encoding)` is deprecated, so prefer Buffer.from
      // where it is a genuine Buffer factory. Some runtimes only expose the
      // method inherited from Uint8Array, which does not take an encoding;
      // those, and runtimes without Buffer.from at all, keep using the
      // constructor.
      var hasBufferFrom = typeof Buffer.from === 'function' &&
                          Buffer.from !== Uint8Array.from;
      chunk = hasBufferFrom ?
        Buffer.from(chunk, encoding) :
        new Buffer(chunk, encoding);
    }
    return chunk;
  }
  // Either start writing the chunk right now or, when a write is already in
  // flight, append it to the queue to wait its turn.
  //
  // Returns true while the buffered length stays below the high-water mark.
  // A false return means the caller should wait for 'drain', so needDrain is
  // raised in that case.
  function writeOrBuffer(stream, state, chunk, encoding, cb) {
    var data = decodeChunk(state, chunk, encoding);
    var enc = Buffer.isBuffer(data) ? 'buffer' : encoding;

    // Object-mode chunks always count as one unit.
    var size = state.objectMode ? 1 : data.length;
    state.length += size;

    var belowMark = state.length < state.highWaterMark;
    // Only ever raise the flag here: an earlier needDrain must not be reset
    // to false.
    if (!belowMark) {
      state.needDrain = true;
    }

    if (state.writing) {
      state.buffer.push(new WriteReq(data, enc, cb));
    } else {
      doWrite(stream, state, size, data, enc, cb);
    }

    return belowMark;
  }
  // Hand one chunk to the stream's _write() implementation.
  //
  // Before the call the state records that a write of `len` with callback
  // `cb` is in flight. `sync` is true only for the duration of the _write()
  // call itself, which lets onwrite() tell a same-tick completion from a
  // later one.
  function doWrite(stream, state, len, chunk, encoding, cb) {
    state.writing = true;
    state.writelen = len;
    state.writecb = cb;

    state.sync = true;
    stream._write(chunk, encoding, state.onwrite);
    state.sync = false;
  }
  // Deliver a _write() failure: first to the write callback (deferred to the
  // next tick when _write() completed synchronously), then as an 'error'
  // event after marking the error as emitted on the stream's writable state.
  function onwriteError(stream, state, sync, er, cb) {
    var report = function() {
      cb(er);
    };

    if (!sync) {
      report();
    } else {
      process.nextTick(report);
    }

    var writableState = stream._writableState;
    writableState.errorEmitted = true;
    stream.emit('error', er);
  }
  // Reset the in-flight bookkeeping once a _write() has completed: the
  // written amount is subtracted from the pending length, and the write
  // callback, write length and `writing` flag are cleared.
  function onwriteStateUpdate(state) {
    var flushed = state.writelen;
    state.length = state.length - flushed;
    state.writelen = 0;
    state.writecb = null;
    state.writing = false;
  }
  // Completion handler for _write(); reached through state.onwrite.
  //
  // `sync` and the user callback are captured before the state is reset,
  // because onwriteStateUpdate() clears the callback. Errors go to
  // onwriteError(). On success any queued writes are started, then
  // afterWrite() runs -- on the next tick if _write() completed
  // synchronously, immediately otherwise.
  function onwrite(stream, er) {
    var state = stream._writableState;
    var wasSync = state.sync;
    var userCb = state.writecb;

    onwriteStateUpdate(state);

    if (er) {
      onwriteError(stream, state, wasSync, er, userCb);
      return;
    }

    // Work out whether we are ready to finish, but do not emit yet.
    var finished = needFinish(stream, state);

    var canFlushQueue = !finished &&
                        !state.bufferProcessing &&
                        state.buffer.length;
    if (canFlushQueue) {
      clearBuffer(stream, state);
    }

    if (!wasSync) {
      afterWrite(stream, state, finished, userCb);
      return;
    }

    process.nextTick(function() {
      afterWrite(stream, state, finished, userCb);
    });
  }
  // Final step of a successful write: run the user callback, preceded by a
  // possible 'drain' when the stream is not finishing, or followed by the
  // finish check when it is.
  function afterWrite(stream, state, finished, cb) {
    if (finished) {
      cb();
      finishMaybe(stream, state);
    } else {
      onwriteDrain(stream, state);
      cb();
    }
  }
  // Emit 'drain' once nothing is pending and a drain was requested, clearing
  // the request first.
  //
  // The caller has to make sure this does not run before the write()
  // consumer has received its `false` return value and had a chance to
  // attach a 'drain' listener, which is why completion handling is pushed to
  // the next tick.
  function onwriteDrain(stream, state) {
    if (state.length !== 0 || !state.needDrain) {
      return;
    }
    state.needDrain = false;
    stream.emit('drain');
  }
  // Work through the queued writes, if any.
  //
  // Entries are passed to doWrite() one after another. When a write is still
  // in flight after doWrite() returns, its callback did not fire immediately,
  // so processing stops there. Everything already handed to doWrite() is then
  // dropped from the queue.
  function clearBuffer(stream, state) {
    // Number of entries that have been handed to doWrite() so far.
    var taken = 0;

    state.bufferProcessing = true;

    while (taken < state.buffer.length) {
      var req = state.buffer[taken];
      var size = state.objectMode ? 1 : req.chunk.length;
      taken += 1;

      doWrite(stream, state, size, req.chunk, req.encoding, req.callback);

      // Still writing: the entry just taken is being processed, so wait for
      // its completion before going on.
      if (state.writing) {
        break;
      }
    }

    state.bufferProcessing = false;

    if (taken >= state.buffer.length) {
      state.buffer.length = 0;
    } else {
      state.buffer = state.buffer.slice(taken);
    }
  }
  // Default _write(): reports a 'not implemented' error through the
  // callback. Implementations are expected to replace this method.
  Writable.prototype._write = function(chunk, encoding, cb) {
    var er = new Error('not implemented');
    cb(er);
  };
  // Signal that no more data will be written.
  //
  // Accepts end(cb), end(chunk, cb) and end(chunk, encoding, cb). A chunk
  // that is neither undefined nor null is written first. Calls made while
  // the stream is already ending or finished are otherwise ignored.
  Writable.prototype.end = function(chunk, encoding, cb) {
    var state = this._writableState;

    // Shift the arguments when the callback was passed early.
    if (typeof chunk === 'function') {
      cb = chunk;
      chunk = null;
      encoding = null;
    } else if (typeof encoding === 'function') {
      cb = encoding;
      encoding = null;
    }

    if (chunk !== undefined && chunk !== null) {
      this.write(chunk, encoding);
    }

    // ignore unnecessary end() calls.
    if (state.ending || state.finished) {
      return;
    }
    endWritable(this, state, cb);
  };
  // True when the stream is ending, has nothing pending and no write in
  // flight, and has not emitted 'finish' yet.
  function needFinish(stream, state) {
    var idle = state.length === 0 && !state.writing;
    return state.ending && idle && !state.finished;
  }
  // Mark the stream finished and emit 'finish' if needFinish() says it is
  // time. Returns the needFinish() result either way.
  function finishMaybe(stream, state) {
    var need = needFinish(stream, state);
    if (!need) {
      return need;
    }
    state.finished = true;
    stream.emit('finish');
    return need;
  }
  // Carry out end(): flag the stream as ending, finish right away when
  // possible, and arrange for `cb` (if given) to run -- on the next tick when
  // the stream is already finished, otherwise on the 'finish' event. `ended`
  // is set last.
  function endWritable(stream, state, cb) {
    state.ending = true;
    finishMaybe(stream, state);

    if (cb) {
      var stillPending = !state.finished;
      if (stillPending) {
        stream.once('finish', cb);
      } else {
        process.nextTick(cb);
      }
    }

    state.ended = true;
  }