From dbba26af6720875826fd22853623fbc53cdb4655 Mon Sep 17 00:00:00 2001 From: johnnyaug Date: Tue, 13 Oct 2020 13:51:31 +0300 Subject: [PATCH] Improvement/import tool progress indication (#804) * show when progress is done, add commit indication * import tool usability * remove import spaces * triger GH actions * CR fix: do not use pointers for progress counters --- block/s3/inventory_iterator.go | 10 +++- cmd/lakefs/cmd/import.go | 5 +- cmdutils/progress.go | 102 ++++++++++++++++++++++----------- onboard/catalog_actions.go | 17 ++++-- onboard/import.go | 8 ++- 5 files changed, 99 insertions(+), 43 deletions(-) diff --git a/block/s3/inventory_iterator.go b/block/s3/inventory_iterator.go index 247607445e6..f1002d069d8 100644 --- a/block/s3/inventory_iterator.go +++ b/block/s3/inventory_iterator.go @@ -31,11 +31,13 @@ func NewInventoryIterator(inv *Inventory) *InventoryIterator { creationTimestamp = 0 } t := time.Unix(creationTimestamp/int64(time.Second/time.Millisecond), 0) + inventoryFileProgress := cmdutils.NewActiveProgress(fmt.Sprintf("Inventory (%s) Files Read", t.Format("2006-01-02")), cmdutils.Bar) + inventoryFileProgress.SetTotal(int64(len(inv.Manifest.Files))) return &InventoryIterator{ Inventory: inv, inventoryFileIndex: -1, - inventoryFileProgress: cmdutils.NewProgress(fmt.Sprintf("Inventory (%s) Files Read", t.Format("2006-01-02")), int64(len(inv.Manifest.Files))), - currentFileProgress: cmdutils.NewProgress(fmt.Sprintf("Inventory (%s) Current File", t.Format("2006-01-02")), 0), + inventoryFileProgress: inventoryFileProgress, + currentFileProgress: cmdutils.NewActiveProgress(fmt.Sprintf("Inventory (%s) Current File", t.Format("2006-01-02")), cmdutils.Bar), } } @@ -52,7 +54,7 @@ func (it *InventoryIterator) Next() bool { it.err = ErrInventoryNotSorted return false } - it.currentFileProgress.Incr() + it.currentFileProgress.SetCurrent(int64(it.valIndexInBuffer + 1)) it.val = val return true } @@ -60,6 +62,8 @@ func (it *InventoryIterator) Next() bool { it.valIndexInBuffer = -1 if !it.moveToNextInventoryFile() { // no more files left + it.inventoryFileProgress.SetCompleted(true) + it.currentFileProgress.SetCompleted(true) return false } if !it.fillBuffer() { diff --git a/cmd/lakefs/cmd/import.go b/cmd/lakefs/cmd/import.go index 579b4831842..561c15c46ea 100644 --- a/cmd/lakefs/cmd/import.go +++ b/cmd/lakefs/cmd/import.go @@ -10,15 +10,14 @@ import ( "github.com/jedib0t/go-pretty/text" "github.com/spf13/cobra" - "github.com/treeverse/lakefs/cmdutils" - "github.com/treeverse/lakefs/uri" - "github.com/treeverse/lakefs/block/factory" "github.com/treeverse/lakefs/catalog" + "github.com/treeverse/lakefs/cmdutils" "github.com/treeverse/lakefs/config" "github.com/treeverse/lakefs/db" "github.com/treeverse/lakefs/logging" "github.com/treeverse/lakefs/onboard" + "github.com/treeverse/lakefs/uri" ) const ( diff --git a/cmdutils/progress.go b/cmdutils/progress.go index 7655ef3ab2b..3ce6c71b6d2 100644 --- a/cmdutils/progress.go +++ b/cmdutils/progress.go @@ -4,14 +4,17 @@ import ( "sync/atomic" "time" + "github.com/jedib0t/go-pretty/text" "github.com/vbauerster/mpb/v5" "github.com/vbauerster/mpb/v5/decor" ) const ( - progressCounterFormat = "%d / %d [" - spinnerCounterFormat = "%d [" + progressCounterFormat = "%d / %d " + spinnerCounterFormat = "%d " + spinnerNoCounterFormat = "" progressSuffix = "]" + progressPrefix = "[" progressBarWidth = 60 progressBarNameColumnWidth = 40 progressBarCounterColumnWidth = 20 @@ -19,14 +22,23 @@ const ( progressRefreshRate = 50 * time.Millisecond ) +const ( + Bar = iota + Spinner + SpinnerNoCounter +) + type ProgressReporter interface { Progress() []*Progress } type Progress struct { - label string - current *int64 - total *int64 + label string + current int64 + total int64 + completed bool + active bool + progressType int } type MultiBar struct { @@ -36,40 +48,62 @@ type MultiBar struct { ticker *time.Ticker } -func NewProgress(label string, total int64) *Progress { +func NewProgress(label string, progressType int) *Progress { + total := 0 + if progressType == Spinner || progressType == SpinnerNoCounter { + total = -1 + } return &Progress{ - label: label, - current: new(int64), - total: &total, + label: label, + total: int64(total), + progressType: progressType, } } +func NewActiveProgress(label string, progressType int) *Progress { + res := NewProgress(label, progressType) + res.Activate() + return res +} + func (p *Progress) Label() string { return p.label } func (p *Progress) Current() int64 { - return atomic.LoadInt64(p.current) + return atomic.LoadInt64(&p.current) } func (p *Progress) Total() int64 { - return atomic.LoadInt64(p.total) + return atomic.LoadInt64(&p.total) } func (p *Progress) Incr() { - atomic.AddInt64(p.current, 1) + atomic.AddInt64(&p.current, 1) } func (p *Progress) Add(n int64) { - atomic.AddInt64(p.current, n) + atomic.AddInt64(&p.current, n) } func (p *Progress) SetCurrent(n int64) { - atomic.StoreInt64(p.current, n) + atomic.StoreInt64(&p.current, n) } func (p *Progress) SetTotal(n int64) { - atomic.StoreInt64(p.total, n) + atomic.StoreInt64(&p.total, n) +} + +func (p *Progress) Completed() bool { + return p.completed +} + +func (p *Progress) SetCompleted(completed bool) { + p.completed = completed +} + +func (p *Progress) Activate() { + p.active = true } func NewMultiBar(r ProgressReporter) *MultiBar { @@ -93,41 +127,45 @@ func (b *MultiBar) Stop() { b.mpb.Wait() } -func (b *MultiBar) refresh(isCompleted bool) { +func (b *MultiBar) refresh(isStop bool) { progress := b.reporter.Progress() for _, p := range progress { + if p == nil || !p.active { + continue + } total := p.Total() bar, ok := b.mpbBars[p.label] if !ok { bar = createBar(b.mpb, p, total) b.mpbBars[p.label] = bar } - bar.SetTotal(total, isCompleted) - if !isCompleted { - bar.SetCurrent(p.Current()) + if !isStop { + bar.SetTotal(total, p.Completed()) } else { - bar.SetCurrent(total) + bar.SetTotal(p.Current(), true) } + bar.SetCurrent(p.Current()) } } func createBar(m *mpb.Progress, p *Progress, total int64) *mpb.Bar { var bar *mpb.Bar - isSpinner := false - if total < 0 { - isSpinner = true - } labelDecorator := decor.Name(p.label, decor.WC{W: progressBarNameColumnWidth, C: decor.DidentRight}) - suffixOption := mpb.AppendDecorators(decor.Name(progressSuffix)) - if isSpinner { + suffixOption := mpb.AppendDecorators(decor.OnComplete(decor.Name(progressSuffix), text.FgGreen.Sprintf(" done"))) + if p.progressType == Spinner || p.progressType == SpinnerNoCounter { // unknown total, render a spinner - bar = m.AddSpinner(total, mpb.SpinnerOnMiddle, suffixOption, - mpb.PrependDecorators(labelDecorator, - decor.CurrentNoUnit(spinnerCounterFormat, decor.WC{W: progressBarCounterColumnWidth}))) - } else { - bar = m.AddBar(total, mpb.BarStyle(progressBarStyle), suffixOption, + var counterDecorator decor.Decorator + if p.progressType == Spinner { + counterDecorator = decor.CurrentNoUnit(spinnerCounterFormat, decor.WC{W: progressBarCounterColumnWidth}) + } else { + counterDecorator = decor.Name(spinnerNoCounterFormat, decor.WC{W: progressBarCounterColumnWidth}) + } + bar = m.AddSpinner(total, mpb.SpinnerOnMiddle, mpb.BarFillerClearOnComplete(), suffixOption, mpb.PrependDecorators(labelDecorator, counterDecorator, decor.OnComplete(decor.Name(progressPrefix), ""))) + } else { // type == Bar + bar = m.AddBar(total, mpb.BarStyle(progressBarStyle), mpb.BarFillerClearOnComplete(), suffixOption, mpb.PrependDecorators(labelDecorator, - decor.CountersNoUnit(progressCounterFormat, decor.WC{W: progressBarCounterColumnWidth}))) + decor.CountersNoUnit(progressCounterFormat, decor.WC{W: progressBarCounterColumnWidth}), + decor.OnComplete(decor.Name(progressPrefix), ""))) } return bar } diff --git a/onboard/catalog_actions.go b/onboard/catalog_actions.go index f3af8b52cbb..44bdbae8a7e 100644 --- a/onboard/catalog_actions.go +++ b/onboard/catalog_actions.go @@ -33,10 +33,11 @@ type CatalogRepoActions struct { logger logging.Logger deletedProgress *cmdutils.Progress addedProgress *cmdutils.Progress + commitProgress *cmdutils.Progress } func (c *CatalogRepoActions) Progress() []*cmdutils.Progress { - return []*cmdutils.Progress{c.addedProgress, c.deletedProgress} + return []*cmdutils.Progress{c.addedProgress, c.deletedProgress, c.commitProgress} } func NewCatalogActions(cataloger catalog.Cataloger, repository string, committer string, logger logging.Logger) *CatalogRepoActions { @@ -45,8 +46,9 @@ func NewCatalogActions(cataloger catalog.Cataloger, repository string, committer repository: repository, committer: committer, logger: logger, - addedProgress: cmdutils.NewProgress("Objects Added or Changed", -1), - deletedProgress: cmdutils.NewProgress("Objects Deleted", -1), + addedProgress: cmdutils.NewActiveProgress("Objects Added or Changed", cmdutils.Spinner), + deletedProgress: cmdutils.NewActiveProgress("Objects Deleted", cmdutils.Spinner), + commitProgress: cmdutils.NewProgress("Committing Changes", cmdutils.SpinnerNoCounter), } } @@ -137,6 +139,8 @@ func (c *CatalogRepoActions) ApplyImport(ctx context.Context, it Iterator, dryRu } } c.addedProgress.Add(int64(len(currentBatch))) + c.addedProgress.SetCompleted(true) + c.deletedProgress.SetCompleted(true) return &stats, nil } @@ -159,8 +163,13 @@ func (c *CatalogRepoActions) GetPreviousCommit(ctx context.Context) (commit *cat } func (c *CatalogRepoActions) Commit(ctx context.Context, commitMsg string, metadata catalog.Metadata) (*catalog.CommitLog, error) { - return c.cataloger.Commit(ctx, c.repository, DefaultBranchName, + c.commitProgress.Activate() + res, err := c.cataloger.Commit(ctx, c.repository, DefaultBranchName, commitMsg, c.committer, metadata) + if err == nil { + c.commitProgress.SetCompleted(true) + } + return res, err } diff --git a/onboard/import.go b/onboard/import.go index c19a839e944..029ec29c6dd 100644 --- a/onboard/import.go +++ b/onboard/import.go @@ -45,7 +45,10 @@ type Stats struct { PreviousImportDate time.Time } -var ErrNoInventoryURL = errors.New("no inventory_url in commit Metadata") +var ( + ErrNoInventoryURL = errors.New("no inventory_url in commit Metadata") + ErrInventoryAlreadyImported = errors.New("given inventory was already imported") +) func CreateImporter(ctx context.Context, logger logging.Logger, config *Config) (importer *Importer, err error) { res := &Importer{ @@ -74,6 +77,9 @@ func (s *Importer) diffIterator(ctx context.Context, commit catalog.CommitLog) ( if previousInventoryURL == "" { return nil, fmt.Errorf("%w. commit_ref=%s", ErrNoInventoryURL, commit.Reference) } + if previousInventoryURL == s.inventory.InventoryURL() { + return nil, fmt.Errorf("%w. commit_ref=%s", ErrInventoryAlreadyImported, commit.Reference) + } previousInv, err := s.inventoryGenerator.GenerateInventory(ctx, s.logger, previousInventoryURL, true) if err != nil { return nil, fmt.Errorf("failed to create inventory for previous state: %w", err)