stream_reader.js 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. 'use strict';
  2. var Buffer = require('safe-buffer').Buffer;
  3. var StreamReader = function() {
  4. this._queue = [];
  5. this._queueSize = 0;
  6. this._offset = 0;
  7. };
  8. StreamReader.prototype.put = function(buffer) {
  9. if (!buffer || buffer.length === 0) return;
  10. if (!Buffer.isBuffer(buffer)) buffer = Buffer.from(buffer);
  11. this._queue.push(buffer);
  12. this._queueSize += buffer.length;
  13. };
  14. StreamReader.prototype.read = function(length) {
  15. if (length > this._queueSize) return null;
  16. if (length === 0) return Buffer.alloc(0);
  17. this._queueSize -= length;
  18. var queue = this._queue,
  19. remain = length,
  20. first = queue[0],
  21. buffers, buffer;
  22. if (first.length >= length) {
  23. if (first.length === length) {
  24. return queue.shift();
  25. } else {
  26. buffer = first.slice(0, length);
  27. queue[0] = first.slice(length);
  28. return buffer;
  29. }
  30. }
  31. for (var i = 0, n = queue.length; i < n; i++) {
  32. if (remain < queue[i].length) break;
  33. remain -= queue[i].length;
  34. }
  35. buffers = queue.splice(0, i);
  36. if (remain > 0 && queue.length > 0) {
  37. buffers.push(queue[0].slice(0, remain));
  38. queue[0] = queue[0].slice(remain);
  39. }
  40. return Buffer.concat(buffers, length);
  41. };
  42. StreamReader.prototype.eachByte = function(callback, context) {
  43. var buffer, n, index;
  44. while (this._queue.length > 0) {
  45. buffer = this._queue[0];
  46. n = buffer.length;
  47. while (this._offset < n) {
  48. index = this._offset;
  49. this._offset += 1;
  50. callback.call(context, buffer[index]);
  51. }
  52. this._offset = 0;
  53. this._queue.shift();
  54. }
  55. };
  56. module.exports = StreamReader;