Skip to content

Commit

Permalink
Add an "all results" query to scanner/fixer workflows (#5470)
Browse files Browse the repository at this point in the history
# What

This PR adds an `all_results` query to both scanner and fixer workflows, to retrieve all (non-empty) results in one operation.  This makes it easier to find all failures and all output filenames, without having to repeatedly query in varying ways.

# Why

Currently, getting all output filenames from these workflows is an exercise in frustration.

You can:
- query `shard_corrupt_keys` to get all shards with corruptions (no data on fails, etc)
- query `shard_report` to get a *single* shard's corruptions, errors, skips, control-flow failures
- browse activity results by hand to discover ^ this in bulk

But unfortunately:
- metrics do not contain per-shard info so finding the relevant activity or shard is hard
- there are essentially no logs in this entire system (!?!)
- there is currently no query to get both failures and corruptions/fixes in bulk
- if one invariant reports "fixed" and then the next returns "fail" because the fix removed data,
  the end result goes into "failures".  this is true for scans too, corrupt + fail == fail.

Many small bits of friction make trying to bulk-analyze this system *incredibly* painful.

While we do need to just rewrite the whole thing to be less... like it is... we can at least expose this bulk info quite easily in a new query.
  • Loading branch information
Groxx authored Dec 8, 2023
1 parent bc897a1 commit b635358
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 0 deletions.
22 changes: 22 additions & 0 deletions service/worker/scanner/shardscanner/aggregators.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,17 @@ func (a *ShardFixResultAggregator) GetReport(shardID int) (*FixReport, error) {
return nil, fmt.Errorf("shard %v has not finished yet, check back later for report", shardID)
}

func (a *ShardFixResultAggregator) GetAllFixResults() (map[int]FixResult, error) {
result := make(map[int]FixResult, len(a.reports))
for k, v := range a.reports {
if v.Result.Empty() {
continue
}
result[k] = v.Result
}
return result, nil
}

func (a *ShardFixResultAggregator) adjustAggregation(stats FixStats, fn func(a, b int64) int64) {
a.aggregation.EntitiesCount = fn(a.aggregation.EntitiesCount, stats.EntitiesCount)
a.aggregation.SkippedCount = fn(a.aggregation.SkippedCount, stats.SkippedCount)
Expand Down Expand Up @@ -557,6 +568,17 @@ func (a *ShardScanResultAggregator) adjustAggregation(stats ScanStats, fn func(a
}
}

func (a *ShardScanResultAggregator) GetAllScanResults() (map[int]ScanResult, error) {
result := make(map[int]ScanResult, len(a.reports))
for k, v := range a.reports {
if v.Result.Empty() {
continue
}
result[k] = v.Result
}
return result, nil
}

func getStatusResult(
minShardID int,
maxShardID int,
Expand Down
6 changes: 6 additions & 0 deletions service/worker/scanner/shardscanner/fixer_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ func setHandlers(aggregator *ShardFixResultAggregator) map[string]interface{} {
}
return aggregator.GetDomainStatus(req)
},
AllResultsQuery: func() (map[int]FixResult, error) {
if aggregator == nil {
return nil, errQueryNotReady
}
return aggregator.GetAllFixResults()
},
}
}

Expand Down
11 changes: 11 additions & 0 deletions service/worker/scanner/shardscanner/scanner_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ const (
ShardSizeQuery = "shard_size"
// DomainReportQuery is the query name for the query used to get the reports per domains for all finished shards
DomainReportQuery = "domain_report"
// AllResultsQuery returns filenames / paginating data for corruptions and failures in this workflow,
// for shards which have finished processing. This works for both scanner and fixer, and the return structures
// are very similar.
//
// This data is also available for a single shard under ShardReportQuery, but using that requires
// re-querying repeatedly if more than that single shard's data is desired, e.g. for manual
// troubleshooting purposes.
AllResultsQuery = "all_results"

scanShardReportChan = "scanShardReportChan"
)
Expand Down Expand Up @@ -206,6 +214,9 @@ func getScanHandlers(aggregator *ShardScanResultAggregator) map[string]interface
DomainReportQuery: func(req DomainReportQueryRequest) (*DomainScanReportQueryResult, error) {
return aggregator.GetDomainStatus(req)
},
AllResultsQuery: func() (map[int]ScanResult, error) {
return aggregator.GetAllScanResults()
},
}
}

Expand Down
41 changes: 41 additions & 0 deletions service/worker/scanner/shardscanner/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,3 +519,44 @@ func GetFixerContext(
}
return val, nil
}

// Empty returns true if this ScanResult has no "real" data, e.g. only nils or empty values.
func (s *ScanResult) Empty() bool {
if s == nil {
return true
}
if s.ControlFlowFailure != nil && (*s.ControlFlowFailure != ControlFlowFailure{}) {
return false // at least control flow failure has data
}
if s.ShardScanKeys != nil {
if s.ShardScanKeys.Corrupt != nil && (*s.ShardScanKeys.Corrupt != store.Keys{}) {
return false // corrupt data exists
}
if s.ShardScanKeys.Failed != nil && (*s.ShardScanKeys.Failed != store.Keys{}) {
return false // failed data exists
}
}
return true // both empty
}

// Empty returns true if this FixResult has no "real" data, e.g. only nils or empty values.
func (f *FixResult) Empty() bool {
if f == nil {
return true
}
if f.ControlFlowFailure != nil && (*f.ControlFlowFailure != ControlFlowFailure{}) {
return false // at least control flow failure has data
}
if f.ShardFixKeys != nil {
if f.ShardFixKeys.Fixed != nil && (*f.ShardFixKeys.Fixed != store.Keys{}) {
return false // fixed data exists
}
if f.ShardFixKeys.Failed != nil && (*f.ShardFixKeys.Failed != store.Keys{}) {
return false // failed data exists
}
if f.ShardFixKeys.Skipped != nil && (*f.ShardFixKeys.Skipped != store.Keys{}) {
return false // skipped data exists
}
}
return true // both empty
}

0 comments on commit b635358

Please sign in to comment.