From 99a4bbc1a61c82b7ac240a9976e0d576367a4bb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Leszczy=C5=84ski?= <2000michal@wp.pl> Date: Wed, 24 Apr 2024 12:26:51 +0200 Subject: [PATCH] feat(repair): add optimizeJobType This repair job type uses small_table_optimization param in order to repair the whole table with one API call. Fixes #3642 --- pkg/scyllaclient/client_scylla.go | 5 ++- pkg/service/repair/generator.go | 14 ++++++-- pkg/service/repair/plan.go | 54 ++++++++++++++++++++++++++----- pkg/service/repair/worker.go | 11 +++++-- 4 files changed, 70 insertions(+), 14 deletions(-) diff --git a/pkg/scyllaclient/client_scylla.go b/pkg/scyllaclient/client_scylla.go index da2623121b..d45ff84cb0 100644 --- a/pkg/scyllaclient/client_scylla.go +++ b/pkg/scyllaclient/client_scylla.go @@ -520,7 +520,7 @@ func ReplicaHash(replicaSet []string) uint64 { } // Repair invokes async repair and returns the repair command ID. -func (c *Client) Repair(ctx context.Context, keyspace, table, master string, replicaSet []string, ranges []TokenRange) (int32, error) { +func (c *Client) Repair(ctx context.Context, keyspace, table, master string, replicaSet []string, ranges []TokenRange, smallTableOpt bool) (int32, error) { dr := dumpRanges(ranges) p := operations.StorageServiceRepairAsyncByKeyspacePostParams{ Context: forceHost(ctx, master), @@ -528,6 +528,9 @@ func (c *Client) Repair(ctx context.Context, keyspace, table, master string, rep ColumnFamilies: &table, Ranges: &dr, } + if smallTableOpt { + p.SmallTableOptimization = pointer.StringPtr("true") + } // Single node cluster repair fails with hosts param if len(replicaSet) > 1 { hosts := strings.Join(replicaSet, ",") diff --git a/pkg/service/repair/generator.go b/pkg/service/repair/generator.go index 7a4c9591ad..f41f820cd1 100644 --- a/pkg/service/repair/generator.go +++ b/pkg/service/repair/generator.go @@ -60,6 +60,7 @@ const ( normalJobType jobType = iota skipJobType mergeRangesJobType + optimizeJobType ) type job struct 
{ @@ -162,7 +163,8 @@ func (g *generator) newTableGenerator(keyspace string, tp tablePlan, ring scylla done, allRangesCnt := g.pm.GetCompletedRanges(keyspace, tp.Table) // Always repair unfinished tablet table from scratch as // tablet load balancing is enabled when repair is interrupted. - if !g.ringDescriber.IsTabletKeyspace(keyspace) || len(done) == allRangesCnt { + tabletKs := g.ringDescriber.IsTabletKeyspace(keyspace) + if !tabletKs || len(done) == allRangesCnt { for _, r := range done { delete(todoRanges, r) } @@ -170,6 +172,8 @@ func (g *generator) newTableGenerator(keyspace string, tp tablePlan, ring scylla var jt jobType switch { + case g.plan.SmallTableOptSupport && tp.Small && !tabletKs: + jt = optimizeJobType case len(ring.ReplicaTokens) == 1 && tp.Small: jt = mergeRangesJobType default: @@ -256,6 +260,12 @@ func (tg *tableGenerator) newJob() (job, bool) { tg.ctl.Unblock(filtered) continue } + jt := tg.JobType + // A single optimized job repairs the whole table, + // so the remaining jobs are skipped (and sent only for recording progress). + if tg.JobType == optimizeJobType { + tg.JobType = skipJobType + } return job{ keyspace: tg.Keyspace, @@ -263,7 +273,7 @@ func (tg *tableGenerator) newJob() (job, bool) { master: tg.ms.Select(filtered), replicaSet: filtered, ranges: ranges, - jobType: tg.JobType, + jobType: jt, }, true } } diff --git a/pkg/service/repair/plan.go b/pkg/service/repair/plan.go index cdd437f99e..50a2cbea37 100644 --- a/pkg/service/repair/plan.go +++ b/pkg/service/repair/plan.go @@ -6,20 +6,23 @@ import ( "context" "math" "sort" + "sync/atomic" "github.com/pkg/errors" "github.com/scylladb/go-set/strset" "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" "github.com/scylladb/scylla-manager/v3/pkg/util/slice" + "golang.org/x/sync/errgroup" ) // plan describes whole repair schedule and state.
type plan struct { Keyspaces keyspacePlans - Hosts []string - MaxParallel int - MaxHostIntensity map[string]Intensity + Hosts []string + MaxParallel int + MaxHostIntensity map[string]Intensity + SmallTableOptSupport bool // Used for progress purposes Stats map[scyllaclient.HostKeyspaceTable]tableStats } @@ -121,6 +124,11 @@ func newPlan(ctx context.Context, target Target, client *scyllaclient.Client) (* } ks.fillSmall(target.SmallTableThreshold) + support, err := isSmallTableOptSupported(ctx, client, hosts) + if err != nil { + return nil, errors.Wrap(err, "check support of small_table_optimization") + } + // Update max host intensity mhi, err := maxHostIntensity(ctx, client, hosts) if err != nil { @@ -128,11 +136,12 @@ func newPlan(ctx context.Context, target Target, client *scyllaclient.Client) (* } return &plan{ - Keyspaces: ks, - Hosts: hosts, - MaxParallel: maxP, - MaxHostIntensity: mhi, - Stats: newStats(sizeReport, ranges), + Keyspaces: ks, + Hosts: hosts, + MaxParallel: maxP, + MaxHostIntensity: mhi, + SmallTableOptSupport: support, + Stats: newStats(sizeReport, ranges), }, nil } @@ -367,3 +376,32 @@ func newHostKsTable(host, ks, table string) scyllaclient.HostKeyspaceTable { Table: table, } } + +func isSmallTableOptSupported(ctx context.Context, client *scyllaclient.Client, hosts []string) (bool, error) { + out := atomic.Bool{} + out.Store(true) + eg := errgroup.Group{} + + for _, host := range hosts { + h := host + eg.Go(func() error { + ni, err := client.NodeInfo(ctx, h) + if err != nil { + return err + } + res, err := ni.SupportsRepairSmallTableOptimization() + if err != nil { + return err + } + if !res { + out.Store(false) + } + return nil + }) + } + + if err := eg.Wait(); err != nil { + return false, err + } + return out.Load(), nil +} diff --git a/pkg/service/repair/worker.go b/pkg/service/repair/worker.go index babc8ba7ba..ef82a3ba68 100644 --- a/pkg/service/repair/worker.go +++ b/pkg/service/repair/worker.go @@ -58,17 +58,22 @@ func (w *worker) 
runRepair(ctx context.Context, j job) (out error) { out = errors.Wrapf(out, "master %s keyspace %s table %s command %d", j.master, j.keyspace, j.table, jobID) }() - ranges := j.ranges - if j.jobType == mergeRangesJobType { + var ranges []scyllaclient.TokenRange + switch { + case j.jobType == optimizeJobType: + ranges = nil + case j.jobType == mergeRangesJobType: ranges = []scyllaclient.TokenRange{ { StartToken: dht.Murmur3MinToken, EndToken: dht.Murmur3MaxToken, }, } + default: + ranges = j.ranges } - jobID, err = w.client.Repair(ctx, j.keyspace, j.table, j.master, j.replicaSet, ranges) + jobID, err = w.client.Repair(ctx, j.keyspace, j.table, j.master, j.replicaSet, ranges, j.jobType == optimizeJobType) if err != nil { return errors.Wrap(err, "schedule repair") }