Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cli list blocks usability improvements #403

Merged
merged 6 commits into from
Dec 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* [CHANGE] Redo tempo-cli with basic command structure and improvements [#385](https://github.com/grafana/tempo/pull/385)
* [CHANGE] Add content negotiation support and sharding parameters to Querier [#375](https://github.com/grafana/tempo/pull/375)
* [ENHANCEMENT] Add docker-compose example for GCS along with new backend options [#397](https://github.com/grafana/tempo/pull/397)
* [ENHANCEMENT] tempo-cli list blocks usability improvements [#403](https://github.com/grafana/tempo/pull/403)
mdisibio marked this conversation as resolved.
Show resolved Hide resolved
* [BUGFIX] Compactor without GCS permissions fail silently [#379](https://github.com/grafana/tempo/issues/379)

## v0.4.0
Expand Down
16 changes: 8 additions & 8 deletions cmd/tempo-cli/cmd-list-block.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ func dumpBlock(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID s
return nil
}

objects, lvl, window, start, end := blockStats(meta, compactedMeta, windowRange)

fmt.Println("ID : ", id)
fmt.Println("Total Objects : ", objects)
fmt.Println("Level : ", lvl)
fmt.Println("Window : ", window)
fmt.Println("Start : ", start)
fmt.Println("End : ", end)
unifiedMeta := getMeta(meta, compactedMeta, windowRange)

fmt.Println("ID : ", unifiedMeta.id)
fmt.Println("Total Objects : ", unifiedMeta.objects)
fmt.Println("Level : ", unifiedMeta.compactionLevel)
fmt.Println("Window : ", unifiedMeta.window)
fmt.Println("Start : ", unifiedMeta.start)
fmt.Println("End : ", unifiedMeta.end)

if checkDupes {
fmt.Println("Searching for dupes ...")
Expand Down
147 changes: 86 additions & 61 deletions cmd/tempo-cli/cmd-list-blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (
)

type listBlocksCmd struct {
TenantID string `arg:"" help:"tenant-id within the bucket"`
LoadIndex bool `help:"load block indexes and display additional information"`
TenantID string `arg:"" help:"tenant-id within the bucket"`
LoadIndex bool `help:"load block indexes and display additional information"`
IncludeCompacted bool `help:"include compacted blocks"`
mdisibio marked this conversation as resolved.
Show resolved Hide resolved

backendOptions
}
Expand All @@ -30,104 +31,119 @@ func (l *listBlocksCmd) Run(ctx *globalOptions) error {

windowDuration := time.Hour

results, err := loadBucket(r, c, l.TenantID, windowDuration, l.LoadIndex)
results, err := loadBucket(r, c, l.TenantID, windowDuration, l.LoadIndex, l.IncludeCompacted)
if err != nil {
return err
}

displayResults(results, windowDuration, l.LoadIndex)
displayResults(results, windowDuration, l.LoadIndex, l.IncludeCompacted)
return nil
}

type bucketStats struct {
id uuid.UUID
compactionLevel uint8
objects int
window int64
start time.Time
end time.Time
type blockStats struct {
unifiedBlockMeta

totalIDs int
duplicateIDs int
}

func loadBucket(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID string, windowRange time.Duration, loadIndex bool) ([]bucketStats, error) {
func loadBucket(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID string, windowRange time.Duration, loadIndex bool, includeCompacted bool) ([]blockStats, error) {
blockIDs, err := r.Blocks(context.Background(), tenantID)
if err != nil {
return nil, err
}

fmt.Println("total blocks: ", len(blockIDs))
results := make([]bucketStats, 0)

for _, id := range blockIDs {
fmt.Print(".")

meta, err := r.BlockMeta(context.Background(), id, tenantID)
if err != nil && err != tempodb_backend.ErrMetaDoesNotExist {
return nil, err
}
// Load in parallel
wg := newBoundedWaitGroup(10)
resultsCh := make(chan blockStats, len(blockIDs))

compactedMeta, err := c.CompactedBlockMeta(id, tenantID)
if err != nil && err != tempodb_backend.ErrMetaDoesNotExist {
return nil, err
}
for _, id := range blockIDs {
wg.Add(1)

totalIDs := -1
duplicateIDs := -1
go func(id2 uuid.UUID) {
defer wg.Done()

if loadIndex {
indexBytes, err := r.Index(context.Background(), id, tenantID)
if err == nil {
records, err := encoding.UnmarshalRecords(indexBytes)
if err != nil {
return nil, err
}
duplicateIDs = 0
totalIDs = len(records)
for i := 1; i < len(records); i++ {
if bytes.Equal(records[i-1].ID, records[i].ID) {
duplicateIDs++
}
}
b, err := loadBlock(r, c, tenantID, id2, windowRange, loadIndex, includeCompacted)
if err != nil {
fmt.Println("Error loading block:", id2, err)
return
}
}

objects, lvl, window, start, end := blockStats(meta, compactedMeta, windowRange)
if b != nil {
resultsCh <- *b
}
}(id)
}

results = append(results, bucketStats{
id: id,
compactionLevel: lvl,
objects: objects,
window: window,
start: start,
end: end,
wg.Wait()
close(resultsCh)

totalIDs: totalIDs,
duplicateIDs: duplicateIDs,
})
results := make([]blockStats, 0)
for b := range resultsCh {
results = append(results, b)
}

sort.Slice(results, func(i, j int) bool {
bI := results[i]
bJ := results[j]
return results[i].end.Before(results[j].end)
})

return results, nil
}

func loadBlock(r tempodb_backend.Reader, c tempodb_backend.Compactor, tenantID string, id uuid.UUID, windowRange time.Duration, loadIndex bool, includeCompacted bool) (*blockStats, error) {
fmt.Print(".")

if bI.window == bJ.window {
return bI.compactionLevel < bJ.compactionLevel
meta, err := r.BlockMeta(context.Background(), id, tenantID)
if err == tempodb_backend.ErrMetaDoesNotExist && !includeCompacted {
return nil, nil
} else if err != nil && err != tempodb_backend.ErrMetaDoesNotExist {
return nil, err
}

compactedMeta, err := c.CompactedBlockMeta(id, tenantID)
if err != nil && err != tempodb_backend.ErrMetaDoesNotExist {
return nil, err
}

totalIDs := -1
duplicateIDs := -1

if loadIndex {
indexBytes, err := r.Index(context.Background(), id, tenantID)
if err == nil {
records, err := encoding.UnmarshalRecords(indexBytes)
if err != nil {
return nil, err
}
duplicateIDs = 0
totalIDs = len(records)
for i := 1; i < len(records); i++ {
if bytes.Equal(records[i-1].ID, records[i].ID) {
duplicateIDs++
}
}
}
}

return bI.window < bJ.window
})
return &blockStats{
unifiedBlockMeta: getMeta(meta, compactedMeta, windowRange),

return results, nil
totalIDs: totalIDs,
duplicateIDs: duplicateIDs,
}, nil
}

func displayResults(results []bucketStats, windowDuration time.Duration, includeIndexInfo bool) {
func displayResults(results []blockStats, windowDuration time.Duration, includeIndexInfo bool, includeCompacted bool) {

columns := []string{"id", "lvl", "count", "window", "start", "end", "duration"}
columns := []string{"id", "lvl", "count", "window", "start", "end", "duration", "age"}
if includeIndexInfo {
columns = append(columns, "idx", "dupe")
}
if includeCompacted {
columns = append(columns, "cmp")
}

totalObjects := 0
out := make([][]string, 0)
Expand Down Expand Up @@ -155,12 +171,21 @@ func displayResults(results []bucketStats, windowDuration time.Duration, include
case "duration":
// Time range included in bucket
s = fmt.Sprint(r.end.Sub(r.start).Round(time.Second))
case "age":
s = fmt.Sprint(time.Since(r.end).Round(time.Second))
case "idx":
// Number of entries in the index (may not be the same as the block when index downsampling enabled)
s = strconv.Itoa(r.totalIDs)
case "dupe":
// Number of duplicate IDs found in the index
s = strconv.Itoa(r.duplicateIDs)
case "cmp":
// Compacted?
if r.compacted {
s = "Y"
} else {
s = " "
}
}

line = append(line, s)
Expand Down
74 changes: 69 additions & 5 deletions cmd/tempo-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"flag"
"fmt"
"io/ioutil"
"sync"
"time"

"github.com/google/uuid"
"github.com/grafana/tempo/cmd/tempo/app"
tempodb_backend "github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/local"
Expand Down Expand Up @@ -107,12 +109,74 @@ func loadBackend(b *backendOptions, g *globalOptions) (tempodb_backend.Reader, t
return r, c, nil
}

func blockStats(meta *encoding.BlockMeta, compactedMeta *encoding.CompactedBlockMeta, windowRange time.Duration) (int, uint8, int64, time.Time, time.Time) {
type unifiedBlockMeta struct {
id uuid.UUID
compactionLevel uint8
objects int
window int64
start time.Time
end time.Time
compacted bool
}

func getMeta(meta *encoding.BlockMeta, compactedMeta *encoding.CompactedBlockMeta, windowRange time.Duration) unifiedBlockMeta {
if meta != nil {
return meta.TotalObjects, meta.CompactionLevel, meta.EndTime.Unix() / int64(windowRange/time.Second), meta.StartTime, meta.EndTime
} else if compactedMeta != nil {
return compactedMeta.TotalObjects, compactedMeta.CompactionLevel, compactedMeta.EndTime.Unix() / int64(windowRange/time.Second), compactedMeta.StartTime, compactedMeta.EndTime
return unifiedBlockMeta{
id: meta.BlockID,
compactionLevel: meta.CompactionLevel,
objects: meta.TotalObjects,
window: meta.EndTime.Unix() / int64(windowRange/time.Second),
start: meta.StartTime,
end: meta.EndTime,
compacted: false,
}
}
if compactedMeta != nil {
return unifiedBlockMeta{
id: compactedMeta.BlockID,
compactionLevel: compactedMeta.CompactionLevel,
objects: compactedMeta.TotalObjects,
window: compactedMeta.EndTime.Unix() / int64(windowRange/time.Second),
start: compactedMeta.StartTime,
end: compactedMeta.EndTime,
compacted: true,
}
}
return unifiedBlockMeta{
id: uuid.UUID{},
compactionLevel: 0,
objects: -1,
window: -1,
start: time.Unix(0, 0),
end: time.Unix(0, 0),
compacted: false,
}
}

// boundedWaitGroup like a normal wait group except limits number of active goroutines to given capacity.
type boundedWaitGroup struct {
wg sync.WaitGroup
ch chan struct{} // Chan buffer size is used to limit concurrency.
}

func newBoundedWaitGroup(cap int) boundedWaitGroup {
return boundedWaitGroup{ch: make(chan struct{}, cap)}
}

func (bwg *boundedWaitGroup) Add(delta int) {
for i := 0; i > delta; i-- {
<-bwg.ch
}
for i := 0; i < delta; i++ {
bwg.ch <- struct{}{}
}
bwg.wg.Add(delta)
}

func (bwg *boundedWaitGroup) Done() {
bwg.Add(-1)
}

return -1, 0, -1, time.Unix(0, 0), time.Unix(0, 0)
func (bwg *boundedWaitGroup) Wait() {
bwg.wg.Wait()
}
11 changes: 7 additions & 4 deletions docs/tempo/website/cli/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,22 @@ Arguments:
- `tenant-id` The tenant ID. Use `single-tenant` for single tenant setups.

Options:
- `--load-index` Also load the block indexes and perform integrity checks for duplicates. **Note:** can be intense.
- `--include-compacted` Include blocks that have been compacted. Default behavior is to display only active blocks.
- `--load-index` Also load the block indexes and perform integrity checks for duplicates. **Note:** This can be a resource intensive process.

**Output:**
Explanation of output:
- `ID` Block ID.
- `Lvl` Compaction level of the block.
- `Count` Number of objects stored in the block.
- `Window` The time window considered for compaction purposes.
- `Window` The window of time that was considered for compaction purposes.
- `Start` The earliest timestamp stored in the block.
- `End` The latest timestamp stored in the block.
- `Duration` Time duration between start and end.
- `Age` The age of the block.
- `Duration`Duration between the start and end time.
- `Idx` Number of records stored in the index (present when --load-index is specified).
- `Dupe` Number of duplicate entries in the index. Should be zero. (present when --load-index is specified).
- `Dupe` Number of duplicate entries in the index (present when --load-index is specified). Should be zero.
- `Cmp` Whether the block has been compacted (present when --include-compacted is specified).

**Example:**
```bash
Expand Down