Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvement/import tool progress indication #804

Merged
merged 7 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions block/s3/inventory_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ func NewInventoryIterator(inv *Inventory) *InventoryIterator {
creationTimestamp = 0
}
t := time.Unix(creationTimestamp/int64(time.Second/time.Millisecond), 0)
inventoryFileProgress := cmdutils.NewActiveProgress(fmt.Sprintf("Inventory (%s) Files Read", t.Format("2006-01-02")), cmdutils.Bar)
inventoryFileProgress.SetTotal(int64(len(inv.Manifest.Files)))
return &InventoryIterator{
Inventory: inv,
inventoryFileIndex: -1,
inventoryFileProgress: cmdutils.NewProgress(fmt.Sprintf("Inventory (%s) Files Read", t.Format("2006-01-02")), int64(len(inv.Manifest.Files))),
currentFileProgress: cmdutils.NewProgress(fmt.Sprintf("Inventory (%s) Current File", t.Format("2006-01-02")), 0),
inventoryFileProgress: inventoryFileProgress,
currentFileProgress: cmdutils.NewActiveProgress(fmt.Sprintf("Inventory (%s) Current File", t.Format("2006-01-02")), cmdutils.Bar),
}
}

Expand All @@ -52,14 +54,16 @@ func (it *InventoryIterator) Next() bool {
it.err = ErrInventoryNotSorted
return false
}
it.currentFileProgress.Incr()
it.currentFileProgress.SetCurrent(int64(it.valIndexInBuffer + 1))
it.val = val
return true
}
// value not found in buffer, need to reload the buffer
it.valIndexInBuffer = -1
if !it.moveToNextInventoryFile() {
// no more files left
it.inventoryFileProgress.SetCompleted(true)
it.currentFileProgress.SetCompleted(true)
return false
}
if !it.fillBuffer() {
Expand Down
5 changes: 2 additions & 3 deletions cmd/lakefs/cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ import (

"github.com/jedib0t/go-pretty/text"
"github.com/spf13/cobra"
"github.com/treeverse/lakefs/cmdutils"
"github.com/treeverse/lakefs/uri"

"github.com/treeverse/lakefs/block/factory"
"github.com/treeverse/lakefs/catalog"
"github.com/treeverse/lakefs/cmdutils"
"github.com/treeverse/lakefs/config"
"github.com/treeverse/lakefs/db"
"github.com/treeverse/lakefs/logging"
"github.com/treeverse/lakefs/onboard"
"github.com/treeverse/lakefs/uri"
)

const (
Expand Down
102 changes: 70 additions & 32 deletions cmdutils/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,41 @@ import (
"sync/atomic"
"time"

"github.com/jedib0t/go-pretty/text"
"github.com/vbauerster/mpb/v5"
"github.com/vbauerster/mpb/v5/decor"
)

const (
progressCounterFormat = "%d / %d ["
spinnerCounterFormat = "%d ["
progressCounterFormat = "%d / %d "
spinnerCounterFormat = "%d "
spinnerNoCounterFormat = ""
progressSuffix = "]"
progressPrefix = "["
progressBarWidth = 60
progressBarNameColumnWidth = 40
progressBarCounterColumnWidth = 20
progressBarStyle = " =>- <"
progressRefreshRate = 50 * time.Millisecond
)

const (
Bar = iota
Spinner
SpinnerNoCounter
)

type ProgressReporter interface {
Progress() []*Progress
}

type Progress struct {
label string
current *int64
total *int64
label string
current int64
total int64
completed bool
active bool
progressType int
}

type MultiBar struct {
Expand All @@ -36,40 +48,62 @@ type MultiBar struct {
ticker *time.Ticker
}

func NewProgress(label string, total int64) *Progress {
func NewProgress(label string, progressType int) *Progress {
total := 0
if progressType == Spinner || progressType == SpinnerNoCounter {
total = -1
}
return &Progress{
label: label,
current: new(int64),
total: &total,
label: label,
total: int64(total),
progressType: progressType,
}
}

func NewActiveProgress(label string, progressType int) *Progress {
res := NewProgress(label, progressType)
res.Activate()
return res
}

func (p *Progress) Label() string {
return p.label
}

func (p *Progress) Current() int64 {
return atomic.LoadInt64(p.current)
return atomic.LoadInt64(&p.current)
}

func (p *Progress) Total() int64 {
return atomic.LoadInt64(p.total)
return atomic.LoadInt64(&p.total)
}

func (p *Progress) Incr() {
atomic.AddInt64(p.current, 1)
atomic.AddInt64(&p.current, 1)
}

func (p *Progress) Add(n int64) {
atomic.AddInt64(p.current, n)
atomic.AddInt64(&p.current, n)
}

func (p *Progress) SetCurrent(n int64) {
atomic.StoreInt64(p.current, n)
atomic.StoreInt64(&p.current, n)
}

func (p *Progress) SetTotal(n int64) {
atomic.StoreInt64(p.total, n)
atomic.StoreInt64(&p.total, n)
}

func (p *Progress) Completed() bool {
return p.completed
}

func (p *Progress) SetCompleted(completed bool) {
p.completed = completed
}

func (p *Progress) Activate() {
p.active = true
}

func NewMultiBar(r ProgressReporter) *MultiBar {
Expand All @@ -93,41 +127,45 @@ func (b *MultiBar) Stop() {
b.mpb.Wait()
}

func (b *MultiBar) refresh(isCompleted bool) {
func (b *MultiBar) refresh(isStop bool) {
progress := b.reporter.Progress()
for _, p := range progress {
if p == nil || !p.active {
continue
}
total := p.Total()
bar, ok := b.mpbBars[p.label]
if !ok {
bar = createBar(b.mpb, p, total)
b.mpbBars[p.label] = bar
}
bar.SetTotal(total, isCompleted)
if !isCompleted {
bar.SetCurrent(p.Current())
if !isStop {
bar.SetTotal(total, p.Completed())
} else {
bar.SetCurrent(total)
bar.SetTotal(p.Current(), true)
}
bar.SetCurrent(p.Current())
}
}

func createBar(m *mpb.Progress, p *Progress, total int64) *mpb.Bar {
var bar *mpb.Bar
isSpinner := false
if total < 0 {
isSpinner = true
}
labelDecorator := decor.Name(p.label, decor.WC{W: progressBarNameColumnWidth, C: decor.DidentRight})
suffixOption := mpb.AppendDecorators(decor.Name(progressSuffix))
if isSpinner {
suffixOption := mpb.AppendDecorators(decor.OnComplete(decor.Name(progressSuffix), text.FgGreen.Sprintf(" done")))
if p.progressType == Spinner || p.progressType == SpinnerNoCounter {
// unknown total, render a spinner
bar = m.AddSpinner(total, mpb.SpinnerOnMiddle, suffixOption,
mpb.PrependDecorators(labelDecorator,
decor.CurrentNoUnit(spinnerCounterFormat, decor.WC{W: progressBarCounterColumnWidth})))
} else {
bar = m.AddBar(total, mpb.BarStyle(progressBarStyle), suffixOption,
var counterDecorator decor.Decorator
if p.progressType == Spinner {
counterDecorator = decor.CurrentNoUnit(spinnerCounterFormat, decor.WC{W: progressBarCounterColumnWidth})
} else {
counterDecorator = decor.Name(spinnerNoCounterFormat, decor.WC{W: progressBarCounterColumnWidth})
}
bar = m.AddSpinner(total, mpb.SpinnerOnMiddle, mpb.BarFillerClearOnComplete(), suffixOption, mpb.PrependDecorators(labelDecorator, counterDecorator, decor.OnComplete(decor.Name(progressPrefix), "")))
} else { // type == Bar
bar = m.AddBar(total, mpb.BarStyle(progressBarStyle), mpb.BarFillerClearOnComplete(), suffixOption,
mpb.PrependDecorators(labelDecorator,
decor.CountersNoUnit(progressCounterFormat, decor.WC{W: progressBarCounterColumnWidth})))
decor.CountersNoUnit(progressCounterFormat, decor.WC{W: progressBarCounterColumnWidth}),
decor.OnComplete(decor.Name(progressPrefix), "")))
}
return bar
}
17 changes: 13 additions & 4 deletions onboard/catalog_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ type CatalogRepoActions struct {
logger logging.Logger
deletedProgress *cmdutils.Progress
addedProgress *cmdutils.Progress
commitProgress *cmdutils.Progress
}

func (c *CatalogRepoActions) Progress() []*cmdutils.Progress {
return []*cmdutils.Progress{c.addedProgress, c.deletedProgress}
return []*cmdutils.Progress{c.addedProgress, c.deletedProgress, c.commitProgress}
}

func NewCatalogActions(cataloger catalog.Cataloger, repository string, committer string, logger logging.Logger) *CatalogRepoActions {
Expand All @@ -45,8 +46,9 @@ func NewCatalogActions(cataloger catalog.Cataloger, repository string, committer
repository: repository,
committer: committer,
logger: logger,
addedProgress: cmdutils.NewProgress("Objects Added or Changed", -1),
deletedProgress: cmdutils.NewProgress("Objects Deleted", -1),
addedProgress: cmdutils.NewActiveProgress("Objects Added or Changed", cmdutils.Spinner),
deletedProgress: cmdutils.NewActiveProgress("Objects Deleted", cmdutils.Spinner),
commitProgress: cmdutils.NewProgress("Committing Changes", cmdutils.SpinnerNoCounter),
}
}

Expand Down Expand Up @@ -137,6 +139,8 @@ func (c *CatalogRepoActions) ApplyImport(ctx context.Context, it Iterator, dryRu
}
}
c.addedProgress.Add(int64(len(currentBatch)))
c.addedProgress.SetCompleted(true)
c.deletedProgress.SetCompleted(true)
return &stats, nil
}

Expand All @@ -159,8 +163,13 @@ func (c *CatalogRepoActions) GetPreviousCommit(ctx context.Context) (commit *cat
}

func (c *CatalogRepoActions) Commit(ctx context.Context, commitMsg string, metadata catalog.Metadata) (*catalog.CommitLog, error) {
return c.cataloger.Commit(ctx, c.repository, DefaultBranchName,
c.commitProgress.Activate()
res, err := c.cataloger.Commit(ctx, c.repository, DefaultBranchName,
commitMsg,
c.committer,
metadata)
if err == nil {
c.commitProgress.SetCompleted(true)
}
return res, err
}
8 changes: 7 additions & 1 deletion onboard/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ type Stats struct {
PreviousImportDate time.Time
}

var ErrNoInventoryURL = errors.New("no inventory_url in commit Metadata")
var (
ErrNoInventoryURL = errors.New("no inventory_url in commit Metadata")
ErrInventoryAlreadyImported = errors.New("given inventory was already imported")
)

func CreateImporter(ctx context.Context, logger logging.Logger, config *Config) (importer *Importer, err error) {
res := &Importer{
Expand Down Expand Up @@ -74,6 +77,9 @@ func (s *Importer) diffIterator(ctx context.Context, commit catalog.CommitLog) (
if previousInventoryURL == "" {
return nil, fmt.Errorf("%w. commit_ref=%s", ErrNoInventoryURL, commit.Reference)
}
if previousInventoryURL == s.inventory.InventoryURL() {
return nil, fmt.Errorf("%w. commit_ref=%s", ErrInventoryAlreadyImported, commit.Reference)
}
previousInv, err := s.inventoryGenerator.GenerateInventory(ctx, s.logger, previousInventoryURL, true)
if err != nil {
return nil, fmt.Errorf("failed to create inventory for previous state: %w", err)
Expand Down