repository: Limit to a single pending pack file
Use only a single incomplete pack file at a time to keep the number of
open and active pack files low. The main change here is to defer
hashing the pack file to the upload step. This prevents the pack
assembly step from becoming a bottleneck, as its only task is now to
write data to the temporary pack file.

The tests are cleaned up to no longer reimplement packer manager
functions.
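
To illustrate the deferred-hashing pattern the first paragraph describes, here is a minimal, self-contained Go sketch: blobs are appended through a plain buffered writer, and the SHA-256 pack ID is computed in a second pass over the finished file at upload time. The helpers writeBlob and finishPack are hypothetical stand-ins for restic's Packer plumbing; the real code also encrypts blobs and computes an optional backend hash in the same second pass (see savePacker in the diff below).

    package main

    import (
        "bufio"
        "crypto/sha256"
        "fmt"
        "io"
        "os"
    )

    // writeBlob appends data to the pending pack file. No hashing happens
    // here, so pack assembly stays a cheap buffered write.
    func writeBlob(w *bufio.Writer, data []byte) error {
        _, err := w.Write(data)
        return err
    }

    // finishPack flushes the buffer, then hashes the file in a second pass,
    // paying the hashing cost at upload time instead of during assembly.
    func finishPack(w *bufio.Writer, tmpfile *os.File) ([]byte, error) {
        if err := w.Flush(); err != nil {
            return nil, err
        }
        if _, err := tmpfile.Seek(0, io.SeekStart); err != nil {
            return nil, err
        }
        h := sha256.New()
        if _, err := io.Copy(h, tmpfile); err != nil {
            return nil, err
        }
        return h.Sum(nil), nil
    }

    func main() {
        tmpfile, err := os.CreateTemp("", "restic-temp-pack-")
        if err != nil {
            panic(err)
        }
        defer os.Remove(tmpfile.Name())
        defer tmpfile.Close()

        w := bufio.NewWriter(tmpfile)
        if err := writeBlob(w, []byte("blob one")); err != nil {
            panic(err)
        }
        if err := writeBlob(w, []byte("blob two")); err != nil {
            panic(err)
        }

        id, err := finishPack(w, tmpfile)
        if err != nil {
            panic(err)
        }
        fmt.Printf("pack id: %x\n", id)
    }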
MichaelEischer committed Jun 5, 2022
1 parent 717ef6b commit 5d5aa68
Showing 3 changed files with 85 additions and 169 deletions.
135 changes: 64 additions & 71 deletions internal/repository/packer_manager.go
@@ -1,9 +1,10 @@
 package repository

 import (
+    "bufio"
     "context"
-    "hash"
     "io"
+    "io/ioutil"
     "os"
     "runtime"
     "sync"
@@ -23,57 +24,64 @@ import (
 // Packer holds a pack.Packer together with a hash writer.
 type Packer struct {
     *pack.Packer
-    hw      *hashing.Writer
-    beHw    *hashing.Writer
     tmpfile *os.File
+    bufWr   *bufio.Writer
 }

 // packerManager keeps a list of open packs and creates new on demand.
 type packerManager struct {
-    tpe      restic.BlobType
-    key      *crypto.Key
-    hasherFn func() hash.Hash
-    queueFn  func(ctx context.Context, t restic.BlobType, p *Packer) error
+    tpe     restic.BlobType
+    key     *crypto.Key
+    queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error

-    pm      sync.Mutex
-    packers []*Packer
+    pm     sync.Mutex
+    packer *Packer
 }

 const minPackSize = 4 * 1024 * 1024

 // newPackerManager returns an new packer manager which writes temporary files
 // to a temporary directory
-func newPackerManager(key *crypto.Key, hasherFn func() hash.Hash, tpe restic.BlobType, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager {
+func newPackerManager(key *crypto.Key, tpe restic.BlobType, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager {
     return &packerManager{
-        tpe:      tpe,
-        key:      key,
-        hasherFn: hasherFn,
-        queueFn:  queueFn,
+        tpe:     tpe,
+        key:     key,
+        queueFn: queueFn,
     }
 }

 func (r *packerManager) Flush(ctx context.Context) error {
     r.pm.Lock()
     defer r.pm.Unlock()

-    debug.Log("manually flushing %d packs", len(r.packers))
-    for _, packer := range r.packers {
-        err := r.queueFn(ctx, r.tpe, packer)
+    if r.packer != nil {
+        debug.Log("manually flushing pending pack")
+        err := r.queueFn(ctx, r.tpe, r.packer)
         if err != nil {
             return err
         }
+        r.packer = nil
     }
-    r.packers = r.packers[:0]
     return nil
 }

 func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id restic.ID, ciphertext []byte, uncompressedLength int) error {
-    packer, err := r.findPacker()
-    if err != nil {
-        return err
+    r.pm.Lock()
+    defer r.pm.Unlock()
+
+    var err error
+    packer := r.packer
+    if r.packer == nil {
+        packer, err = r.newPacker()
+        if err != nil {
+            return err
+        }
     }
+    // remember packer
+    r.packer = packer

     // save ciphertext
+    // Add only appends bytes in memory to avoid being a scaling bottleneck
     _, err = packer.Add(t, id, ciphertext, uncompressedLength)
     if err != nil {
         return err
@@ -82,87 +90,80 @@ func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id restic.ID, ciphertext []byte, uncompressedLength int) error {
     // if the pack is not full enough, put back to the list
     if packer.Size() < minPackSize {
         debug.Log("pack is not full enough (%d bytes)", packer.Size())
-        r.insertPacker(packer)
         return nil
     }
+    // forget full packer
+    r.packer = nil

-    // call while holding lock to prevent findPacker from creating new packers if the uploaders are busy
+    // else write the pack to the backend
     return r.queueFn(ctx, t, packer)
 }

-// findPacker returns a packer for a new blob of size bytes. Either a new one is
-// created or one is returned that already has some blobs.
-func (r *packerManager) findPacker() (packer *Packer, err error) {
-    r.pm.Lock()
-    defer r.pm.Unlock()
-
-    // search for a suitable packer
-    if len(r.packers) > 0 {
-        p := r.packers[0]
-        last := len(r.packers) - 1
-        r.packers[0] = r.packers[last]
-        r.packers[last] = nil // Allow GC of stale reference.
-        r.packers = r.packers[:last]
-        return p, nil
-    }
-
-    // no suitable packer found, return new
+func (r *packerManager) newPacker() (packer *Packer, err error) {
     debug.Log("create new pack")
     tmpfile, err := fs.TempFile("", "restic-temp-pack-")
     if err != nil {
         return nil, errors.Wrap(err, "fs.TempFile")
     }

-    w := io.Writer(tmpfile)
-    beHasher := r.hasherFn()
-    var beHw *hashing.Writer
-    if beHasher != nil {
-        beHw = hashing.NewWriter(w, beHasher)
-        w = beHw
-    }
-
-    hw := hashing.NewWriter(w, sha256.New())
-    p := pack.NewPacker(r.key, hw)
+    bufWr := bufio.NewWriter(tmpfile)
+    p := pack.NewPacker(r.key, bufWr)
     packer = &Packer{
         Packer:  p,
-        beHw:    beHw,
-        hw:      hw,
         tmpfile: tmpfile,
+        bufWr:   bufWr,
     }

     return packer, nil
 }

-// insertPacker appends p to s.packs.
-func (r *packerManager) insertPacker(p *Packer) {
-    r.pm.Lock()
-    defer r.pm.Unlock()
-
-    r.packers = append(r.packers, p)
-    debug.Log("%d packers\n", len(r.packers))
-}
-
 // savePacker stores p in the backend.
 func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packer) error {
     debug.Log("save packer for %v with %d blobs (%d bytes)\n", t, p.Packer.Count(), p.Packer.Size())
     _, err := p.Packer.Finalize()
     if err != nil {
         return err
     }
+    err = p.bufWr.Flush()
+    if err != nil {
+        return err
+    }

+    // calculate sha256 hash in a second pass
+    var rd io.Reader
+    rd, err = restic.NewFileReader(p.tmpfile, nil)
+    if err != nil {
+        return err
+    }
+    beHasher := r.be.Hasher()
+    var beHr hashing.ReadSumer
+    if beHasher != nil {
+        beHr = hashing.NewReader(rd, beHasher)
+        rd = beHr
+    }

-    id := restic.IDFromHash(p.hw.Sum(nil))
+    hr := hashing.NewReader(rd, sha256.New())
+    _, err = io.Copy(ioutil.Discard, hr)
+    if err != nil {
+        return err
+    }
+
+    id := restic.IDFromHash(hr.Sum(nil))
     h := restic.Handle{Type: restic.PackFile, Name: id.String(),
         ContainedBlobType: t}
     var beHash []byte
-    if p.beHw != nil {
-        beHash = p.beHw.Sum(nil)
+    if beHr != nil {
+        beHash = beHr.Sum(nil)
     }
-    rd, err := restic.NewFileReader(p.tmpfile, beHash)
+    rrd, err := restic.NewFileReader(p.tmpfile, beHash)
     if err != nil {
         return err
     }

-    err = r.be.Save(ctx, h, rd)
+    err = r.be.Save(ctx, h, rrd)
     if err != nil {
         debug.Log("Save(%v) error: %v", h, err)
         return err
@@ -193,11 +194,3 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packer) error {
     }
     return r.SaveFullIndex(ctx)
 }
-
-// countPacker returns the number of open (unfinished) packers.
-func (r *packerManager) countPacker() int {
-    r.pm.Lock()
-    defer r.pm.Unlock()
-
-    return len(r.packers)
-}
