Skip to content

Commit

Permalink
feat(repair): add optimizeJobType
Browse files Browse the repository at this point in the history
This repair job type uses small_table_optimization param in order to repair the whole table with one API call.

Fixes #3642
  • Loading branch information
Michal-Leszczynski authored and karol-kokoszka committed May 9, 2024
1 parent 2bd7912 commit 99a4bbc
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 14 deletions.
5 changes: 4 additions & 1 deletion pkg/scyllaclient/client_scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,14 +520,17 @@ func ReplicaHash(replicaSet []string) uint64 {
}

// Repair invokes async repair and returns the repair command ID.
func (c *Client) Repair(ctx context.Context, keyspace, table, master string, replicaSet []string, ranges []TokenRange) (int32, error) {
func (c *Client) Repair(ctx context.Context, keyspace, table, master string, replicaSet []string, ranges []TokenRange, smallTableOpt bool) (int32, error) {
dr := dumpRanges(ranges)
p := operations.StorageServiceRepairAsyncByKeyspacePostParams{
Context: forceHost(ctx, master),
Keyspace: keyspace,
ColumnFamilies: &table,
Ranges: &dr,
}
if smallTableOpt {
p.SmallTableOptimization = pointer.StringPtr("true")
}
// Single node cluster repair fails with hosts param
if len(replicaSet) > 1 {
hosts := strings.Join(replicaSet, ",")
Expand Down
14 changes: 12 additions & 2 deletions pkg/service/repair/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const (
normalJobType jobType = iota
skipJobType
mergeRangesJobType
optimizeJobType
)

type job struct {
Expand Down Expand Up @@ -162,14 +163,17 @@ func (g *generator) newTableGenerator(keyspace string, tp tablePlan, ring scylla
done, allRangesCnt := g.pm.GetCompletedRanges(keyspace, tp.Table)
// Always repair unfinished tablet table from scratch as
// tablet load balancing is enabled when repair is interrupted.
if !g.ringDescriber.IsTabletKeyspace(keyspace) || len(done) == allRangesCnt {
tabletKs := g.ringDescriber.IsTabletKeyspace(keyspace)
if !tabletKs || len(done) == allRangesCnt {
for _, r := range done {
delete(todoRanges, r)
}
}

var jt jobType
switch {
case g.plan.SmallTableOptSupport && tp.Small && !tabletKs:
jt = optimizeJobType
case len(ring.ReplicaTokens) == 1 && tp.Small:
jt = mergeRangesJobType
default:
Expand Down Expand Up @@ -256,14 +260,20 @@ func (tg *tableGenerator) newJob() (job, bool) {
tg.ctl.Unblock(filtered)
continue
}
jt := tg.JobType
// A single optimized job repairs the whole table,
// so the remaining job are skipped (and sent only for recording progress).
if tg.JobType == optimizeJobType {
tg.JobType = skipJobType
}

return job{
keyspace: tg.Keyspace,
table: tg.Table,
master: tg.ms.Select(filtered),
replicaSet: filtered,
ranges: ranges,
jobType: tg.JobType,
jobType: jt,
}, true
}
}
Expand Down
54 changes: 46 additions & 8 deletions pkg/service/repair/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,23 @@ import (
"context"
"math"
"sort"
"sync/atomic"

"github.com/pkg/errors"
"github.com/scylladb/go-set/strset"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/util/slice"
"golang.org/x/sync/errgroup"
)

// plan describes whole repair schedule and state.
type plan struct {
Keyspaces keyspacePlans

Hosts []string
MaxParallel int
MaxHostIntensity map[string]Intensity
Hosts []string
MaxParallel int
MaxHostIntensity map[string]Intensity
SmallTableOptSupport bool
// Used for progress purposes
Stats map[scyllaclient.HostKeyspaceTable]tableStats
}
Expand Down Expand Up @@ -121,18 +124,24 @@ func newPlan(ctx context.Context, target Target, client *scyllaclient.Client) (*
}
ks.fillSmall(target.SmallTableThreshold)

support, err := isSmallTableOptSupported(ctx, client, hosts)
if err != nil {
return nil, errors.Wrap(err, "check support of small_table_optimization")
}

// Update max host intensity
mhi, err := maxHostIntensity(ctx, client, hosts)
if err != nil {
return nil, errors.Wrap(err, "calculate max host intensity")
}

return &plan{
Keyspaces: ks,
Hosts: hosts,
MaxParallel: maxP,
MaxHostIntensity: mhi,
Stats: newStats(sizeReport, ranges),
Keyspaces: ks,
Hosts: hosts,
MaxParallel: maxP,
MaxHostIntensity: mhi,
SmallTableOptSupport: support,
Stats: newStats(sizeReport, ranges),
}, nil
}

Expand Down Expand Up @@ -367,3 +376,32 @@ func newHostKsTable(host, ks, table string) scyllaclient.HostKeyspaceTable {
Table: table,
}
}

func isSmallTableOptSupported(ctx context.Context, client *scyllaclient.Client, hosts []string) (bool, error) {
out := atomic.Bool{}
out.Store(true)
eg := errgroup.Group{}

for _, host := range hosts {
h := host
eg.Go(func() error {
ni, err := client.NodeInfo(ctx, h)
if err != nil {
return err
}
res, err := ni.SupportsRepairSmallTableOptimization()
if err != nil {
return err
}
if !res {
out.Store(false)
}
return nil
})
}

if err := eg.Wait(); err != nil {
return false, err
}
return out.Load(), nil
}
11 changes: 8 additions & 3 deletions pkg/service/repair/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,22 @@ func (w *worker) runRepair(ctx context.Context, j job) (out error) {
out = errors.Wrapf(out, "master %s keyspace %s table %s command %d", j.master, j.keyspace, j.table, jobID)
}()

ranges := j.ranges
if j.jobType == mergeRangesJobType {
var ranges []scyllaclient.TokenRange
switch {
case j.jobType == optimizeJobType:
ranges = nil
case j.jobType == mergeRangesJobType:
ranges = []scyllaclient.TokenRange{
{
StartToken: dht.Murmur3MinToken,
EndToken: dht.Murmur3MaxToken,
},
}
default:
ranges = j.ranges
}

jobID, err = w.client.Repair(ctx, j.keyspace, j.table, j.master, j.replicaSet, ranges)
jobID, err = w.client.Repair(ctx, j.keyspace, j.table, j.master, j.replicaSet, ranges, j.jobType == optimizeJobType)
if err != nil {
return errors.Wrap(err, "schedule repair")
}
Expand Down

0 comments on commit 99a4bbc

Please sign in to comment.