'use strict';

var stream = require('stream'),
  util = require('util');

module.exports = GridFSBucketReadStream;

/**
 * A readable stream that enables you to read buffers from GridFS.
 *
 * Do not instantiate this class directly. Use `openDownloadStream()` instead.
 *
 * @class
 * @extends external:Readable
 * @param {Collection} chunks Handle for chunks collection
 * @param {Collection} files Handle for files collection
 * @param {Object} readPreference The read preference to use
 * @param {Object} filter The query to use to find the file document
 * @param {Object} [options] Optional settings.
 * @param {Object} [options.sort] Optional sort for the file find query
 * @param {Number} [options.skip] Optional skip for the file find query
 * @param {Number} [options.start] Optional 0-based offset in bytes to start streaming from
 * @param {Number} [options.end] Optional 0-based offset in bytes to stop streaming before
 * @fires GridFSBucketReadStream#error
 * @fires GridFSBucketReadStream#file
 */
function GridFSBucketReadStream(chunks, files, readPreference, filter, options) {
  this.s = {
    bytesRead: 0,
    chunks: chunks,
    cursor: null,
    expected: 0,
    files: files,
    filter: filter,
    init: false,
    expectedEnd: 0,
    file: null,
    options: options,
    readPreference: readPreference
  };

  stream.Readable.call(this);
}

util.inherits(GridFSBucketReadStream, stream.Readable);
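// Usage sketch (illustrative, not part of this module): a GridFSBucketReadStream
// is normally obtained from a GridFSBucket via openDownloadStream() or
// openDownloadStreamByName() rather than constructed directly. `db`, `fs`, and
// `someFileId` below are assumed to exist in the caller's scope.
//
//   const { GridFSBucket } = require('mongodb');
//   const bucket = new GridFSBucket(db, { bucketName: 'fs' });
//   bucket
//     .openDownloadStream(someFileId)
//     .on('file', doc => console.log('streaming', doc.filename, doc.length, 'bytes'))
//     .on('error', err => console.error(err))
//     .pipe(fs.createWriteStream('./copy.bin'));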
/**
 * An error occurred
 *
 * @event GridFSBucketReadStream#error
 * @type {Error}
 */

/**
 * Fires when the stream loaded the file document corresponding to the
 * provided id.
 *
 * @event GridFSBucketReadStream#file
 * @type {object}
 */

/**
 * Emitted when a chunk of data is available to be consumed.
 *
 * @event GridFSBucketReadStream#data
 * @type {object}
 */

/**
 * Fired when the stream is exhausted (no more data events).
 *
 * @event GridFSBucketReadStream#end
 * @type {object}
 */

/**
 * Fired when the stream is exhausted and the underlying cursor is killed
 *
 * @event GridFSBucketReadStream#close
 * @type {object}
 */

/**
 * Reads from the cursor and pushes to the stream.
 * Private implementation, do not call directly.
 * @ignore
 * @method
 */
GridFSBucketReadStream.prototype._read = function() {
  var _this = this;
  if (this.destroyed) {
    return;
  }

  waitForFile(_this, function() {
    doRead(_this);
  });
};

/**
 * Sets the 0-based offset in bytes to start streaming from. Throws
 * an error if this stream has entered flowing mode
 * (e.g. if you've already called `on('data')`)
 * @method
 * @param {Number} start Offset in bytes to start reading at
 * @return {GridFSBucketReadStream} Reference to self
 */
GridFSBucketReadStream.prototype.start = function(start) {
  throwIfInitialized(this);
  this.s.options.start = start;
  return this;
};
/**
 * Sets the 0-based offset in bytes to stop streaming before. Throws
 * an error if this stream has entered flowing mode
 * (e.g. if you've already called `on('data')`)
 * @method
 * @param {Number} end Offset in bytes to stop reading at
 * @return {GridFSBucketReadStream} Reference to self
 */
GridFSBucketReadStream.prototype.end = function(end) {
  throwIfInitialized(this);
  this.s.options.end = end;
  return this;
};
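// Example (illustrative): stream only bytes 1024 (inclusive) through 4096
// (exclusive) of a stored file. `bucket` is an assumed GridFSBucket instance;
// start() and end() must be called before the stream enters flowing mode.
//
//   bucket
//     .openDownloadStreamByName('report.pdf')
//     .start(1024)
//     .end(4096)
//     .pipe(process.stdout);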
/**
 * Marks this stream as aborted (will never push another `data` event)
 * and kills the underlying cursor. Will emit the 'end' event, and then
 * the 'close' event once the cursor is successfully killed.
 *
 * @method
 * @param {GridFSBucket~errorCallback} [callback] called when the cursor is successfully closed or an error occurred.
 * @fires GridFSBucketReadStream#close
 * @fires GridFSBucketReadStream#end
 */
GridFSBucketReadStream.prototype.abort = function(callback) {
  var _this = this;
  this.push(null);
  this.destroyed = true;
  if (this.s.cursor) {
    this.s.cursor.close(function(error) {
      _this.emit('close');
      callback && callback(error);
    });
  } else {
    if (!this.s.init) {
      // If not initialized, fire close event because we will never
      // get a cursor
      _this.emit('close');
    }
    callback && callback();
  }
};
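// Example (illustrative): abort an in-progress download, e.g. when a client
// disconnects. `bucket` is an assumed GridFSBucket instance and `handleChunk`
// is a placeholder for application code; the stream emits 'end' and then
// 'close' once the underlying cursor has been killed.
//
//   const download = bucket.openDownloadStreamByName('large.iso');
//   download.on('data', chunk => handleChunk(chunk));
//   // ...later, when the data is no longer needed:
//   download.abort(err => {
//     if (err) console.error('failed to close cursor', err);
//   });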
/**
 * @ignore
 */
function throwIfInitialized(self) {
  if (self.s.init) {
    throw new Error('You cannot change options after the stream has entered ' + 'flowing mode!');
  }
}
/**
 * @ignore
 */
function doRead(_this) {
  if (_this.destroyed) {
    return;
  }

  _this.s.cursor.next(function(error, doc) {
    if (_this.destroyed) {
      return;
    }
    if (error) {
      return __handleError(_this, error);
    }
    if (!doc) {
      _this.push(null);

      process.nextTick(() => {
        _this.s.cursor.close(function(error) {
          if (error) {
            __handleError(_this, error);
            return;
          }

          _this.emit('close');
        });
      });

      return;
    }

    var bytesRemaining = _this.s.file.length - _this.s.bytesRead;
    var expectedN = _this.s.expected++;
    var expectedLength = Math.min(_this.s.file.chunkSize, bytesRemaining);

    if (doc.n > expectedN) {
      var errmsg = 'ChunkIsMissing: Got unexpected n: ' + doc.n + ', expected: ' + expectedN;
      return __handleError(_this, new Error(errmsg));
    }

    if (doc.n < expectedN) {
      errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n + ', expected: ' + expectedN;
      return __handleError(_this, new Error(errmsg));
    }

    var buf = Buffer.isBuffer(doc.data) ? doc.data : doc.data.buffer;

    if (buf.length !== expectedLength) {
      if (bytesRemaining <= 0) {
        errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n;
        return __handleError(_this, new Error(errmsg));
      }

      errmsg =
        'ChunkIsWrongSize: Got unexpected length: ' + buf.length + ', expected: ' + expectedLength;
      return __handleError(_this, new Error(errmsg));
    }

    _this.s.bytesRead += buf.length;

    if (buf.length === 0) {
      return _this.push(null);
    }

    var sliceStart = null;
    var sliceEnd = null;

    if (_this.s.bytesToSkip != null) {
      sliceStart = _this.s.bytesToSkip;
      _this.s.bytesToSkip = 0;
    }

    const atEndOfStream = expectedN === _this.s.expectedEnd - 1;
    const bytesLeftToRead = _this.s.options.end - _this.s.bytesToSkip;
    if (atEndOfStream && _this.s.bytesToTrim != null) {
      sliceEnd = _this.s.file.chunkSize - _this.s.bytesToTrim;
    } else if (_this.s.options.end && bytesLeftToRead < doc.data.length()) {
      sliceEnd = bytesLeftToRead;
    }

    if (sliceStart != null || sliceEnd != null) {
      buf = buf.slice(sliceStart || 0, sliceEnd || buf.length);
    }

    _this.push(buf);
  });
}
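// Worked example (hypothetical numbers): for a 600000-byte file stored with the
// default 261120-byte (255 KB) chunk size, doRead() expects chunks n = 0, 1, 2
// with lengths 261120, 261120 and 77760 (600000 - 2 * 261120). Any other n or
// length triggers a ChunkIsMissing, ExtraChunk or ChunkIsWrongSize error above.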
/**
 * @ignore
 */
function init(self) {
  var findOneOptions = {};
  if (self.s.readPreference) {
    findOneOptions.readPreference = self.s.readPreference;
  }
  if (self.s.options && self.s.options.sort) {
    findOneOptions.sort = self.s.options.sort;
  }
  if (self.s.options && self.s.options.skip) {
    findOneOptions.skip = self.s.options.skip;
  }

  self.s.files.findOne(self.s.filter, findOneOptions, function(error, doc) {
    if (error) {
      return __handleError(self, error);
    }
    if (!doc) {
      var identifier = self.s.filter._id ? self.s.filter._id.toString() : self.s.filter.filename;
      var errmsg = 'FileNotFound: file ' + identifier + ' was not found';
      var err = new Error(errmsg);
      err.code = 'ENOENT';
      return __handleError(self, err);
    }

    // If document is empty, kill the stream immediately and don't
    // execute any reads
    if (doc.length <= 0) {
      self.push(null);
      return;
    }

    if (self.destroyed) {
      // If user destroys the stream before we have a cursor, wait
      // until the query is done to say we're 'closed' because we can't
      // cancel a query.
      self.emit('close');
      return;
    }

    try {
      self.s.bytesToSkip = handleStartOption(self, doc, self.s.options);
    } catch (error) {
      return __handleError(self, error);
    }

    var filter = { files_id: doc._id };

    // Currently (MongoDB 3.4.4) skip() does not use the index; it retrieves
    // all the documents first and then skips them. (CS-25811)
    // As a workaround we use $gte on the "n" field instead.
    if (self.s.options && self.s.options.start != null) {
      var skip = Math.floor(self.s.options.start / doc.chunkSize);
      if (skip > 0) {
        filter['n'] = { $gte: skip };
      }
    }
    self.s.cursor = self.s.chunks.find(filter).sort({ n: 1 });

    if (self.s.readPreference) {
      self.s.cursor.setReadPreference(self.s.readPreference);
    }

    self.s.expectedEnd = Math.ceil(doc.length / doc.chunkSize);
    self.s.file = doc;

    try {
      self.s.bytesToTrim = handleEndOption(self, doc, self.s.cursor, self.s.options);
    } catch (error) {
      return __handleError(self, error);
    }

    self.emit('file', doc);
  });
}
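// Example of the workaround above (hypothetical numbers): with
// options.start = 600000 and doc.chunkSize = 261120,
// skip = Math.floor(600000 / 261120) = 2, so the chunks query becomes
// { files_id: doc._id, n: { $gte: 2 } } and the { files_id: 1, n: 1 } index
// can be used to skip the first two chunks entirely.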
/**
 * @ignore
 */
function waitForFile(_this, callback) {
  if (_this.s.file) {
    return callback();
  }

  if (!_this.s.init) {
    init(_this);
    _this.s.init = true;
  }

  _this.once('file', function() {
    callback();
  });
}
/**
 * @ignore
 */
function handleStartOption(stream, doc, options) {
  if (options && options.start != null) {
    if (options.start > doc.length) {
      throw new Error(
        'Stream start (' +
          options.start +
          ') must not be ' +
          'more than the length of the file (' +
          doc.length +
          ')'
      );
    }
    if (options.start < 0) {
      throw new Error('Stream start (' + options.start + ') must not be ' + 'negative');
    }
    if (options.end != null && options.end < options.start) {
      throw new Error(
        'Stream start (' +
          options.start +
          ') must not be ' +
          'greater than stream end (' +
          options.end +
          ')'
      );
    }

    stream.s.bytesRead = Math.floor(options.start / doc.chunkSize) * doc.chunkSize;
    stream.s.expected = Math.floor(options.start / doc.chunkSize);

    return options.start - stream.s.bytesRead;
  }
}
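// Worked example (hypothetical numbers): options.start = 600000 with
// doc.chunkSize = 261120 gives
//   stream.s.bytesRead = Math.floor(600000 / 261120) * 261120 = 522240
//   stream.s.expected  = 2
//   return value (bytesToSkip) = 600000 - 522240 = 77760
// so streaming begins at chunk n = 2, with its first 77760 bytes sliced off
// in doRead().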
/**
 * @ignore
 */
function handleEndOption(stream, doc, cursor, options) {
  if (options && options.end != null) {
    if (options.end > doc.length) {
      throw new Error(
        'Stream end (' +
          options.end +
          ') must not be ' +
          'more than the length of the file (' +
          doc.length +
          ')'
      );
    }
    if (options.end < 0) {
      throw new Error('Stream end (' + options.end + ') must not be ' + 'negative');
    }

    var start = options.start != null ? Math.floor(options.start / doc.chunkSize) : 0;

    cursor.limit(Math.ceil(options.end / doc.chunkSize) - start);

    stream.s.expectedEnd = Math.ceil(options.end / doc.chunkSize);

    return Math.ceil(options.end / doc.chunkSize) * doc.chunkSize - options.end;
  }
}
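// Worked example (hypothetical numbers): options.end = 700000 with
// doc.chunkSize = 261120 and no start offset gives
//   cursor.limit(Math.ceil(700000 / 261120) - 0) = limit of 3 chunks
//   stream.s.expectedEnd = 3
//   return value (bytesToTrim) = 3 * 261120 - 700000 = 83360
// so the final chunk (n = 2) is trimmed to its first 261120 - 83360 = 177760
// bytes in doRead().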
/**
 * @ignore
 */
function __handleError(_this, error) {
  _this.emit('error', error);
}