  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.GridFSBucketReadStream = void 0;
  4. const stream_1 = require("stream");
  5. const error_1 = require("../error");
/**
 * A readable stream that enables you to read buffers from GridFS.
 *
 * Do not instantiate this class directly. Use `openDownloadStream()` instead.
 * @public
 */
class GridFSBucketReadStream extends stream_1.Readable {
    /** @internal
     * @param chunks - Handle for chunks collection
     * @param files - Handle for files collection
     * @param readPreference - The read preference to use
     * @param filter - The filter to use to find the file document
     */
    constructor(chunks, files, readPreference, filter, options) {
        super();
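        // Internal state shared with the module-level helpers below:
        // - bytesToSkip / bytesToTrim: bytes dropped from the first / last chunk when
        //   a byte range (start/end) does not fall on a chunk boundary
        // - expected / expectedEnd: index of the next chunk we expect, and the index
        //   one past the last chunk to read
        // - bytesRead: byte offset into the file covered by chunks read so far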
        this.s = {
            bytesToTrim: 0,
            bytesToSkip: 0,
            bytesRead: 0,
            chunks,
            expected: 0,
            files,
            filter,
            init: false,
            expectedEnd: 0,
            options: {
                start: 0,
                end: 0,
                ...options
            },
            readPreference
        };
    }
    /**
     * Reads from the cursor and pushes to the stream.
     * Private implementation, do not call directly.
     * @internal
     */
    _read() {
        if (this.destroyed)
            return;
        waitForFile(this, () => doRead(this));
    }
    /**
     * Sets the 0-based offset in bytes to start streaming from. Throws
     * an error if this stream has entered flowing mode
     * (e.g. if you've already called `on('data')`)
     *
     * @param start - 0-based offset in bytes to start streaming from
     */
    start(start = 0) {
        throwIfInitialized(this);
        this.s.options.start = start;
        return this;
    }
    /**
     * Sets the 0-based offset in bytes to stop streaming before. Throws
     * an error if this stream has entered flowing mode
     * (e.g. if you've already called `on('data')`)
     *
     * @param end - Offset in bytes to stop reading at
     */
    end(end = 0) {
        throwIfInitialized(this);
        this.s.options.end = end;
        return this;
    }
    /**
     * Marks this stream as aborted (will never push another `data` event)
     * and kills the underlying cursor. Will emit the 'end' event, and then
     * the 'close' event once the cursor is successfully killed.
     *
     * @param callback - called when the cursor is successfully closed or an error occurred.
     */
    abort(callback) {
        this.push(null);
        this.destroyed = true;
        if (this.s.cursor) {
            this.s.cursor.close(error => {
                this.emit(GridFSBucketReadStream.CLOSE);
                callback && callback(error);
            });
        }
        else {
            if (!this.s.init) {
                // If not initialized, fire close event because we will never
                // get a cursor
                this.emit(GridFSBucketReadStream.CLOSE);
            }
            callback && callback();
        }
    }
}
exports.GridFSBucketReadStream = GridFSBucketReadStream;
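/*
 * Minimal consumption sketch (not part of the driver): how a stream created by
 * `GridFSBucket.prototype.openDownloadStream()` is typically used. The `bucket`
 * instance and `exampleId` ObjectId are illustrative assumptions only; they are
 * not defined in this file.
 *
 *   const stream = bucket
 *     .openDownloadStream(exampleId)   // returns a GridFSBucketReadStream
 *     .start(0)                        // optional: first byte to read
 *     .end(1024);                      // optional: stop before this byte
 *   stream.on('file', doc => console.log('file doc loaded:', doc.filename));
 *   stream.on('data', chunk => process.stdout.write(chunk));
 *   stream.on('error', err => console.error(err));
 *   stream.on('end', () => console.log('done'));
 */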
/**
 * An error occurred
 * @event
 */
GridFSBucketReadStream.ERROR = 'error';
/**
 * Fires when the stream loaded the file document corresponding to the provided id.
 * @event
 */
GridFSBucketReadStream.FILE = 'file';
/**
 * Emitted when a chunk of data is available to be consumed.
 * @event
 */
GridFSBucketReadStream.DATA = 'data';
/**
 * Fired when the stream is exhausted (no more data events).
 * @event
 */
GridFSBucketReadStream.END = 'end';
/**
 * Fired when the stream is exhausted and the underlying cursor is killed
 * @event
 */
GridFSBucketReadStream.CLOSE = 'close';
function throwIfInitialized(stream) {
    if (stream.s.init) {
        throw new error_1.MongoGridFSStreamError('Options cannot be changed after the stream is initialized');
    }
}
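// Pulls the next chunk document from the chunks cursor, validates its sequence
// number and size, and pushes its (possibly sliced) bytes onto the stream.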
function doRead(stream) {
    if (stream.destroyed)
        return;
    if (!stream.s.cursor)
        return;
    if (!stream.s.file)
        return;
    stream.s.cursor.next((error, doc) => {
        if (stream.destroyed) {
            return;
        }
        if (error) {
            stream.emit(GridFSBucketReadStream.ERROR, error);
            return;
        }
        if (!doc) {
            stream.push(null);
            process.nextTick(() => {
                if (!stream.s.cursor)
                    return;
                stream.s.cursor.close(error => {
                    if (error) {
                        stream.emit(GridFSBucketReadStream.ERROR, error);
                        return;
                    }
                    stream.emit(GridFSBucketReadStream.CLOSE);
                });
            });
            return;
        }
        if (!stream.s.file)
            return;
        const bytesRemaining = stream.s.file.length - stream.s.bytesRead;
        const expectedN = stream.s.expected++;
        const expectedLength = Math.min(stream.s.file.chunkSize, bytesRemaining);
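        // Every chunk document carries a 0-based sequence number `n`: a higher `n`
        // than expected means a chunk is missing, a lower one means an extra chunk.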
        if (doc.n > expectedN) {
            return stream.emit(GridFSBucketReadStream.ERROR, new error_1.MongoGridFSChunkError(`ChunkIsMissing: Got unexpected n: ${doc.n}, expected: ${expectedN}`));
        }
        if (doc.n < expectedN) {
            return stream.emit(GridFSBucketReadStream.ERROR, new error_1.MongoGridFSChunkError(`ExtraChunk: Got unexpected n: ${doc.n}, expected: ${expectedN}`));
        }
        let buf = Buffer.isBuffer(doc.data) ? doc.data : doc.data.buffer;
        if (buf.byteLength !== expectedLength) {
            if (bytesRemaining <= 0) {
                return stream.emit(GridFSBucketReadStream.ERROR, new error_1.MongoGridFSChunkError(`ExtraChunk: Got unexpected n: ${doc.n}, expected file length ${stream.s.file.length} bytes but already read ${stream.s.bytesRead} bytes`));
            }
            return stream.emit(GridFSBucketReadStream.ERROR, new error_1.MongoGridFSChunkError(`ChunkIsWrongSize: Got unexpected length: ${buf.byteLength}, expected: ${expectedLength}`));
        }
        stream.s.bytesRead += buf.byteLength;
        if (buf.byteLength === 0) {
            return stream.push(null);
        }
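        // When a byte range was requested, the first chunk may need leading bytes
        // skipped (bytesToSkip) and the final chunk may need trailing bytes trimmed
        // (bytesToTrim) so that only the requested range is pushed downstream.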
        let sliceStart = null;
        let sliceEnd = null;
        if (stream.s.bytesToSkip != null) {
            sliceStart = stream.s.bytesToSkip;
            stream.s.bytesToSkip = 0;
        }
        const atEndOfStream = expectedN === stream.s.expectedEnd - 1;
        const bytesLeftToRead = stream.s.options.end - stream.s.bytesToSkip;
        if (atEndOfStream && stream.s.bytesToTrim != null) {
            sliceEnd = stream.s.file.chunkSize - stream.s.bytesToTrim;
        }
        else if (stream.s.options.end && bytesLeftToRead < doc.data.byteLength) {
            sliceEnd = bytesLeftToRead;
        }
        if (sliceStart != null || sliceEnd != null) {
            buf = buf.slice(sliceStart || 0, sliceEnd || buf.byteLength);
        }
        stream.push(buf);
        return;
    });
}
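// Looks up the file document in the files collection, applies the start/end
// options, and builds the sorted cursor over the chunks collection. Emits the
// 'file' event once the stream is ready to read.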
function init(stream) {
    const findOneOptions = {};
    if (stream.s.readPreference) {
        findOneOptions.readPreference = stream.s.readPreference;
    }
    if (stream.s.options && stream.s.options.sort) {
        findOneOptions.sort = stream.s.options.sort;
    }
    if (stream.s.options && stream.s.options.skip) {
        findOneOptions.skip = stream.s.options.skip;
    }
    stream.s.files.findOne(stream.s.filter, findOneOptions, (error, doc) => {
        if (error) {
            return stream.emit(GridFSBucketReadStream.ERROR, error);
        }
        if (!doc) {
            const identifier = stream.s.filter._id
                ? stream.s.filter._id.toString()
                : stream.s.filter.filename;
            const errmsg = `FileNotFound: file ${identifier} was not found`;
            // TODO(NODE-3483)
            const err = new error_1.MongoRuntimeError(errmsg);
            err.code = 'ENOENT'; // TODO: NODE-3338 set property as part of constructor
            return stream.emit(GridFSBucketReadStream.ERROR, err);
        }
        // If document is empty, kill the stream immediately and don't
        // execute any reads
        if (doc.length <= 0) {
            stream.push(null);
            return;
        }
        if (stream.destroyed) {
            // If user destroys the stream before we have a cursor, wait
            // until the query is done to say we're 'closed' because we can't
            // cancel a query.
            stream.emit(GridFSBucketReadStream.CLOSE);
            return;
        }
        try {
            stream.s.bytesToSkip = handleStartOption(stream, doc, stream.s.options);
        }
        catch (error) {
            return stream.emit(GridFSBucketReadStream.ERROR, error);
        }
        const filter = { files_id: doc._id };
        // Currently (MongoDB 3.4.4) the skip function does not use the index;
        // it retrieves all the documents first and then skips them. (CS-25811)
        // As a workaround we use $gte on the "n" field.
        if (stream.s.options && stream.s.options.start != null) {
            const skip = Math.floor(stream.s.options.start / doc.chunkSize);
            if (skip > 0) {
                filter['n'] = { $gte: skip };
            }
        }
        stream.s.cursor = stream.s.chunks.find(filter).sort({ n: 1 });
        if (stream.s.readPreference) {
            stream.s.cursor.withReadPreference(stream.s.readPreference);
        }
        stream.s.expectedEnd = Math.ceil(doc.length / doc.chunkSize);
        stream.s.file = doc;
        try {
            stream.s.bytesToTrim = handleEndOption(stream, doc, stream.s.cursor, stream.s.options);
        }
        catch (error) {
            return stream.emit(GridFSBucketReadStream.ERROR, error);
        }
        stream.emit(GridFSBucketReadStream.FILE, doc);
        return;
    });
}
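// Defers the read callback until the file document has been loaded; init() runs
// only once per stream, and pending reads wait on the 'file' event.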
function waitForFile(stream, callback) {
    if (stream.s.file) {
        return callback();
    }
    if (!stream.s.init) {
        init(stream);
        stream.s.init = true;
    }
    stream.once('file', () => {
        callback();
    });
}
function handleStartOption(stream, doc, options) {
    if (options && options.start != null) {
        if (options.start > doc.length) {
            throw new error_1.MongoInvalidArgumentError(`Stream start (${options.start}) must not be more than the length of the file (${doc.length})`);
        }
        if (options.start < 0) {
            throw new error_1.MongoInvalidArgumentError(`Stream start (${options.start}) must not be negative`);
        }
        if (options.end != null && options.end < options.start) {
            throw new error_1.MongoInvalidArgumentError(`Stream start (${options.start}) must not be greater than stream end (${options.end})`);
        }
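        // Begin at the chunk containing `start`: bytesRead is rounded down to that
        // chunk's byte offset, `expected` becomes that chunk's index, and the
        // remainder is returned as the number of bytes to skip inside that chunk.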
        stream.s.bytesRead = Math.floor(options.start / doc.chunkSize) * doc.chunkSize;
        stream.s.expected = Math.floor(options.start / doc.chunkSize);
        return options.start - stream.s.bytesRead;
    }
    throw new error_1.MongoInvalidArgumentError('Start option must be defined');
}
function handleEndOption(stream, doc, cursor, options) {
    if (options && options.end != null) {
        if (options.end > doc.length) {
            throw new error_1.MongoInvalidArgumentError(`Stream end (${options.end}) must not be more than the length of the file (${doc.length})`);
        }
        if (options.end < 0) {
            throw new error_1.MongoInvalidArgumentError(`Stream end (${options.end}) must not be negative`);
        }
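        // Limit the cursor to the chunks covering the requested range: expectedEnd is
        // the index one past the last chunk, and the return value is how many trailing
        // bytes of that last chunk fall beyond `end` and must be trimmed.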
        const start = options.start != null ? Math.floor(options.start / doc.chunkSize) : 0;
        cursor.limit(Math.ceil(options.end / doc.chunkSize) - start);
        stream.s.expectedEnd = Math.ceil(options.end / doc.chunkSize);
        return Math.ceil(options.end / doc.chunkSize) * doc.chunkSize - options.end;
    }
    throw new error_1.MongoInvalidArgumentError('End option must be defined');
}
//# sourceMappingURL=download.js.map