Skip to content

Commit

Permalink
Improvement/import tool progress indication (#804)
Browse files Browse the repository at this point in the history
* show when progress is done, add commit indication

* import tool usability

* remove import spaces

* triger GH actions

* CR fix: do not use pointers for progress counters
  • Loading branch information
johnnyaug authored Oct 13, 2020
1 parent 58ab9d1 commit dbba26a
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 43 deletions.
10 changes: 7 additions & 3 deletions block/s3/inventory_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ func NewInventoryIterator(inv *Inventory) *InventoryIterator {
creationTimestamp = 0
}
t := time.Unix(creationTimestamp/int64(time.Second/time.Millisecond), 0)
inventoryFileProgress := cmdutils.NewActiveProgress(fmt.Sprintf("Inventory (%s) Files Read", t.Format("2006-01-02")), cmdutils.Bar)
inventoryFileProgress.SetTotal(int64(len(inv.Manifest.Files)))
return &InventoryIterator{
Inventory: inv,
inventoryFileIndex: -1,
inventoryFileProgress: cmdutils.NewProgress(fmt.Sprintf("Inventory (%s) Files Read", t.Format("2006-01-02")), int64(len(inv.Manifest.Files))),
currentFileProgress: cmdutils.NewProgress(fmt.Sprintf("Inventory (%s) Current File", t.Format("2006-01-02")), 0),
inventoryFileProgress: inventoryFileProgress,
currentFileProgress: cmdutils.NewActiveProgress(fmt.Sprintf("Inventory (%s) Current File", t.Format("2006-01-02")), cmdutils.Bar),
}
}

