Skip to content

Commit

Permalink
Merge pull request #31 from resin-os/pull-combined-progress
Browse files Browse the repository at this point in the history
Print combined progress when pulling
  • Loading branch information
petrosagg authored Oct 4, 2017
2 parents af0c925 + 36ea648 commit 7d07e4f
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 4 deletions.
4 changes: 4 additions & 0 deletions distribution/pull_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,7 @@ func (ld *v1LayerDescriptor) Registered(diffID layer.DiffID) {
// Cache mapping from this layer's DiffID to the blobsum
ld.v1IDService.Set(ld.v1LayerID, ld.indexName, diffID)
}

func (ld *v1LayerDescriptor) Size() int64 {
return ld.layerSize
}
4 changes: 4 additions & 0 deletions distribution/pull_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ func (ld *v2LayerDescriptor) Registered(diffID layer.DiffID) {
ld.V2MetadataService.Add(diffID, metadata.V2Metadata{Digest: ld.digest, SourceRepository: ld.repoInfo.Name.Name()})
}

func (ld *v2LayerDescriptor) Size() int64 {
return ld.src.Size
}

func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named) (tagUpdated bool, err error) {
manSvc, err := p.repo.Manifests(ctx)
if err != nil {
Expand Down
12 changes: 8 additions & 4 deletions distribution/xfer/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type DownloadDescriptor interface {
// if it is unknown (for example, if it has not been downloaded
// before).
DiffID() (layer.DiffID, error)
Size() int64
// Download is called to perform the download.
Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error)
// Return the DeltaBase if any
Expand Down Expand Up @@ -114,6 +115,8 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
platform = layer.Platform(runtime.GOOS)
}

totalProgress := progress.NewProgressSink(progressOutput, 0, "Total", "")

rootFS := initialRootFS
for _, descriptor := range layers {
key := descriptor.Key()
Expand Down Expand Up @@ -159,13 +162,14 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima

// Layer is not known to exist - download and register it.
progress.Update(progressOutput, descriptor.ID(), "Pulling fs layer")
totalProgress.Size += descriptor.Size()

var xferFunc DoFunc
if topDownload != nil {
xferFunc = ldm.makeDownloadFunc(descriptor, "", topDownload, platform)
xferFunc = ldm.makeDownloadFunc(descriptor, "", topDownload, platform, totalProgress)
defer topDownload.Transfer.Release(watcher)
} else {
xferFunc = ldm.makeDownloadFunc(descriptor, rootFS.ChainID(), nil, platform)
xferFunc = ldm.makeDownloadFunc(descriptor, rootFS.ChainID(), nil, platform, totalProgress)
}
topDownloadUncasted, watcher = ldm.tm.Transfer(transferKey, xferFunc, progressOutput)
topDownload = topDownloadUncasted.(*downloadTransfer)
Expand Down Expand Up @@ -222,7 +226,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
// complete before the registration step, and registers the downloaded data
// on top of parentDownload's resulting layer. Otherwise, it registers the
// layer on top of the ChainID given by parentLayer.
func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, parentLayer layer.ChainID, parentDownload *downloadTransfer, platform layer.Platform) DoFunc {
func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, parentLayer layer.ChainID, parentDownload *downloadTransfer, platform layer.Platform, totalProgress io.Writer) DoFunc {
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
d := &downloadTransfer{
Transfer: NewTransfer(),
Expand Down Expand Up @@ -334,7 +338,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(d.Transfer.Context(), downloadReader), progressOutput, size, descriptor.ID(), "Extracting")
defer reader.Close()

inflatedLayerData, err := archive.DecompressStream(reader)
inflatedLayerData, err := archive.DecompressStream(io.TeeReader(reader, totalProgress))
if err != nil {
d.err = fmt.Errorf("could not get decompression stream: %v", err)
return
Expand Down
53 changes: 53 additions & 0 deletions pkg/progress/progresssink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package progress

import (
"time"
"sync"

"golang.org/x/time/rate"
)

type Sink struct {
sync.Mutex
out Output // Where to send progress bar to
Size int64
current int64
lastUpdate int64
id string
action string
rateLimiter *rate.Limiter
}

// NewProgressSink creates a new ProgressSink.
func NewProgressSink(out Output, size int64, id, action string) *Sink {
return &Sink{
out: out,
Size: size,
id: id,
action: action,
rateLimiter: rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
}
}

func (p *Sink) Write(buf []byte) (int, error) {
p.Lock()
defer p.Unlock()

n := len(buf)
p.current += int64(n)
updateEvery := int64(1024 * 512) //512kB
if p.Size > 0 {
// Update progress for every 1% if 1% < 512kB
if increment := int64(0.01 * float64(p.Size)); increment < updateEvery {
updateEvery = increment
}
}
if p.current-p.lastUpdate > updateEvery || p.current == p.Size {
if p.current == p.Size || p.rateLimiter.Allow() {
p.out.WriteProgress(Progress{ID: p.id, Action: p.action, Current: p.current, Total: p.Size, LastUpdate: p.current == p.Size})
}
p.lastUpdate = p.current
}

return n, nil
}

0 comments on commit 7d07e4f

Please sign in to comment.