// graceful-fs.js
  1. var fs = require('fs')
  2. var polyfills = require('./polyfills.js')
  3. var legacy = require('./legacy-streams.js')
  4. var clone = require('./clone.js')
  5. var util = require('util')
  // Property keys used by this module: `gracefulQueue` names the property that
  // holds the retry queue, `previousSymbol` names the property that stores the
  // original implementation on a wrapped function.
  //
  // Start from the plain string keys (node 0.x fallback) and upgrade to
  // registry symbols whenever Symbol.for is available.
  /* istanbul ignore next - node 0.x polyfill */
  var gracefulQueue = '___graceful-fs.queue'
  /* istanbul ignore next - node 0.x polyfill */
  var previousSymbol = '___graceful-fs.previous'

  /* istanbul ignore else - node 0.x polyfill */
  if (typeof Symbol === 'function' && typeof Symbol.for === 'function') {
    gracefulQueue = Symbol.for('graceful-fs.queue')
    // This is used in testing by future versions
    previousSymbol = Symbol.for('graceful-fs.previous')
  }
  // Do-nothing function: takes no arguments and returns undefined.
  function noop () { /* intentionally empty */ }
  // Expose `queue` on `context` under the `gracefulQueue` key.
  //
  // The property is defined as an accessor with a getter only. No setter and
  // no enumerable/configurable flags are supplied, so it takes the
  // Object.defineProperty defaults for those attributes.
  //
  //   context - object that receives the property
  //   queue   - value handed back by the getter on every read
  function publishQueue(context, queue) {
    var accessor = {
      get: function() {
        return queue
      }
    }
    Object.defineProperty(context, gracefulQueue, accessor)
  }
  // Pick the debug logger:
  //   - util.debuglog('gfs4') when the runtime provides util.debuglog;
  //   - otherwise, when NODE_DEBUG mentions "gfs4", a console.error based
  //     logger that prefixes every output line with "GFS4: ";
  //   - otherwise the silent noop.
  var debug = noop
  if (util.debuglog) {
    debug = util.debuglog('gfs4')
  } else if (/\bgfs4\b/i.test(process.env.NODE_DEBUG || '')) {
    debug = function() {
      var text = util.format.apply(util, arguments)
      // repeat the prefix after every newline so multi-line output stays tagged
      var tagged = 'GFS4: ' + text.split(/\n/).join('\nGFS4: ')
      console.error(tagged)
    }
  }
  // One-time initialization: only runs while this `fs` object does not yet
  // carry the queue property.
  if (!fs[gracefulQueue]) {
    // This queue can be shared by multiple loaded instances, so reuse the one
    // already published on `global` when there is one.
    var queue = global[gracefulQueue] || []
    publishQueue(fs, queue)

    // Patch fs.close/closeSync to the shared queue version, because we need
    // to retry() whenever a close happens *anywhere* in the program.
    // This is essential when multiple graceful-fs instances are
    // in play at the same time.
    fs.close = (function (nativeClose) {
      function close (fd, cb) {
        return nativeClose.call(fs, fd, function (err) {
          // This function uses the graceful-fs shared queue: a successful
          // close restarts the timing of every queued job.
          if (!err) resetQueue()

          // forward the native callback's `this` and arguments untouched
          if (typeof cb === 'function') cb.apply(this, arguments)
        })
      }

      // keep the wrapped implementation reachable from the wrapper
      Object.defineProperty(close, previousSymbol, { value: nativeClose })
      return close
    })(fs.close)

    fs.closeSync = (function (nativeCloseSync) {
      function closeSync (fd) {
        // This function uses the graceful-fs shared queue
        nativeCloseSync.apply(fs, arguments)
        // only reached when the native call did not throw
        resetQueue()
      }

      // keep the wrapped implementation reachable from the wrapper
      Object.defineProperty(closeSync, previousSymbol, { value: nativeCloseSync })
      return closeSync
    })(fs.closeSync)

    // With NODE_DEBUG=gfs4, log the queue at exit and assert it is empty.
    if (/\bgfs4\b/i.test(process.env.NODE_DEBUG || '')) {
      process.on('exit', function() {
        debug(fs[gracefulQueue])
        require('assert').equal(fs[gracefulQueue].length, 0)
      })
    }
  }

  // Publish the queue on `global` too, unless something is already there.
  if (!global[gracefulQueue]) {
    publishQueue(global, fs[gracefulQueue])
  }

  // Default export: a patched clone, leaving the `fs` object itself unpatched.
  module.exports = patch(clone(fs))

  // With TEST_GRACEFUL_FS_GLOBAL_PATCH set, patch `fs` itself (once) and
  // export that instead.
  if (process.env.TEST_GRACEFUL_FS_GLOBAL_PATCH && !fs.__patched) {
    module.exports = patch(fs)
    fs.__patched = true
  }
  // Install the graceful wrappers on `fs` and return that same object.
  //
  // readFile, writeFile, appendFile, copyFile, readdir and open are replaced
  // by wrappers that hand the call to enqueue() when it fails with EMFILE or
  // ENFILE, instead of reporting the error to the caller's callback.
  // appendFile and copyFile are only replaced when `fs` already has them.
  // ReadStream/WriteStream (and the legacy FileReadStream/FileWriteStream
  // names) are exposed through accessors, with open() methods that go through
  // the wrapped open().
  function patch (fs) {
    // Everything that references the open() function needs to be in here
    polyfills(fs)
    fs.gracefulify = patch

    fs.createReadStream = createReadStream
    fs.createWriteStream = createWriteStream

    // Truthy when `err` is one of the two error codes that get queued for a
    // retry instead of being reported.
    function isFdLimitError (err) {
      return err && (err.code === 'EMFILE' || err.code === 'ENFILE')
    }

    var fs$readFile = fs.readFile
    fs.readFile = readFile
    function readFile (path, options, cb) {
      // (path, cb) call form: shift the callback into place
      if (typeof options === 'function') {
        cb = options
        options = null
      }

      return go$readFile(path, options, cb)

      // startTime is only supplied when the call is replayed from the queue
      function go$readFile (path, options, cb, startTime) {
        return fs$readFile(path, options, function (err) {
          if (isFdLimitError(err)) {
            enqueue([go$readFile, [path, options, cb], err, startTime || Date.now(), Date.now()])
            return
          }
          if (typeof cb === 'function') cb.apply(this, arguments)
        })
      }
    }

    var fs$writeFile = fs.writeFile
    fs.writeFile = writeFile
    function writeFile (path, data, options, cb) {
      // (path, data, cb) call form
      if (typeof options === 'function') {
        cb = options
        options = null
      }

      return go$writeFile(path, data, options, cb)

      function go$writeFile (path, data, options, cb, startTime) {
        return fs$writeFile(path, data, options, function (err) {
          if (isFdLimitError(err)) {
            enqueue([go$writeFile, [path, data, options, cb], err, startTime || Date.now(), Date.now()])
            return
          }
          if (typeof cb === 'function') cb.apply(this, arguments)
        })
      }
    }

    var fs$appendFile = fs.appendFile
    if (fs$appendFile) {
      fs.appendFile = appendFile
    }
    function appendFile (path, data, options, cb) {
      // (path, data, cb) call form
      if (typeof options === 'function') {
        cb = options
        options = null
      }

      return go$appendFile(path, data, options, cb)

      function go$appendFile (path, data, options, cb, startTime) {
        return fs$appendFile(path, data, options, function (err) {
          if (isFdLimitError(err)) {
            enqueue([go$appendFile, [path, data, options, cb], err, startTime || Date.now(), Date.now()])
            return
          }
          if (typeof cb === 'function') cb.apply(this, arguments)
        })
      }
    }

    var fs$copyFile = fs.copyFile
    if (fs$copyFile) {
      fs.copyFile = copyFile
    }
    function copyFile (src, dest, flags, cb) {
      // (src, dest, cb) call form: flags default to 0
      if (typeof flags === 'function') {
        cb = flags
        flags = 0
      }

      return go$copyFile(src, dest, flags, cb)

      function go$copyFile (src, dest, flags, cb, startTime) {
        return fs$copyFile(src, dest, flags, function (err) {
          if (isFdLimitError(err)) {
            enqueue([go$copyFile, [src, dest, flags, cb], err, startTime || Date.now(), Date.now()])
            return
          }
          if (typeof cb === 'function') cb.apply(this, arguments)
        })
      }
    }

    var fs$readdir = fs.readdir
    fs.readdir = readdir
    function readdir (path, options, cb) {
      // (path, cb) call form
      if (typeof options === 'function') {
        cb = options
        options = null
      }

      return go$readdir(path, options, cb)

      function go$readdir (path, options, cb, startTime) {
        return fs$readdir(path, options, function (err, files) {
          if (isFdLimitError(err)) {
            enqueue([go$readdir, [path, options, cb], err, startTime || Date.now(), Date.now()])
            return
          }

          // sort the listing in place before handing it over
          if (files && files.sort) files.sort()

          if (typeof cb === 'function') cb.call(this, err, files)
        })
      }
    }

    // On node v0.8 the stream constructors come from the legacy module instead
    // of the function declarations further down.
    if (process.version.slice(0, 4) === 'v0.8') {
      var legacyStreams = legacy(fs)
      ReadStream = legacyStreams.ReadStream
      WriteStream = legacyStreams.WriteStream
    }

    // Inherit from the stream classes `fs` came with, but open through the
    // wrapped open().
    var fs$ReadStream = fs.ReadStream
    if (fs$ReadStream) {
      ReadStream.prototype = Object.create(fs$ReadStream.prototype)
      ReadStream.prototype.open = ReadStream$open
    }

    var fs$WriteStream = fs.WriteStream
    if (fs$WriteStream) {
      WriteStream.prototype = Object.create(fs$WriteStream.prototype)
      WriteStream.prototype.open = WriteStream$open
    }

    // The stream classes are exposed as accessors; assigning to one of these
    // properties rebinds the local variable that the getter reads.
    Object.defineProperty(fs, 'ReadStream', {
      get: function () { return ReadStream },
      set: function (val) { ReadStream = val },
      enumerable: true,
      configurable: true
    })
    Object.defineProperty(fs, 'WriteStream', {
      get: function () { return WriteStream },
      set: function (val) { WriteStream = val },
      enumerable: true,
      configurable: true
    })

    // legacy names
    var FileReadStream = ReadStream
    Object.defineProperty(fs, 'FileReadStream', {
      get: function () { return FileReadStream },
      set: function (val) { FileReadStream = val },
      enumerable: true,
      configurable: true
    })
    var FileWriteStream = WriteStream
    Object.defineProperty(fs, 'FileWriteStream', {
      get: function () { return FileWriteStream },
      set: function (val) { FileWriteStream = val },
      enumerable: true,
      configurable: true
    })

    // Works with or without `new`.
    function ReadStream (path, options) {
      if (!(this instanceof ReadStream)) {
        return ReadStream.apply(Object.create(ReadStream.prototype), arguments)
      }
      fs$ReadStream.apply(this, arguments)
      return this
    }

    function ReadStream$open () {
      var stream = this
      open(stream.path, stream.flags, stream.mode, function (err, fd) {
        if (err) {
          if (stream.autoClose) stream.destroy()
          stream.emit('error', err)
          return
        }
        stream.fd = fd
        stream.emit('open', fd)
        stream.read()
      })
    }

    // Works with or without `new`.
    function WriteStream (path, options) {
      if (!(this instanceof WriteStream)) {
        return WriteStream.apply(Object.create(WriteStream.prototype), arguments)
      }
      fs$WriteStream.apply(this, arguments)
      return this
    }

    function WriteStream$open () {
      var stream = this
      open(stream.path, stream.flags, stream.mode, function (err, fd) {
        if (err) {
          stream.destroy()
          stream.emit('error', err)
          return
        }
        stream.fd = fd
        stream.emit('open', fd)
      })
    }

    function createReadStream (path, options) {
      return new fs.ReadStream(path, options)
    }

    function createWriteStream (path, options) {
      return new fs.WriteStream(path, options)
    }

    var fs$open = fs.open
    fs.open = open
    function open (path, flags, mode, cb) {
      // (path, flags, cb) call form
      if (typeof mode === 'function') {
        cb = mode
        mode = null
      }

      return go$open(path, flags, mode, cb)

      function go$open (path, flags, mode, cb, startTime) {
        return fs$open(path, flags, mode, function (err, fd) {
          if (isFdLimitError(err)) {
            enqueue([go$open, [path, flags, mode, cb], err, startTime || Date.now(), Date.now()])
            return
          }
          if (typeof cb === 'function') cb.apply(this, arguments)
        })
      }
    }

    return fs
  }
  // Add a job to the shared queue and kick off retry processing.
  //
  //   elem - array whose first item is the function to re-run and whose second
  //          item is that function's argument list; the callers in this file
  //          also append the error, a start time and a last-attempt time.
  function enqueue (elem) {
    var fn = elem[0]
    var args = elem[1]
    debug('ENQUEUE', fn.name, args)
    fs[gracefulQueue].push(elem)
    retry()
  }
  // keep track of the timeout between retry() calls
  var retryTimer

  // Stamp every timed job in the queue with the current time as both its
  // startTime and its lastTime. This restarts the 60 second overall timeout
  // and the delay between attempts, so those jobs get retried sooner.
  function resetQueue () {
    var now = Date.now()
    var pending = fs[gracefulQueue]
    for (var i = 0; i < pending.length; ++i) {
      var entry = pending[i]
      // entries that are only a length of 2 are from an older version, don't
      // bother modifying those since they'll be retried anyway.
      if (!(entry.length > 2)) continue
      entry[3] = now // startTime
      entry[4] = now // lastTime
    }
    // call retry to make sure we're actively processing the queue
    retry()
  }
  // Process one job from the head of the shared queue, then schedule the next
  // pass. Depending on its timestamps the job is re-run, failed with its
  // stored error, or moved to the back of the queue.
  function retry () {
    // clear the timer and remove it to help prevent unintended concurrency
    clearTimeout(retryTimer)
    retryTimer = undefined

    var pending = fs[gracefulQueue]
    if (pending.length === 0) {
      return
    }

    var job = pending.shift()
    var fn = job[0]
    var args = job[1]
    // these items may be unset if they were added by an older graceful-fs
    var err = job[2]
    var startTime = job[3]
    var lastTime = job[4]

    if (startTime === undefined) {
      // without a startTime there is no way of knowing whether we have waited
      // long enough, so retry this item right away
      debug('RETRY', fn.name, args)
      fn.apply(null, args)
    } else if (Date.now() - startTime >= 60000) {
      // 60 seconds or more in total: give up and report the stored error to
      // the job's callback (its last argument)
      debug('TIMEOUT', fn.name, args)
      var done = args.pop()
      if (typeof done === 'function') {
        done.call(null, err)
      }
    } else {
      // time elapsed since the last attempt
      var sinceAttempt = Date.now() - lastTime
      // time between the first and the last attempt, at least 1
      var sinceStart = Math.max(lastTime - startTime, 1)
      // backoff: 1.2 times the time spent retrying so far, capped at 100ms
      var desiredDelay = Math.min(sinceStart * 1.2, 100)

      if (sinceAttempt >= desiredDelay) {
        // waited long enough: run it again, passing the original startTime on
        debug('RETRY', fn.name, args)
        fn.apply(null, args.concat([startTime]))
      } else {
        // too early for this job: move it to the end of the queue and let the
        // next iteration check again
        pending.push(job)
      }
    }

    // schedule our next run if one isn't already scheduled
    if (retryTimer === undefined) {
      retryTimer = setTimeout(retry, 0)
    }
  }