write.js 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. 'use strict'
  2. const util = require('util')
  3. const contentPath = require('./path')
  4. const fixOwner = require('../util/fix-owner')
  5. const fs = require('fs')
  6. const moveFile = require('../util/move-file')
  7. const Minipass = require('minipass')
  8. const Pipeline = require('minipass-pipeline')
  9. const Flush = require('minipass-flush')
  10. const path = require('path')
  11. const rimraf = util.promisify(require('rimraf'))
  12. const ssri = require('ssri')
  13. const uniqueFilename = require('unique-filename')
  14. const { disposer } = require('./../util/disposer')
  15. const fsm = require('fs-minipass')
  16. const writeFile = util.promisify(fs.writeFile)
  17. module.exports = write
  18. function write (cache, data, opts = {}) {
  19. const { algorithms, size, integrity } = opts
  20. if (algorithms && algorithms.length > 1)
  21. throw new Error('opts.algorithms only supports a single algorithm for now')
  22. if (typeof size === 'number' && data.length !== size)
  23. return Promise.reject(sizeError(size, data.length))
  24. const sri = ssri.fromData(data, algorithms ? { algorithms } : {})
  25. if (integrity && !ssri.checkData(data, integrity, opts))
  26. return Promise.reject(checksumError(integrity, sri))
  27. return disposer(makeTmp(cache, opts), makeTmpDisposer,
  28. (tmp) => {
  29. return writeFile(tmp.target, data, { flag: 'wx' })
  30. .then(() => moveToDestination(tmp, cache, sri, opts))
  31. })
  32. .then(() => ({ integrity: sri, size: data.length }))
  33. }
  34. module.exports.stream = writeStream
  35. // writes proxied to the 'inputStream' that is passed to the Promise
  36. // 'end' is deferred until content is handled.
  37. class CacacheWriteStream extends Flush {
  38. constructor (cache, opts) {
  39. super()
  40. this.opts = opts
  41. this.cache = cache
  42. this.inputStream = new Minipass()
  43. this.inputStream.on('error', er => this.emit('error', er))
  44. this.inputStream.on('drain', () => this.emit('drain'))
  45. this.handleContentP = null
  46. }
  47. write (chunk, encoding, cb) {
  48. if (!this.handleContentP) {
  49. this.handleContentP = handleContent(
  50. this.inputStream,
  51. this.cache,
  52. this.opts
  53. )
  54. }
  55. return this.inputStream.write(chunk, encoding, cb)
  56. }
  57. flush (cb) {
  58. this.inputStream.end(() => {
  59. if (!this.handleContentP) {
  60. const e = new Error('Cache input stream was empty')
  61. e.code = 'ENODATA'
  62. // empty streams are probably emitting end right away.
  63. // defer this one tick by rejecting a promise on it.
  64. return Promise.reject(e).catch(cb)
  65. }
  66. this.handleContentP.then(
  67. (res) => {
  68. res.integrity && this.emit('integrity', res.integrity)
  69. res.size !== null && this.emit('size', res.size)
  70. cb()
  71. },
  72. (er) => cb(er)
  73. )
  74. })
  75. }
  76. }
  77. function writeStream (cache, opts = {}) {
  78. return new CacacheWriteStream(cache, opts)
  79. }
  80. function handleContent (inputStream, cache, opts) {
  81. return disposer(makeTmp(cache, opts), makeTmpDisposer, (tmp) => {
  82. return pipeToTmp(inputStream, cache, tmp.target, opts)
  83. .then((res) => {
  84. return moveToDestination(
  85. tmp,
  86. cache,
  87. res.integrity,
  88. opts
  89. ).then(() => res)
  90. })
  91. })
  92. }
  93. function pipeToTmp (inputStream, cache, tmpTarget, opts) {
  94. let integrity
  95. let size
  96. const hashStream = ssri.integrityStream({
  97. integrity: opts.integrity,
  98. algorithms: opts.algorithms,
  99. size: opts.size,
  100. })
  101. hashStream.on('integrity', i => {
  102. integrity = i
  103. })
  104. hashStream.on('size', s => {
  105. size = s
  106. })
  107. const outStream = new fsm.WriteStream(tmpTarget, {
  108. flags: 'wx',
  109. })
  110. // NB: this can throw if the hashStream has a problem with
  111. // it, and the data is fully written. but pipeToTmp is only
  112. // called in promisory contexts where that is handled.
  113. const pipeline = new Pipeline(
  114. inputStream,
  115. hashStream,
  116. outStream
  117. )
  118. return pipeline.promise()
  119. .then(() => ({ integrity, size }))
  120. .catch(er => rimraf(tmpTarget).then(() => {
  121. throw er
  122. }))
  123. }
  124. function makeTmp (cache, opts) {
  125. const tmpTarget = uniqueFilename(path.join(cache, 'tmp'), opts.tmpPrefix)
  126. return fixOwner.mkdirfix(cache, path.dirname(tmpTarget)).then(() => ({
  127. target: tmpTarget,
  128. moved: false,
  129. }))
  130. }
  131. function makeTmpDisposer (tmp) {
  132. if (tmp.moved)
  133. return Promise.resolve()
  134. return rimraf(tmp.target)
  135. }
  136. function moveToDestination (tmp, cache, sri, opts) {
  137. const destination = contentPath(cache, sri)
  138. const destDir = path.dirname(destination)
  139. return fixOwner
  140. .mkdirfix(cache, destDir)
  141. .then(() => {
  142. return moveFile(tmp.target, destination)
  143. })
  144. .then(() => {
  145. tmp.moved = true
  146. return fixOwner.chownr(cache, destination)
  147. })
  148. }
  149. function sizeError (expected, found) {
  150. const err = new Error(`Bad data size: expected inserted data to be ${expected} bytes, but got ${found} instead`)
  151. err.expected = expected
  152. err.found = found
  153. err.code = 'EBADSIZE'
  154. return err
  155. }
  156. function checksumError (expected, found) {
  157. const err = new Error(`Integrity check failed:
  158. Wanted: ${expected}
  159. Found: ${found}`)
  160. err.code = 'EINTEGRITY'
  161. err.expected = expected
  162. err.found = found
  163. return err
  164. }