123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- 'use strict'
- const util = require('util')
- const contentPath = require('./path')
- const fixOwner = require('../util/fix-owner')
- const fs = require('fs')
- const moveFile = require('../util/move-file')
- const Minipass = require('minipass')
- const Pipeline = require('minipass-pipeline')
- const Flush = require('minipass-flush')
- const path = require('path')
- const rimraf = util.promisify(require('rimraf'))
- const ssri = require('ssri')
- const uniqueFilename = require('unique-filename')
- const { disposer } = require('./../util/disposer')
- const fsm = require('fs-minipass')
- const writeFile = util.promisify(fs.writeFile)
- module.exports = write
- function write (cache, data, opts = {}) {
- const { algorithms, size, integrity } = opts
- if (algorithms && algorithms.length > 1)
- throw new Error('opts.algorithms only supports a single algorithm for now')
- if (typeof size === 'number' && data.length !== size)
- return Promise.reject(sizeError(size, data.length))
- const sri = ssri.fromData(data, algorithms ? { algorithms } : {})
- if (integrity && !ssri.checkData(data, integrity, opts))
- return Promise.reject(checksumError(integrity, sri))
- return disposer(makeTmp(cache, opts), makeTmpDisposer,
- (tmp) => {
- return writeFile(tmp.target, data, { flag: 'wx' })
- .then(() => moveToDestination(tmp, cache, sri, opts))
- })
- .then(() => ({ integrity: sri, size: data.length }))
- }
- module.exports.stream = writeStream
- // writes proxied to the 'inputStream' that is passed to the Promise
- // 'end' is deferred until content is handled.
- class CacacheWriteStream extends Flush {
- constructor (cache, opts) {
- super()
- this.opts = opts
- this.cache = cache
- this.inputStream = new Minipass()
- this.inputStream.on('error', er => this.emit('error', er))
- this.inputStream.on('drain', () => this.emit('drain'))
- this.handleContentP = null
- }
- write (chunk, encoding, cb) {
- if (!this.handleContentP) {
- this.handleContentP = handleContent(
- this.inputStream,
- this.cache,
- this.opts
- )
- }
- return this.inputStream.write(chunk, encoding, cb)
- }
- flush (cb) {
- this.inputStream.end(() => {
- if (!this.handleContentP) {
- const e = new Error('Cache input stream was empty')
- e.code = 'ENODATA'
- // empty streams are probably emitting end right away.
- // defer this one tick by rejecting a promise on it.
- return Promise.reject(e).catch(cb)
- }
- this.handleContentP.then(
- (res) => {
- res.integrity && this.emit('integrity', res.integrity)
- res.size !== null && this.emit('size', res.size)
- cb()
- },
- (er) => cb(er)
- )
- })
- }
- }
- function writeStream (cache, opts = {}) {
- return new CacacheWriteStream(cache, opts)
- }
- function handleContent (inputStream, cache, opts) {
- return disposer(makeTmp(cache, opts), makeTmpDisposer, (tmp) => {
- return pipeToTmp(inputStream, cache, tmp.target, opts)
- .then((res) => {
- return moveToDestination(
- tmp,
- cache,
- res.integrity,
- opts
- ).then(() => res)
- })
- })
- }
- function pipeToTmp (inputStream, cache, tmpTarget, opts) {
- let integrity
- let size
- const hashStream = ssri.integrityStream({
- integrity: opts.integrity,
- algorithms: opts.algorithms,
- size: opts.size,
- })
- hashStream.on('integrity', i => {
- integrity = i
- })
- hashStream.on('size', s => {
- size = s
- })
- const outStream = new fsm.WriteStream(tmpTarget, {
- flags: 'wx',
- })
- // NB: this can throw if the hashStream has a problem with
- // it, and the data is fully written. but pipeToTmp is only
- // called in promisory contexts where that is handled.
- const pipeline = new Pipeline(
- inputStream,
- hashStream,
- outStream
- )
- return pipeline.promise()
- .then(() => ({ integrity, size }))
- .catch(er => rimraf(tmpTarget).then(() => {
- throw er
- }))
- }
- function makeTmp (cache, opts) {
- const tmpTarget = uniqueFilename(path.join(cache, 'tmp'), opts.tmpPrefix)
- return fixOwner.mkdirfix(cache, path.dirname(tmpTarget)).then(() => ({
- target: tmpTarget,
- moved: false,
- }))
- }
- function makeTmpDisposer (tmp) {
- if (tmp.moved)
- return Promise.resolve()
- return rimraf(tmp.target)
- }
- function moveToDestination (tmp, cache, sri, opts) {
- const destination = contentPath(cache, sri)
- const destDir = path.dirname(destination)
- return fixOwner
- .mkdirfix(cache, destDir)
- .then(() => {
- return moveFile(tmp.target, destination)
- })
- .then(() => {
- tmp.moved = true
- return fixOwner.chownr(cache, destination)
- })
- }
- function sizeError (expected, found) {
- const err = new Error(`Bad data size: expected inserted data to be ${expected} bytes, but got ${found} instead`)
- err.expected = expected
- err.found = found
- err.code = 'EBADSIZE'
- return err
- }
- function checksumError (expected, found) {
- const err = new Error(`Integrity check failed:
- Wanted: ${expected}
- Found: ${found}`)
- err.code = 'EINTEGRITY'
- err.expected = expected
- err.found = found
- return err
- }
|