From d708b370b24ca68eb3970019d1744b3c1a5c9dc1 Mon Sep 17 00:00:00 2001 From: johnnyaug Date: Mon, 12 Oct 2020 15:52:46 +0300 Subject: [PATCH 1/5] show when progress is done, add commit indication --- block/s3/inventory_iterator.go | 2 +- cmdutils/progress.go | 31 ++++++++++++++++++++++++------- onboard/import.go | 2 ++ 3 files changed, 27 insertions(+), 8 deletions(-) diff --git a/block/s3/inventory_iterator.go b/block/s3/inventory_iterator.go index bd4c964af26..1361e940314 100644 --- a/block/s3/inventory_iterator.go +++ b/block/s3/inventory_iterator.go @@ -52,7 +52,7 @@ func (it *InventoryIterator) Next() bool { it.err = ErrInventoryNotSorted return false } - it.currentFileProgress.Incr() + it.currentFileProgress.SetCurrent(int64(it.valIndexInBuffer + 1)) it.val = val return true } diff --git a/cmdutils/progress.go b/cmdutils/progress.go index 7655ef3ab2b..d44faf02ee3 100644 --- a/cmdutils/progress.go +++ b/cmdutils/progress.go @@ -4,6 +4,8 @@ import ( "sync/atomic" "time" + "github.com/jedib0t/go-pretty/text" + "github.com/vbauerster/mpb/v5" "github.com/vbauerster/mpb/v5/decor" ) @@ -24,9 +26,10 @@ type ProgressReporter interface { } type Progress struct { - label string - current *int64 - total *int64 + label string + current *int64 + total *int64 + completed bool } type MultiBar struct { @@ -72,6 +75,14 @@ func (p *Progress) SetTotal(n int64) { atomic.StoreInt64(p.total, n) } +func (p *Progress) Completed() bool { + return p.completed +} + +func (p *Progress) SetCompleted(completed bool) { + p.completed = completed +} + func NewMultiBar(r ProgressReporter) *MultiBar { ticker := time.NewTicker(progressRefreshRate) m := mpb.New(mpb.WithWidth(progressBarWidth)) @@ -102,12 +113,13 @@ func (b *MultiBar) refresh(isCompleted bool) { bar = createBar(b.mpb, p, total) b.mpbBars[p.label] = bar } - bar.SetTotal(total, isCompleted) if !isCompleted { - bar.SetCurrent(p.Current()) + bar.SetTotal(total, false) } else { - bar.SetCurrent(total) + p.SetCompleted(true) + bar.SetTotal(p.Current(), true) } + bar.SetCurrent(p.Current()) } } @@ -118,7 +130,12 @@ func createBar(m *mpb.Progress, p *Progress, total int64) *mpb.Bar { isSpinner = true } labelDecorator := decor.Name(p.label, decor.WC{W: progressBarNameColumnWidth, C: decor.DidentRight}) - suffixOption := mpb.AppendDecorators(decor.Name(progressSuffix)) + suffixOption := mpb.AppendDecorators(decor.Name(progressSuffix), decor.Any(func(statistics decor.Statistics) string { + if p.Completed() { + return text.FgGreen.Sprintf(" done") + } + return "" + })) if isSpinner { // unknown total, render a spinner bar = m.AddSpinner(total, mpb.SpinnerOnMiddle, suffixOption, diff --git a/onboard/import.go b/onboard/import.go index c19a839e944..725902582d7 100644 --- a/onboard/import.go +++ b/onboard/import.go @@ -98,6 +98,7 @@ func (s *Importer) Import(ctx context.Context, dryRun bool) (*Stats, error) { } s.progress = append(dataToImport.Progress(), s.CatalogActions.Progress()...) stats, err := s.CatalogActions.ApplyImport(ctx, dataToImport, dryRun) + time.Sleep(10 * time.Second) if err != nil { return nil, err } @@ -108,6 +109,7 @@ func (s *Importer) Import(ctx context.Context, dryRun bool) (*Stats, error) { } if !dryRun { commitMetadata := CreateCommitMetadata(s.inventory, *stats) + s.progress = append(s.progress, cmdutils.NewProgress("Commit Changes", -1)) commitLog, err := s.CatalogActions.Commit(ctx, fmt.Sprintf(CommitMsgTemplate, s.inventory.SourceName()), commitMetadata) if err != nil { return nil, err From 9d6b294a8491d7679ec40288b52a5d13b549ddc1 Mon Sep 17 00:00:00 2001 From: johnnyaug Date: Mon, 12 Oct 2020 19:26:47 +0300 Subject: [PATCH 2/5] import tool usability --- block/s3/inventory_iterator.go | 8 +++- cmdutils/progress.go | 85 +++++++++++++++++++++------------- onboard/catalog_actions.go | 17 +++++-- onboard/import.go | 10 ++-- 4 files changed, 80 insertions(+), 40 deletions(-) diff --git a/block/s3/inventory_iterator.go b/block/s3/inventory_iterator.go index 1361e940314..76859f2fa96 100644 --- a/block/s3/inventory_iterator.go +++ b/block/s3/inventory_iterator.go @@ -31,11 +31,13 @@ func NewInventoryIterator(inv *Inventory) *InventoryIterator { creationTimestamp = 0 } t := time.Unix(creationTimestamp/int64(time.Second/time.Millisecond), 0) + inventoryFileProgress := cmdutils.NewActiveProgress(fmt.Sprintf("Inventory (%s) Files Read", t.Format("2006-01-02")), cmdutils.Bar) + inventoryFileProgress.SetTotal(int64(len(inv.Manifest.Files))) return &InventoryIterator{ Inventory: inv, inventoryFileIndex: -1, - inventoryFileProgress: cmdutils.NewProgress(fmt.Sprintf("Inventory (%s) Files Read", t.Format("2006-01-02")), int64(len(inv.Manifest.Files))), - currentFileProgress: cmdutils.NewProgress(fmt.Sprintf("Inventory (%s) Current File", t.Format("2006-01-02")), 0), + inventoryFileProgress: inventoryFileProgress, + currentFileProgress: cmdutils.NewActiveProgress(fmt.Sprintf("Inventory (%s) Current File", t.Format("2006-01-02")), cmdutils.Bar), } } @@ -60,6 +62,8 @@ func (it *InventoryIterator) Next() bool { it.valIndexInBuffer = -1 if !it.moveToNextInventoryFile() { // no more files left + it.inventoryFileProgress.SetCompleted(true) + it.currentFileProgress.SetCompleted(true) return false } if !it.fillBuffer() { diff --git a/cmdutils/progress.go b/cmdutils/progress.go index d44faf02ee3..f488b581d44 100644 --- a/cmdutils/progress.go +++ b/cmdutils/progress.go @@ -11,9 +11,11 @@ import ( ) const ( - progressCounterFormat = "%d / %d [" - spinnerCounterFormat = "%d [" + progressCounterFormat = "%d / %d " + spinnerCounterFormat = "%d " + spinnerNoCounterFormat = "" progressSuffix = "]" + progressPrefix = "[" progressBarWidth = 60 progressBarNameColumnWidth = 40 progressBarCounterColumnWidth = 20 @@ -21,15 +23,23 @@ const ( progressRefreshRate = 50 * time.Millisecond ) +const ( + Bar = iota + Spinner + SpinnerNoCounter +) + type ProgressReporter interface { Progress() []*Progress } type Progress struct { - label string - current *int64 - total *int64 - completed bool + label string + current *int64 + total *int64 + completed bool + active bool + progressType int } type MultiBar struct { @@ -39,14 +49,25 @@ type MultiBar struct { ticker *time.Ticker } -func NewProgress(label string, total int64) *Progress { +func NewProgress(label string, progressType int) *Progress { + total := int64(0) + if progressType == Spinner || progressType == SpinnerNoCounter { + total = int64(-1) + } return &Progress{ - label: label, - current: new(int64), - total: &total, + label: label, + current: new(int64), + total: &total, + progressType: progressType, } } +func NewActiveProgress(label string, progressType int) *Progress { + res := NewProgress(label, progressType) + res.Activate() + return res +} + func (p *Progress) Label() string { return p.label } @@ -83,6 +104,10 @@ func (p *Progress) SetCompleted(completed bool) { p.completed = completed } +func (p *Progress) Activate() { + p.active = true +} + func NewMultiBar(r ProgressReporter) *MultiBar { ticker := time.NewTicker(progressRefreshRate) m := mpb.New(mpb.WithWidth(progressBarWidth)) @@ -104,19 +129,21 @@ func (b *MultiBar) Stop() { b.mpb.Wait() } -func (b *MultiBar) refresh(isCompleted bool) { +func (b *MultiBar) refresh(isStop bool) { progress := b.reporter.Progress() for _, p := range progress { + if p == nil || !p.active { + continue + } total := p.Total() bar, ok := b.mpbBars[p.label] if !ok { bar = createBar(b.mpb, p, total) b.mpbBars[p.label] = bar } - if !isCompleted { - bar.SetTotal(total, false) + if !isStop { + bar.SetTotal(total, p.Completed()) } else { - p.SetCompleted(true) bar.SetTotal(p.Current(), true) } bar.SetCurrent(p.Current()) @@ -125,26 +152,22 @@ func (b *MultiBar) refresh(isCompleted bool) { func createBar(m *mpb.Progress, p *Progress, total int64) *mpb.Bar { var bar *mpb.Bar - isSpinner := false - if total < 0 { - isSpinner = true - } labelDecorator := decor.Name(p.label, decor.WC{W: progressBarNameColumnWidth, C: decor.DidentRight}) - suffixOption := mpb.AppendDecorators(decor.Name(progressSuffix), decor.Any(func(statistics decor.Statistics) string { - if p.Completed() { - return text.FgGreen.Sprintf(" done") - } - return "" - })) - if isSpinner { + suffixOption := mpb.AppendDecorators(decor.OnComplete(decor.Name(progressSuffix), text.FgGreen.Sprintf(" done"))) + if p.progressType == Spinner || p.progressType == SpinnerNoCounter { // unknown total, render a spinner - bar = m.AddSpinner(total, mpb.SpinnerOnMiddle, suffixOption, - mpb.PrependDecorators(labelDecorator, - decor.CurrentNoUnit(spinnerCounterFormat, decor.WC{W: progressBarCounterColumnWidth}))) - } else { - bar = m.AddBar(total, mpb.BarStyle(progressBarStyle), suffixOption, + var counterDecorator decor.Decorator + if p.progressType == Spinner { + counterDecorator = decor.CurrentNoUnit(spinnerCounterFormat, decor.WC{W: progressBarCounterColumnWidth}) + } else { + counterDecorator = decor.Name(spinnerNoCounterFormat, decor.WC{W: progressBarCounterColumnWidth}) + } + bar = m.AddSpinner(total, mpb.SpinnerOnMiddle, mpb.BarFillerClearOnComplete(), suffixOption, mpb.PrependDecorators(labelDecorator, counterDecorator, decor.OnComplete(decor.Name(progressPrefix), ""))) + } else { // type == Bar + bar = m.AddBar(total, mpb.BarStyle(progressBarStyle), mpb.BarFillerClearOnComplete(), suffixOption, mpb.PrependDecorators(labelDecorator, - decor.CountersNoUnit(progressCounterFormat, decor.WC{W: progressBarCounterColumnWidth}))) + decor.CountersNoUnit(progressCounterFormat, decor.WC{W: progressBarCounterColumnWidth}), + decor.OnComplete(decor.Name(progressPrefix), ""))) } return bar } diff --git a/onboard/catalog_actions.go b/onboard/catalog_actions.go index c86959a949e..56f8a673587 100644 --- a/onboard/catalog_actions.go +++ b/onboard/catalog_actions.go @@ -33,10 +33,11 @@ type CatalogRepoActions struct { logger logging.Logger deletedProgress *cmdutils.Progress addedProgress *cmdutils.Progress + commitProgress *cmdutils.Progress } func (c *CatalogRepoActions) Progress() []*cmdutils.Progress { - return []*cmdutils.Progress{c.addedProgress, c.deletedProgress} + return []*cmdutils.Progress{c.addedProgress, c.deletedProgress, c.commitProgress} } func NewCatalogActions(cataloger catalog.Cataloger, repository string, committer string, logger logging.Logger) *CatalogRepoActions { @@ -45,8 +46,9 @@ func NewCatalogActions(cataloger catalog.Cataloger, repository string, committer repository: repository, committer: committer, logger: logger, - addedProgress: cmdutils.NewProgress("Objects Added or Changed", -1), - deletedProgress: cmdutils.NewProgress("Objects Deleted", -1), + addedProgress: cmdutils.NewActiveProgress("Objects Added or Changed", cmdutils.Spinner), + deletedProgress: cmdutils.NewActiveProgress("Objects Deleted", cmdutils.Spinner), + commitProgress: cmdutils.NewProgress("Committing Changes", cmdutils.SpinnerNoCounter), } } @@ -137,6 +139,8 @@ func (c *CatalogRepoActions) ApplyImport(ctx context.Context, it Iterator, dryRu } } c.addedProgress.Add(int64(len(currentBatch))) + c.addedProgress.SetCompleted(true) + c.deletedProgress.SetCompleted(true) return &stats, nil } @@ -159,8 +163,13 @@ func (c *CatalogRepoActions) GetPreviousCommit(ctx context.Context) (commit *cat } func (c *CatalogRepoActions) Commit(ctx context.Context, commitMsg string, metadata catalog.Metadata) (*catalog.CommitLog, error) { - return c.cataloger.Commit(ctx, c.repository, DefaultBranchName, + c.commitProgress.Activate() + res, err := c.cataloger.Commit(ctx, c.repository, DefaultBranchName, commitMsg, c.committer, metadata) + if err == nil { + c.commitProgress.SetCompleted(true) + } + return res, err } diff --git a/onboard/import.go b/onboard/import.go index 725902582d7..029ec29c6dd 100644 --- a/onboard/import.go +++ b/onboard/import.go @@ -45,7 +45,10 @@ type Stats struct { PreviousImportDate time.Time } -var ErrNoInventoryURL = errors.New("no inventory_url in commit Metadata") +var ( + ErrNoInventoryURL = errors.New("no inventory_url in commit Metadata") + ErrInventoryAlreadyImported = errors.New("given inventory was already imported") +) func CreateImporter(ctx context.Context, logger logging.Logger, config *Config) (importer *Importer, err error) { res := &Importer{ @@ -74,6 +77,9 @@ func (s *Importer) diffIterator(ctx context.Context, commit catalog.CommitLog) ( if previousInventoryURL == "" { return nil, fmt.Errorf("%w. commit_ref=%s", ErrNoInventoryURL, commit.Reference) } + if previousInventoryURL == s.inventory.InventoryURL() { + return nil, fmt.Errorf("%w. commit_ref=%s", ErrInventoryAlreadyImported, commit.Reference) + } previousInv, err := s.inventoryGenerator.GenerateInventory(ctx, s.logger, previousInventoryURL, true) if err != nil { return nil, fmt.Errorf("failed to create inventory for previous state: %w", err) @@ -98,7 +104,6 @@ func (s *Importer) Import(ctx context.Context, dryRun bool) (*Stats, error) { } s.progress = append(dataToImport.Progress(), s.CatalogActions.Progress()...) stats, err := s.CatalogActions.ApplyImport(ctx, dataToImport, dryRun) - time.Sleep(10 * time.Second) if err != nil { return nil, err } @@ -109,7 +114,6 @@ func (s *Importer) Import(ctx context.Context, dryRun bool) (*Stats, error) { } if !dryRun { commitMetadata := CreateCommitMetadata(s.inventory, *stats) - s.progress = append(s.progress, cmdutils.NewProgress("Commit Changes", -1)) commitLog, err := s.CatalogActions.Commit(ctx, fmt.Sprintf(CommitMsgTemplate, s.inventory.SourceName()), commitMetadata) if err != nil { return nil, err From 2bc3024c1715a74ee6c543a2676c3b231e8a444a Mon Sep 17 00:00:00 2001 From: johnnyaug Date: Mon, 12 Oct 2020 20:11:39 +0300 Subject: [PATCH 3/5] remove import spaces --- cmd/lakefs/cmd/import.go | 5 ++--- cmdutils/progress.go | 1 - 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/cmd/lakefs/cmd/import.go b/cmd/lakefs/cmd/import.go index 579b4831842..561c15c46ea 100644 --- a/cmd/lakefs/cmd/import.go +++ b/cmd/lakefs/cmd/import.go @@ -10,15 +10,14 @@ import ( "github.com/jedib0t/go-pretty/text" "github.com/spf13/cobra" - "github.com/treeverse/lakefs/cmdutils" - "github.com/treeverse/lakefs/uri" - "github.com/treeverse/lakefs/block/factory" "github.com/treeverse/lakefs/catalog" + "github.com/treeverse/lakefs/cmdutils" "github.com/treeverse/lakefs/config" "github.com/treeverse/lakefs/db" "github.com/treeverse/lakefs/logging" "github.com/treeverse/lakefs/onboard" + "github.com/treeverse/lakefs/uri" ) const ( diff --git a/cmdutils/progress.go b/cmdutils/progress.go index f488b581d44..c1bbadac58c 100644 --- a/cmdutils/progress.go +++ b/cmdutils/progress.go @@ -5,7 +5,6 @@ import ( "time" "github.com/jedib0t/go-pretty/text" - "github.com/vbauerster/mpb/v5" "github.com/vbauerster/mpb/v5/decor" ) From 5a010eea59c92537a0649ec06bd1c48757845985 Mon Sep 17 00:00:00 2001 From: johnnyaug Date: Mon, 12 Oct 2020 23:54:31 +0300 Subject: [PATCH 4/5] triger GH actions From 27b423922e5f33da917ee584f161eae70d809902 Mon Sep 17 00:00:00 2001 From: johnnyaug Date: Tue, 13 Oct 2020 11:56:44 +0300 Subject: [PATCH 5/5] CR fix: do not use pointers for progress counters --- cmdutils/progress.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/cmdutils/progress.go b/cmdutils/progress.go index c1bbadac58c..3ce6c71b6d2 100644 --- a/cmdutils/progress.go +++ b/cmdutils/progress.go @@ -34,8 +34,8 @@ type ProgressReporter interface { type Progress struct { label string - current *int64 - total *int64 + current int64 + total int64 completed bool active bool progressType int @@ -49,14 +49,13 @@ type MultiBar struct { } func NewProgress(label string, progressType int) *Progress { - total := int64(0) + total := 0 if progressType == Spinner || progressType == SpinnerNoCounter { - total = int64(-1) + total = -1 } return &Progress{ label: label, - current: new(int64), - total: &total, + total: int64(total), progressType: progressType, } } @@ -72,27 +71,27 @@ func (p *Progress) Label() string { } func (p *Progress) Current() int64 { - return atomic.LoadInt64(p.current) + return atomic.LoadInt64(&p.current) } func (p *Progress) Total() int64 { - return atomic.LoadInt64(p.total) + return atomic.LoadInt64(&p.total) } func (p *Progress) Incr() { - atomic.AddInt64(p.current, 1) + atomic.AddInt64(&p.current, 1) } func (p *Progress) Add(n int64) { - atomic.AddInt64(p.current, n) + atomic.AddInt64(&p.current, n) } func (p *Progress) SetCurrent(n int64) { - atomic.StoreInt64(p.current, n) + atomic.StoreInt64(&p.current, n) } func (p *Progress) SetTotal(n int64) { - atomic.StoreInt64(p.total, n) + atomic.StoreInt64(&p.total, n) } func (p *Progress) Completed() bool {