// upload.js - writable stream used to upload a file into a GridFS bucket
  1. 'use strict';
  2. var core = require('../core');
  3. var crypto = require('crypto');
  4. var stream = require('stream');
  5. var util = require('util');
  6. var Buffer = require('safe-buffer').Buffer;
  7. var ERROR_NAMESPACE_NOT_FOUND = 26;
  8. module.exports = GridFSBucketWriteStream;
  9. /**
  10. * A writable stream that enables you to write buffers to GridFS.
  11. *
  12. * Do not instantiate this class directly. Use `openUploadStream()` instead.
  13. *
  14. * @class
  15. * @extends external:Writable
  16. * @param {GridFSBucket} bucket Handle for this stream's corresponding bucket
  17. * @param {string} filename The value of the 'filename' key in the files doc
  18. * @param {object} [options] Optional settings.
  19. * @param {string|number|object} [options.id] Custom file id for the GridFS file.
  20. * @param {number} [options.chunkSizeBytes] The chunk size to use, in bytes
  21. * @param {number} [options.w] The write concern
  22. * @param {number} [options.wtimeout] The write concern timeout
  23. * @param {number} [options.j] The journal write concern
  24. * @param {boolean} [options.disableMD5=false] If true, disables adding an md5 field to file data
  25. * @fires GridFSBucketWriteStream#error
  26. * @fires GridFSBucketWriteStream#finish
  27. */
  28. function GridFSBucketWriteStream(bucket, filename, options) {
  29. options = options || {};
  30. this.bucket = bucket;
  31. this.chunks = bucket.s._chunksCollection;
  32. this.filename = filename;
  33. this.files = bucket.s._filesCollection;
  34. this.options = options;
  35. // Signals the write is all done
  36. this.done = false;
  37. this.id = options.id ? options.id : core.BSON.ObjectId();
  38. this.chunkSizeBytes = this.options.chunkSizeBytes;
  39. this.bufToStore = Buffer.alloc(this.chunkSizeBytes);
  40. this.length = 0;
  41. this.md5 = !options.disableMD5 && crypto.createHash('md5');
  42. this.n = 0;
  43. this.pos = 0;
  44. this.state = {
  45. streamEnd: false,
  46. outstandingRequests: 0,
  47. errored: false,
  48. aborted: false,
  49. promiseLibrary: this.bucket.s.promiseLibrary
  50. };
  51. if (!this.bucket.s.calledOpenUploadStream) {
  52. this.bucket.s.calledOpenUploadStream = true;
  53. var _this = this;
  54. checkIndexes(this, function() {
  55. _this.bucket.s.checkedIndexes = true;
  56. _this.bucket.emit('index');
  57. });
  58. }
  59. }
  60. util.inherits(GridFSBucketWriteStream, stream.Writable);
  61. /**
  62. * An error occurred
  63. *
  64. * @event GridFSBucketWriteStream#error
  65. * @type {Error}
  66. */
  67. /**
  68. * `end()` was called and the write stream successfully wrote the file
  69. * metadata and all the chunks to MongoDB.
  70. *
  71. * @event GridFSBucketWriteStream#finish
  72. * @type {object}
  73. */
  74. /**
  75. * Write a buffer to the stream.
  76. *
  77. * @method
  78. * @param {Buffer} chunk Buffer to write
  79. * @param {String} encoding Optional encoding for the buffer
  80. * @param {GridFSBucket~errorCallback} callback Function to call when the chunk was added to the buffer, or if the entire chunk was persisted to MongoDB if this chunk caused a flush.
  81. * @return {Boolean} False if this write required flushing a chunk to MongoDB. True otherwise.
  82. */
  83. GridFSBucketWriteStream.prototype.write = function(chunk, encoding, callback) {
  84. var _this = this;
  85. return waitForIndexes(this, function() {
  86. return doWrite(_this, chunk, encoding, callback);
  87. });
  88. };
  89. /**
  90. * Places this write stream into an aborted state (all future writes fail)
  91. * and deletes all chunks that have already been written.
  92. *
  93. * @method
  94. * @param {GridFSBucket~errorCallback} callback called when chunks are successfully removed or error occurred
  95. * @return {Promise} if no callback specified
  96. */
  97. GridFSBucketWriteStream.prototype.abort = function(callback) {
  98. if (this.state.streamEnd) {
  99. var error = new Error('Cannot abort a stream that has already completed');
  100. if (typeof callback === 'function') {
  101. return callback(error);
  102. }
  103. return this.state.promiseLibrary.reject(error);
  104. }
  105. if (this.state.aborted) {
  106. error = new Error('Cannot call abort() on a stream twice');
  107. if (typeof callback === 'function') {
  108. return callback(error);
  109. }
  110. return this.state.promiseLibrary.reject(error);
  111. }
  112. this.state.aborted = true;
  113. this.chunks.deleteMany({ files_id: this.id }, function(error) {
  114. if (typeof callback === 'function') callback(error);
  115. });
  116. };
  117. /**
  118. * Tells the stream that no more data will be coming in. The stream will
  119. * persist the remaining data to MongoDB, write the files document, and
  120. * then emit a 'finish' event.
  121. *
  122. * @method
  123. * @param {Buffer} chunk Buffer to write
  124. * @param {String} encoding Optional encoding for the buffer
  125. * @param {GridFSBucket~errorCallback} callback Function to call when all files and chunks have been persisted to MongoDB
  126. */
  127. GridFSBucketWriteStream.prototype.end = function(chunk, encoding, callback) {
  128. var _this = this;
  129. if (typeof chunk === 'function') {
  130. (callback = chunk), (chunk = null), (encoding = null);
  131. } else if (typeof encoding === 'function') {
  132. (callback = encoding), (encoding = null);
  133. }
  134. if (checkAborted(this, callback)) {
  135. return;
  136. }
  137. this.state.streamEnd = true;
  138. if (callback) {
  139. this.once('finish', function(result) {
  140. callback(null, result);
  141. });
  142. }
  143. if (!chunk) {
  144. waitForIndexes(this, function() {
  145. writeRemnant(_this);
  146. });
  147. return;
  148. }
  149. this.write(chunk, encoding, function() {
  150. writeRemnant(_this);
  151. });
  152. };
  153. /**
  154. * @ignore
  155. */
  156. function __handleError(_this, error, callback) {
  157. if (_this.state.errored) {
  158. return;
  159. }
  160. _this.state.errored = true;
  161. if (callback) {
  162. return callback(error);
  163. }
  164. _this.emit('error', error);
  165. }
  166. /**
  167. * @ignore
  168. */
  169. function createChunkDoc(filesId, n, data) {
  170. return {
  171. _id: core.BSON.ObjectId(),
  172. files_id: filesId,
  173. n: n,
  174. data: data
  175. };
  176. }
  177. /**
  178. * @ignore
  179. */
  180. function checkChunksIndex(_this, callback) {
  181. _this.chunks.listIndexes().toArray(function(error, indexes) {
  182. if (error) {
  183. // Collection doesn't exist so create index
  184. if (error.code === ERROR_NAMESPACE_NOT_FOUND) {
  185. var index = { files_id: 1, n: 1 };
  186. _this.chunks.createIndex(index, { background: false, unique: true }, function(error) {
  187. if (error) {
  188. return callback(error);
  189. }
  190. callback();
  191. });
  192. return;
  193. }
  194. return callback(error);
  195. }
  196. var hasChunksIndex = false;
  197. indexes.forEach(function(index) {
  198. if (index.key) {
  199. var keys = Object.keys(index.key);
  200. if (keys.length === 2 && index.key.files_id === 1 && index.key.n === 1) {
  201. hasChunksIndex = true;
  202. }
  203. }
  204. });
  205. if (hasChunksIndex) {
  206. callback();
  207. } else {
  208. index = { files_id: 1, n: 1 };
  209. var indexOptions = getWriteOptions(_this);
  210. indexOptions.background = false;
  211. indexOptions.unique = true;
  212. _this.chunks.createIndex(index, indexOptions, function(error) {
  213. if (error) {
  214. return callback(error);
  215. }
  216. callback();
  217. });
  218. }
  219. });
  220. }
  221. /**
  222. * @ignore
  223. */
  224. function checkDone(_this, callback) {
  225. if (_this.done) return true;
  226. if (_this.state.streamEnd && _this.state.outstandingRequests === 0 && !_this.state.errored) {
  227. // Set done so we dont' trigger duplicate createFilesDoc
  228. _this.done = true;
  229. // Create a new files doc
  230. var filesDoc = createFilesDoc(
  231. _this.id,
  232. _this.length,
  233. _this.chunkSizeBytes,
  234. _this.md5 && _this.md5.digest('hex'),
  235. _this.filename,
  236. _this.options.contentType,
  237. _this.options.aliases,
  238. _this.options.metadata
  239. );
  240. if (checkAborted(_this, callback)) {
  241. return false;
  242. }
  243. _this.files.insertOne(filesDoc, getWriteOptions(_this), function(error) {
  244. if (error) {
  245. return __handleError(_this, error, callback);
  246. }
  247. _this.emit('finish', filesDoc);
  248. });
  249. return true;
  250. }
  251. return false;
  252. }
  253. /**
  254. * @ignore
  255. */
  256. function checkIndexes(_this, callback) {
  257. _this.files.findOne({}, { _id: 1 }, function(error, doc) {
  258. if (error) {
  259. return callback(error);
  260. }
  261. if (doc) {
  262. return callback();
  263. }
  264. _this.files.listIndexes().toArray(function(error, indexes) {
  265. if (error) {
  266. // Collection doesn't exist so create index
  267. if (error.code === ERROR_NAMESPACE_NOT_FOUND) {
  268. var index = { filename: 1, uploadDate: 1 };
  269. _this.files.createIndex(index, { background: false }, function(error) {
  270. if (error) {
  271. return callback(error);
  272. }
  273. checkChunksIndex(_this, callback);
  274. });
  275. return;
  276. }
  277. return callback(error);
  278. }
  279. var hasFileIndex = false;
  280. indexes.forEach(function(index) {
  281. var keys = Object.keys(index.key);
  282. if (keys.length === 2 && index.key.filename === 1 && index.key.uploadDate === 1) {
  283. hasFileIndex = true;
  284. }
  285. });
  286. if (hasFileIndex) {
  287. checkChunksIndex(_this, callback);
  288. } else {
  289. index = { filename: 1, uploadDate: 1 };
  290. var indexOptions = getWriteOptions(_this);
  291. indexOptions.background = false;
  292. _this.files.createIndex(index, indexOptions, function(error) {
  293. if (error) {
  294. return callback(error);
  295. }
  296. checkChunksIndex(_this, callback);
  297. });
  298. }
  299. });
  300. });
  301. }
  302. /**
  303. * @ignore
  304. */
  305. function createFilesDoc(_id, length, chunkSize, md5, filename, contentType, aliases, metadata) {
  306. var ret = {
  307. _id: _id,
  308. length: length,
  309. chunkSize: chunkSize,
  310. uploadDate: new Date(),
  311. filename: filename
  312. };
  313. if (md5) {
  314. ret.md5 = md5;
  315. }
  316. if (contentType) {
  317. ret.contentType = contentType;
  318. }
  319. if (aliases) {
  320. ret.aliases = aliases;
  321. }
  322. if (metadata) {
  323. ret.metadata = metadata;
  324. }
  325. return ret;
  326. }
  327. /**
  328. * @ignore
  329. */
  330. function doWrite(_this, chunk, encoding, callback) {
  331. if (checkAborted(_this, callback)) {
  332. return false;
  333. }
  334. var inputBuf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
  335. _this.length += inputBuf.length;
  336. // Input is small enough to fit in our buffer
  337. if (_this.pos + inputBuf.length < _this.chunkSizeBytes) {
  338. inputBuf.copy(_this.bufToStore, _this.pos);
  339. _this.pos += inputBuf.length;
  340. callback && callback();
  341. // Note that we reverse the typical semantics of write's return value
  342. // to be compatible with node's `.pipe()` function.
  343. // True means client can keep writing.
  344. return true;
  345. }
  346. // Otherwise, buffer is too big for current chunk, so we need to flush
  347. // to MongoDB.
  348. var inputBufRemaining = inputBuf.length;
  349. var spaceRemaining = _this.chunkSizeBytes - _this.pos;
  350. var numToCopy = Math.min(spaceRemaining, inputBuf.length);
  351. var outstandingRequests = 0;
  352. while (inputBufRemaining > 0) {
  353. var inputBufPos = inputBuf.length - inputBufRemaining;
  354. inputBuf.copy(_this.bufToStore, _this.pos, inputBufPos, inputBufPos + numToCopy);
  355. _this.pos += numToCopy;
  356. spaceRemaining -= numToCopy;
  357. if (spaceRemaining === 0) {
  358. if (_this.md5) {
  359. _this.md5.update(_this.bufToStore);
  360. }
  361. var doc = createChunkDoc(_this.id, _this.n, Buffer.from(_this.bufToStore));
  362. ++_this.state.outstandingRequests;
  363. ++outstandingRequests;
  364. if (checkAborted(_this, callback)) {
  365. return false;
  366. }
  367. _this.chunks.insertOne(doc, getWriteOptions(_this), function(error) {
  368. if (error) {
  369. return __handleError(_this, error);
  370. }
  371. --_this.state.outstandingRequests;
  372. --outstandingRequests;
  373. if (!outstandingRequests) {
  374. _this.emit('drain', doc);
  375. callback && callback();
  376. checkDone(_this);
  377. }
  378. });
  379. spaceRemaining = _this.chunkSizeBytes;
  380. _this.pos = 0;
  381. ++_this.n;
  382. }
  383. inputBufRemaining -= numToCopy;
  384. numToCopy = Math.min(spaceRemaining, inputBufRemaining);
  385. }
  386. // Note that we reverse the typical semantics of write's return value
  387. // to be compatible with node's `.pipe()` function.
  388. // False means the client should wait for the 'drain' event.
  389. return false;
  390. }
  391. /**
  392. * @ignore
  393. */
  394. function getWriteOptions(_this) {
  395. var obj = {};
  396. if (_this.options.writeConcern) {
  397. obj.w = _this.options.writeConcern.w;
  398. obj.wtimeout = _this.options.writeConcern.wtimeout;
  399. obj.j = _this.options.writeConcern.j;
  400. }
  401. return obj;
  402. }
  403. /**
  404. * @ignore
  405. */
  406. function waitForIndexes(_this, callback) {
  407. if (_this.bucket.s.checkedIndexes) {
  408. return callback(false);
  409. }
  410. _this.bucket.once('index', function() {
  411. callback(true);
  412. });
  413. return true;
  414. }
  415. /**
  416. * @ignore
  417. */
  418. function writeRemnant(_this, callback) {
  419. // Buffer is empty, so don't bother to insert
  420. if (_this.pos === 0) {
  421. return checkDone(_this, callback);
  422. }
  423. ++_this.state.outstandingRequests;
  424. // Create a new buffer to make sure the buffer isn't bigger than it needs
  425. // to be.
  426. var remnant = Buffer.alloc(_this.pos);
  427. _this.bufToStore.copy(remnant, 0, 0, _this.pos);
  428. if (_this.md5) {
  429. _this.md5.update(remnant);
  430. }
  431. var doc = createChunkDoc(_this.id, _this.n, remnant);
  432. // If the stream was aborted, do not write remnant
  433. if (checkAborted(_this, callback)) {
  434. return false;
  435. }
  436. _this.chunks.insertOne(doc, getWriteOptions(_this), function(error) {
  437. if (error) {
  438. return __handleError(_this, error);
  439. }
  440. --_this.state.outstandingRequests;
  441. checkDone(_this);
  442. });
  443. }
  444. /**
  445. * @ignore
  446. */
  447. function checkAborted(_this, callback) {
  448. if (_this.state.aborted) {
  449. if (typeof callback === 'function') {
  450. callback(new Error('this stream has been aborted'));
  451. }
  452. return true;
  453. }
  454. return false;
  455. }