stream_reader.js 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. 'use strict';
  /**
   * Byte queue that accumulates incoming chunks and hands them back either as
   * fixed-size reads (read) or one byte at a time (eachByte).
   *
   * Internal state:
   *   _queue     - chunks waiting to be consumed, oldest first
   *   _queueSize - sum of the lengths of the chunks in _queue
   *   _offset    - index of the next byte eachByte() will deliver from _queue[0]
   */
  var StreamReader = function() {
    this._queue = [];
    this._queueSize = 0;
    this._offset = 0;
  };

  /**
   * Appends a chunk to the end of the queue.
   *
   * Falsy and zero-length input is ignored. Input that has no `copy` member
   * (strings, arrays, typed arrays, ...) is converted with Buffer.from().
   *
   * @param {Buffer|string|Array|Uint8Array} buffer - data to enqueue
   * @throws {TypeError} if the input cannot be converted by Buffer.from(),
   *   e.g. a number. The deprecated `new Buffer(n)` used previously would
   *   instead have queued n bytes that were never actually received.
   */
  StreamReader.prototype.put = function(buffer) {
    if (!buffer || buffer.length === 0) return;

    var chunk = buffer.copy ? buffer : Buffer.from(buffer);

    this._queue.push(chunk);
    this._queueSize += chunk.length;
  };

  /**
   * Removes and returns exactly `length` bytes from the front of the queue.
   *
   * @param {number} length - number of bytes wanted
   * @returns {Buffer|null} null when fewer than `length` bytes are queued
   *   (nothing is consumed in that case); an empty Buffer when length is 0.
   *   When the first chunk is exactly `length` bytes long that chunk itself is
   *   returned; a shorter read returns a slice of the first chunk; a read
   *   spanning several chunks returns a newly concatenated Buffer.
   */
  StreamReader.prototype.read = function(length) {
    if (length > this._queueSize) return null;
    if (length === 0) return Buffer.alloc(0);

    this._queueSize -= length;

    var queue = this._queue,
        remain = length,
        first = queue[0],
        buffers, buffer;

    // Fast path: the request is satisfied by the first chunk alone.
    if (first.length >= length) {
      if (first.length === length) {
        return queue.shift();
      } else {
        buffer = first.slice(0, length);
        queue[0] = first.slice(length);
        return buffer;
      }
    }

    // Count how many leading chunks are consumed in full; `remain` ends up as
    // the number of bytes still needed from the chunk that follows them.
    for (var i = 0, n = queue.length; i < n; i++) {
      if (remain < queue[i].length) break;
      remain -= queue[i].length;
    }
    buffers = queue.splice(0, i);

    // Take the leftover bytes from the new head and keep its tail queued.
    if (remain > 0 && queue.length > 0) {
      buffers.push(queue[0].slice(0, remain));
      queue[0] = queue[0].slice(remain);
    }
    return this._concat(buffers, length);
  };

  /**
   * Drains the queue, invoking `callback` once for every queued byte in order.
   *
   * @param {function(number)} callback - receives each byte value
   * @param {*} [context] - value used as `this` inside the callback
   */
  StreamReader.prototype.eachByte = function(callback, context) {
    var buffer, n, index, consumed;

    while (this._queue.length > 0) {
      buffer = this._queue[0];
      n = buffer.length;

      while (this._offset < n) {
        index = this._offset;
        // Advance before calling out so that a throwing callback does not
        // cause the same byte to be delivered again on the next call.
        this._offset += 1;
        callback.call(context, buffer[index]);
      }
      this._offset = 0;

      // Keep the byte counter in step with the queue. Without this, read()
      // would trust a stale _queueSize after the queue has been drained and
      // fail on an empty queue. The length of the chunk actually removed is
      // used (not `n`) in case the callback modified the queue.
      consumed = this._queue.shift();
      if (consumed) this._queueSize -= consumed.length;
    }
  };

  /**
   * Joins a list of Buffers into a single Buffer of `length` bytes.
   *
   * @param {Buffer[]} buffers - chunks to join, in order
   * @param {number} length - total length of the result
   * @returns {Buffer}
   */
  StreamReader.prototype._concat = function(buffers, length) {
    return Buffer.concat(buffers, length);
  };
  65. module.exports = StreamReader;