// index.js (1.6 KB)
  1. "use strict";
  2. var stream = require("stream");
  /**
   * Duplex stream that presents a separate writable and readable stream as a
   * single object: chunks written to the wrapper are forwarded to `writable`,
   * and chunks read from the wrapper are pulled from `readable`.
   *
   * Callable as `(writable, readable)` or `(options, writable, readable)`.
   *
   * @constructor
   * @param {Object} [options] - Passed to `stream.Duplex`, and to the
   *   `stream.Readable` created when `readable` has to be wrapped. When
   *   `options.bubbleErrors` is defined and falsy, "error" events from the
   *   inner streams are not re-emitted on the wrapper.
   * @param {Object} writable - Stream that receives everything written to the
   *   wrapper.
   * @param {Object} readable - Stream that supplies everything read from the
   *   wrapper. If it has no `read()` method it is adapted with
   *   `Readable#wrap`.
   */
  function DuplexWrapper(options, writable, readable) {
    // Two-argument form: shift (writable, readable) into their proper slots.
    if (typeof readable === "undefined") {
      readable = writable;
      writable = options;
      options = null;
    }

    stream.Duplex.call(this, options);

    // A source without read() cannot be polled by _read(); adapt it first.
    if (typeof readable.read !== "function") {
      readable = (new stream.Readable(options)).wrap(readable);
    }

    this._writable = writable;
    this._readable = readable;
    // Set by _read() when the source had nothing to offer; cleared by the
    // "readable" listener below, which then retries the read.
    this._waiting = false;

    var self = this;

    // Keep the two writable sides ending together, whichever finishes first.
    writable.once("finish", function() {
      self.end();
    });

    this.once("finish", function() {
      writable.end();
    });

    // Resume a read that previously came up empty once data is available.
    readable.on("readable", function() {
      if (self._waiting) {
        self._waiting = false;
        self._read();
      }
    });

    // Signal end-of-stream on the wrapper when the source ends.
    readable.once("end", function() {
      self.push(null);
    });

    // Error forwarding is on unless options.bubbleErrors is defined and falsy.
    if (!options || typeof options.bubbleErrors === "undefined" || options.bubbleErrors) {
      writable.on("error", function(err) {
        self.emit("error", err);
      });

      readable.on("error", function(err) {
        self.emit("error", err);
      });
    }
  }

  // Inherit from stream.Duplex; `constructor` is restored as a non-enumerable
  // property (the Object.create descriptor default).
  DuplexWrapper.prototype = Object.create(stream.Duplex.prototype, {constructor: {value: DuplexWrapper}});
  /**
   * Writable-side implementation: hands each chunk straight to the wrapped
   * writable stream.
   *
   * @param {*} input - Chunk being written to the wrapper.
   * @param {string} encoding - Encoding supplied with the chunk.
   * @param {Function} done - Passed to the inner `write()` as its callback.
   */
  DuplexWrapper.prototype._write = function _write(input, encoding, done) {
    this._writable.write(input, encoding, done);
  };
  /**
   * Readable-side implementation: drains the wrapped readable by calling its
   * `read()` until it returns `null`, pushing every chunk onto the wrapper.
   *
   * If nothing was read at all, `_waiting` is set; the "readable" listener
   * installed by the constructor checks that flag and calls `_read()` again.
   */
  DuplexWrapper.prototype._read = function _read() {
    var buf;
    var reads = 0;

    while ((buf = this._readable.read()) !== null) {
      this.push(buf);
      reads++;
    }

    // Nothing available right now: ask to be retried on the next "readable".
    if (reads === 0) {
      this._waiting = true;
    }
  };
  56. module.exports = function duplex2(options, writable, readable) {
  57. return new DuplexWrapper(options, writable, readable);
  58. };
  59. module.exports.DuplexWrapper = DuplexWrapper;