index.js 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. 'use strict';
  2. var util = require('util');
  3. var Duplex = require('readable-stream').Duplex;
  4. function FirstChunkStream(options, cb) {
  5. var _this = this;
  6. var _state = {
  7. sent: false,
  8. chunks: [],
  9. size: 0
  10. };
  11. if (!(this instanceof FirstChunkStream)) {
  12. return new FirstChunkStream(options, cb);
  13. }
  14. options = options || {};
  15. if (!(cb instanceof Function)) {
  16. throw new Error('FirstChunkStream constructor requires a callback as its second argument.');
  17. }
  18. if (typeof options.chunkLength !== 'number') {
  19. throw new Error('FirstChunkStream constructor requires `options.chunkLength` to be a number.');
  20. }
  21. if (options.objectMode) {
  22. throw new Error('FirstChunkStream doesn\'t support `objectMode` yet.');
  23. }
  24. Duplex.call(this, options);
  25. // Initialize the internal state
  26. _state.manager = createReadStreamBackpressureManager(this);
  27. // Errors management
  28. // We need to execute the callback or emit en error dependending on the fact
  29. // the firstChunk is sent or not
  30. _state.errorHandler = function firstChunkStreamErrorHandler(err) {
  31. processCallback(err, Buffer.concat(_state.chunks, _state.size), _state.encoding, function () {});
  32. };
  33. this.on('error', _state.errorHandler);
  34. // Callback management
  35. function processCallback(err, buf, encoding, done) {
  36. // When doing sync writes + emiting an errror it can happen that
  37. // Remove the error listener on the next tick if an error where fired
  38. // to avoid unwanted error throwing
  39. if (err) {
  40. setImmediate(function () {
  41. _this.removeListener('error', _state.errorHandler);
  42. });
  43. } else {
  44. _this.removeListener('error', _state.errorHandler);
  45. }
  46. _state.sent = true;
  47. cb(err, buf, encoding, function (err, buf, encoding) {
  48. if (err) {
  49. setImmediate(function () {
  50. _this.emit('error', err);
  51. });
  52. return;
  53. }
  54. if (!buf) {
  55. done();
  56. return;
  57. }
  58. _state.manager.programPush(buf, encoding, done);
  59. });
  60. }
  61. // Writes management
  62. this._write = function firstChunkStreamWrite(chunk, encoding, done) {
  63. _state.encoding = encoding;
  64. if (_state.sent) {
  65. _state.manager.programPush(chunk, _state.encoding, done);
  66. } else if (chunk.length < options.chunkLength - _state.size) {
  67. _state.chunks.push(chunk);
  68. _state.size += chunk.length;
  69. done();
  70. } else {
  71. _state.chunks.push(chunk.slice(0, options.chunkLength - _state.size));
  72. chunk = chunk.slice(options.chunkLength - _state.size);
  73. _state.size += _state.chunks[_state.chunks.length - 1].length;
  74. processCallback(null, Buffer.concat(_state.chunks, _state.size), _state.encoding, function () {
  75. if (!chunk.length) {
  76. done();
  77. return;
  78. }
  79. _state.manager.programPush(chunk, _state.encoding, done);
  80. });
  81. }
  82. };
  83. this.on('finish', function firstChunkStreamFinish() {
  84. if (!_state.sent) {
  85. return processCallback(null, Buffer.concat(_state.chunks, _state.size), _state.encoding, function () {
  86. _state.manager.programPush(null, _state.encoding);
  87. });
  88. }
  89. _state.manager.programPush(null, _state.encoding);
  90. });
  91. }
  92. util.inherits(FirstChunkStream, Duplex);
  93. // Utils to manage readable stream backpressure
  94. function createReadStreamBackpressureManager(readableStream) {
  95. var manager = {
  96. waitPush: true,
  97. programmedPushs: [],
  98. programPush: function programPush(chunk, encoding, done) {
  99. done = done || function () {};
  100. // Store the current write
  101. manager.programmedPushs.push([chunk, encoding, done]);
  102. // Need to be async to avoid nested push attempts
  103. // Programm a push attempt
  104. setImmediate(manager.attemptPush);
  105. // Let's say we're ready for a read
  106. readableStream.emit('readable');
  107. readableStream.emit('drain');
  108. },
  109. attemptPush: function () {
  110. var nextPush;
  111. if (manager.waitPush) {
  112. if (manager.programmedPushs.length) {
  113. nextPush = manager.programmedPushs.shift();
  114. manager.waitPush = readableStream.push(nextPush[0], nextPush[1]);
  115. (nextPush[2])();
  116. }
  117. } else {
  118. setImmediate(function () {
  119. // Need to be async to avoid nested push attempts
  120. readableStream.emit('readable');
  121. });
  122. }
  123. }
  124. };
  125. // Patch the readable stream to manage reads
  126. readableStream._read = function streamFilterRestoreRead() {
  127. manager.waitPush = true;
  128. // Need to be async to avoid nested push attempts
  129. setImmediate(manager.attemptPush);
  130. };
  131. return manager;
  132. }
  133. module.exports = FirstChunkStream;