path-reservations.js 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. // A path exclusive reservation system
  2. // reserve([list, of, paths], fn)
  3. // When the fn is first in line for all its paths, it
  4. // is called with a cb that clears the reservation.
  5. //
  6. // Used by async unpack to avoid clobbering paths in use,
  7. // while still allowing maximal safe parallelization.
  8. const assert = require('assert')
  9. module.exports = () => {
  10. // path => [function or Set]
  11. // A Set object means a directory reservation
  12. // A fn is a direct reservation on that path
  13. const queues = new Map()
  14. // fn => {paths:[path,...], dirs:[path, ...]}
  15. const reservations = new Map()
  16. // return a set of parent dirs for a given path
  17. const { join } = require('path')
  18. const getDirs = path =>
  19. join(path).split(/[\\/]/).slice(0, -1).reduce((set, path) =>
  20. set.length ? set.concat(join(set[set.length - 1], path)) : [path], [])
  21. // functions currently running
  22. const running = new Set()
  23. // return the queues for each path the function cares about
  24. // fn => {paths, dirs}
  25. const getQueues = fn => {
  26. const res = reservations.get(fn)
  27. /* istanbul ignore if - unpossible */
  28. if (!res)
  29. throw new Error('function does not have any path reservations')
  30. return {
  31. paths: res.paths.map(path => queues.get(path)),
  32. dirs: [...res.dirs].map(path => queues.get(path)),
  33. }
  34. }
  35. // check if fn is first in line for all its paths, and is
  36. // included in the first set for all its dir queues
  37. const check = fn => {
  38. const {paths, dirs} = getQueues(fn)
  39. return paths.every(q => q[0] === fn) &&
  40. dirs.every(q => q[0] instanceof Set && q[0].has(fn))
  41. }
  42. // run the function if it's first in line and not already running
  43. const run = fn => {
  44. if (running.has(fn) || !check(fn))
  45. return false
  46. running.add(fn)
  47. fn(() => clear(fn))
  48. return true
  49. }
  50. const clear = fn => {
  51. if (!running.has(fn))
  52. return false
  53. const { paths, dirs } = reservations.get(fn)
  54. const next = new Set()
  55. paths.forEach(path => {
  56. const q = queues.get(path)
  57. assert.equal(q[0], fn)
  58. if (q.length === 1)
  59. queues.delete(path)
  60. else {
  61. q.shift()
  62. if (typeof q[0] === 'function')
  63. next.add(q[0])
  64. else
  65. q[0].forEach(fn => next.add(fn))
  66. }
  67. })
  68. dirs.forEach(dir => {
  69. const q = queues.get(dir)
  70. assert(q[0] instanceof Set)
  71. if (q[0].size === 1 && q.length === 1)
  72. queues.delete(dir)
  73. else if (q[0].size === 1) {
  74. q.shift()
  75. // must be a function or else the Set would've been reused
  76. next.add(q[0])
  77. } else
  78. q[0].delete(fn)
  79. })
  80. running.delete(fn)
  81. next.forEach(fn => run(fn))
  82. return true
  83. }
  84. const reserve = (paths, fn) => {
  85. const dirs = new Set(
  86. paths.map(path => getDirs(path)).reduce((a, b) => a.concat(b))
  87. )
  88. reservations.set(fn, {dirs, paths})
  89. paths.forEach(path => {
  90. const q = queues.get(path)
  91. if (!q)
  92. queues.set(path, [fn])
  93. else
  94. q.push(fn)
  95. })
  96. dirs.forEach(dir => {
  97. const q = queues.get(dir)
  98. if (!q)
  99. queues.set(dir, [new Set([fn])])
  100. else if (q[q.length - 1] instanceof Set)
  101. q[q.length - 1].add(fn)
  102. else
  103. q.push(new Set([fn]))
  104. })
  105. return run(fn)
  106. }
  107. return { check, reserve }
  108. }