Expand All @@ -52,14 +54,16 @@ func (it *InventoryIterator) Next() bool {
it.err = ErrInventoryNotSorted
return false
}
it.currentFileProgress.Incr()
it.currentFileProgress.SetCurrent(int64(it.valIndexInBuffer + 1))
it.val = val
return true
}
// value not found in buffer, need to reload the buffer
it.valIndexInBuffer = -1
if !it.moveToNextInventoryFile() {
// no more files left
it.inventoryFileProgress.SetCompleted(true)
it.currentFileProgress.SetCompleted(true)
return false
}
if !it.fillBuffer() {
Expand Down
5 changes: 2 additions & 3 deletions cmd/lakefs/cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ import (

"github.com/jedib0t/go-pretty/text"
"github.com/spf13/cobra"
"github.com/treeverse/lakefs/cmdutils"
"github.com/treeverse/lakefs/uri"

"github.com/treeverse/lakefs/block/factory"
"github.com/treeverse/lakefs/catalog"
"github.com/treeverse/lakefs/cmdutils"
"github.com/treeverse/lakefs/config"
"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/logging"
"github.com/treeverse/lakefs/onboard"
"github.com/treeverse/lakefs/uri"
)

const (
Expand Down
102 changes: 70 additions & 32 deletions cmdutils/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,41 @@ import (
"sync/atomic"
"time"

"github.com/jedib0t/go-pretty/text"
"github.com/vbauerster/mpb/v5"
"github.com/vbauerster/mpb/v5/decor"
)

const (
progressCounterFormat = "%d / %d ["
spinnerCounterFormat = "%d ["
progressCounterFormat = "%d / %d "
spinnerCounterFormat = "%d "
spinnerNoCounterFormat = ""
progressSuffix = "]"
progressPrefix = "["
progressBarWidth = 60
progressBarNameColumnWidth = 40
progressBarCounterColumnWidth = 20
progressBarStyle = " =>- <"
progressRefreshRate = 50 * time.Millisecond
)

const (
Bar = iota
Spinner
SpinnerNoCounter
)

type ProgressReporter interface {
Progress() []*Progress
}

type Progress struct {
label string
current *int64
total *int64
label string
current int64
total int64
completed bool
active bool
progressType int
}

type MultiBar struct {
Expand All @@ -36,40 +48,62 @@ type MultiBar struct {
ticker *time.Ticker
}

func NewProgress(label string, total int64) *Progress {
func NewProgress(label string, progressType int) *Progress {
total := 0
if progressType == Spinner || progressType == SpinnerNoCounter {
total = -1
}
return &Progress{
label: label,
current: new(int64),
total: &total,
label: label,
total: int64(total),
progressType: progressType,
}
}

func NewActiveProgress(label string, progressType int) *Progress {
res := NewProgress(label, progressType)
res.Activate()
return res
}

func (p *Progress) Label() string {
return p.label
}

func (p *Progress) Current() int64 {
return atomic.LoadInt64(p.current)
return atomic.LoadInt64(&p.current)
}

func (p *Progress) Total() int64 {
return atomic.LoadInt64(p.total)
return atomic.LoadInt64(&p.total)
}

func (p *Progress) Incr() {
atomic.AddInt64(p.current, 1)
atomic.AddInt64(&p.current, 1)
}

func (p *Progress) Add(n int64) {
atomic.AddInt64(p.current, n)
atomic.AddInt64(&p.current, n)
}

func (p *Progress) SetCurrent(n int64) {
atomic.StoreInt64(p.current, n)
atomic.StoreInt64(&p.current, n)
}

func (p *Progress) SetTotal(n int64) {
atomic.StoreInt64(p.total, n)
atomic.StoreInt64(&p.total, n)
}

func (p *Progress) Completed() bool {
return p.completed
}

func (p *Progress) SetCompleted(completed bool) {
p.completed = completed
}

func (p *Progress) Activate() {
p.active = true
}

func NewMultiBar(r ProgressReporter) *MultiBar {
Expand All @@ -93,41 +127,45 @@ func (b *MultiBar) Stop() {
b.mpb.Wait()
}

func (b *MultiBar) refresh(isCompleted bool) {
func (b *MultiBar) refresh(isStop bool) {
progress := b.reporter.Progress()
for _, p := range progress {
if p == nil || !p.active {
continue
}
total := p.Total()
bar, ok := b.mpbBars[p.label]
if !ok {
bar = createBar(b.mpb, p, total)
b.mpbBars[p.label] = bar
}
bar.SetTotal(total, isCompleted)
if !isCompleted {
bar.SetCurrent(p.Current())
if !isStop {
bar.SetTotal(total, p.Completed())
} else {
bar.SetCurrent(total)
bar.SetTotal(p.Current(), true)
}
bar.SetCurrent(p.Current())
}
}

func createBar(m *mpb.Progress, p *Progress, total int64) *mpb.Bar {
var bar *mpb.Bar
isSpinner := false
if total < 0 {
isSpinner = true
}
labelDecorator := decor.Name(p.label, decor.WC{W: progressBarNameColumnWidth, C: decor.DidentRight})
suffixOption := mpb.AppendDecorators(decor.Name(progressSuffix))
if isSpinner {
suffixOption := mpb.AppendDecorators(decor.OnComplete(decor.Name(progressSuffix), text.FgGreen.Sprintf(" done")))
if p.progressType == Spinner || p.progressType == SpinnerNoCounter {
// unknown total, render a spinner
bar = m.AddSpinner(total, mpb.SpinnerOnMiddle, suffixOption,
mpb.PrependDecorators(labelDecorator,
decor.CurrentNoUnit(spinnerCounterFormat, decor.WC{W: progressBarCounterColumnWidth})))
} else {
bar = m.AddBar(total, mpb.BarStyle(progressBarStyle), suffixOption,
var counterDecorator decor.Decorator
if p.progressType == Spinner {
counterDecorator = decor.CurrentNoUnit(spinnerCounterFormat, decor.WC{W: progressBarCounterColumnWidth})
} else {
counterDecorator = decor.Name(spinnerNoCounterFormat, decor.WC{W: progressBarCounterColumnWidth})
}
bar = m.AddSpinner(total, mpb.SpinnerOnMiddle, mpb.BarFillerClearOnComplete(), suffixOption, mpb.PrependDecorators(labelDecorator, counterDecorator, decor.OnComplete(decor.Name(progressPrefix), "")))
} else { // type == Bar
bar = m.AddBar(total, mpb.BarStyle(progressBarStyle), mpb.BarFillerClearOnComplete(), suffixOption,
mpb.PrependDecorators(labelDecorator,
decor.CountersNoUnit(progressCounterFormat, decor.WC{W: progressBarCounterColumnWidth})))
decor.CountersNoUnit(progressCounterFormat, decor.WC{W: progressBarCounterColumnWidth}),
decor.OnComplete(decor.Name(progressPrefix), "")))
}
return bar
}
17 changes: 13 additions & 4 deletions onboard/catalog_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ type CatalogRepoActions struct {
logger logging.Logger
deletedProgress *cmdutils.Progress
addedProgress *cmdutils.Progress
commitProgress *cmdutils.Progress
}

func (c *CatalogRepoActions) Progress() []*cmdutils.Progress {
return []*cmdutils.Progress{c.addedProgress, c.deletedProgress}
return []*cmdutils.Progress{c.addedProgress, c.deletedProgress, c.commitProgress}
}

func NewCatalogActions(cataloger catalog.Cataloger, repository string, committer string, logger logging.Logger) *CatalogRepoActions {
Expand All @@ -45,8 +46,9 @@ func NewCatalogActions(cataloger catalog.Cataloger, repository string, committer
repository: repository,
committer: committer,
logger: logger,
addedProgress: cmdutils.NewProgress("Objects Added or Changed", -1),
deletedProgress: cmdutils.NewProgress("Objects Deleted", -1),
addedProgress: cmdutils.NewActiveProgress("Objects Added or Changed", cmdutils.Spinner),
deletedProgress: cmdutils.NewActiveProgress("Objects Deleted", cmdutils.Spinner),
commitProgress: cmdutils.NewProgress("Committing Changes", cmdutils.SpinnerNoCounter),
}
}

Expand Down Expand Up @@ -137,6 +139,8 @@ func (c *CatalogRepoActions) ApplyImport(ctx context.Context, it Iterator, dryRu
}
}
c.addedProgress.Add(int64(len(currentBatch)))
c.addedProgress.SetCompleted(true)
c.deletedProgress.SetCompleted(true)
return &stats, nil
}

Expand All @@ -159,8 +163,13 @@ func (c *CatalogRepoActions) GetPreviousCommit(ctx context.Context) (commit *cat
}

func (c *CatalogRepoActions) Commit(ctx context.Context, commitMsg string, metadata catalog.Metadata) (*catalog.CommitLog, error) {
return c.cataloger.Commit(ctx, c.repository, DefaultBranchName,
c.commitProgress.Activate()
res, err := c.cataloger.Commit(ctx, c.repository, DefaultBranchName,
commitMsg,
c.committer,
metadata)
if err == nil {
c.commitProgress.SetCompleted(true)
}
return res, err
}
8 changes: 7 additions & 1 deletion onboard/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ type Stats struct {
PreviousImportDate time.Time
}

var ErrNoInventoryURL = errors.New("no inventory_url in commit Metadata")
var (
ErrNoInventoryURL = errors.New("no inventory_url in commit Metadata")
ErrInventoryAlreadyImported = errors.New("given inventory was already imported")
)

func CreateImporter(ctx context.Context, logger logging.Logger, config *Config) (importer *Importer, err error) {
res := &Importer{
Expand Down Expand Up @@ -74,6 +77,9 @@ func (s *Importer) diffIterator(ctx context.Context, commit catalog.CommitLog) (
if previousInventoryURL == "" {
return nil, fmt.Errorf("%w. commit_ref=%s", ErrNoInventoryURL, commit.Reference)
}
if previousInventoryURL == s.inventory.InventoryURL() {
return nil, fmt.Errorf("%w. commit_ref=%s", ErrInventoryAlreadyImported, commit.Reference)
}
previousInv, err := s.inventoryGenerator.GenerateInventory(ctx, s.logger, previousInventoryURL, true)
if err != nil {
return nil, fmt.Errorf("failed to create inventory for previous state: %w", err)
Expand Down

0 comments on commit dbba26a

Please sign in to comment.