Skip to content

Commit

Permalink
workloadrepo: Add unit testing for the Workload Repository. (#58266)
Browse files Browse the repository at this point in the history
ref #58247
  • Loading branch information
wddevries authored Jan 9, 2025
1 parent 8390fc4 commit 2ae0b0a
Show file tree
Hide file tree
Showing 8 changed files with 909 additions and 167 deletions.
4 changes: 3 additions & 1 deletion pkg/util/workloadrepo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,17 @@ go_library(

go_test(
name = "workloadrepo_test",
timeout = "short",
timeout = "long",
srcs = ["worker_test.go"],
embed = [":workloadrepo"],
flaky = True,
deps = [
"//pkg/domain",
"//pkg/infoschema",
"//pkg/kv",
"//pkg/owner",
"//pkg/parser/ast",
"//pkg/sessionctx",
"//pkg/testkit",
"@com_github_stretchr_testify//require",
"@io_etcd_go_etcd_client_v3//:client",
Expand Down
136 changes: 82 additions & 54 deletions pkg/util/workloadrepo/housekeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package workloadrepo

import (
"context"
"errors"
"fmt"
"strings"
"time"
Expand All @@ -38,79 +39,102 @@ func calcNextTick(now time.Time) time.Duration {
return next.Sub(now)
}

func createAllPartitions(ctx context.Context, sess sessionctx.Context, is infoschema.InfoSchema) error {
func createPartition(ctx context.Context, is infoschema.InfoSchema, tbl *repositoryTable, sess sessionctx.Context, now time.Time) error {
tbSchema, err := is.TableByName(ctx, workloadSchemaCIStr, ast.NewCIStr(tbl.destTable))
if err != nil {
logutil.BgLogger().Info("workload repository cannot get table", zap.String("tbl", tbl.destTable), zap.NamedError("err", err))
return err
}
tbInfo := tbSchema.Meta()

sb := &strings.Builder{}
for _, tbl := range workloadTables {
tbSchema, err := is.TableByName(ctx, workloadSchemaCIStr, ast.NewCIStr(tbl.destTable))
sqlescape.MustFormatSQL(sb, "ALTER TABLE %n.%n ADD PARTITION (", WorkloadSchema, tbl.destTable)
skip, err := generatePartitionRanges(sb, tbInfo, now)
if err != nil {
return err
}
if !skip {
fmt.Fprintf(sb, ")")
_, err = execRetry(ctx, sess, sb.String())
if err != nil {
logutil.BgLogger().Info("workload repository cannot get table", zap.String("tbl", tbl.destTable), zap.NamedError("err", err))
logutil.BgLogger().Info("workload repository cannot add partitions", zap.String("parts", sb.String()), zap.NamedError("err", err))
return err
}
tbInfo := tbSchema.Meta()

sb.Reset()
sqlescape.MustFormatSQL(sb, "ALTER TABLE %n.%n ADD PARTITION (", WorkloadSchema, tbl.destTable)
if !generatePartitionRanges(sb, tbInfo) {
fmt.Fprintf(sb, ")")
_, err = execRetry(ctx, sess, sb.String())
if err != nil {
logutil.BgLogger().Info("workload repository cannot add partitions", zap.String("parts", sb.String()), zap.NamedError("err", err))
return err
}
}
return nil
}

// createAllPartitions calls createPartition for every table in
// w.workloadTables, passing through the given session, info schema and
// reference time. It stops at, and returns, the first error encountered;
// it returns nil when every call succeeds.
func (w *worker) createAllPartitions(ctx context.Context, sess sessionctx.Context, is infoschema.InfoSchema, now time.Time) error {
	// Index the slice instead of ranging by value so createPartition receives
	// a pointer to the stored element rather than to a per-iteration copy
	// (which, before Go 1.22, would also be one variable shared by every
	// iteration). This mirrors how startSample walks the same slice.
	for i := range w.workloadTables {
		if err := createPartition(ctx, is, &w.workloadTables[i], sess, now); err != nil {
			return err
		}
	}
	return nil
}

func (w *worker) dropOldPartitions(ctx context.Context, sess sessionctx.Context, is infoschema.InfoSchema, now time.Time) error {
w.Lock()
retention := int(w.retentionDays)
w.Unlock()
// dropOldPartition drops the partitions of tbl's destination table that have
// aged out of the retention window.
//
// The table is looked up in is under the workload schema. For each partition
// definition, the partition name is converted to a time by parsePartitionName;
// when the number of whole days between that time and now is at least
// retention, an "ALTER TABLE ... DROP PARTITION" statement is executed through
// execRetry on sess.
//
// Processing stops and an error is returned when the table, its metadata or
// its partition definitions cannot be loaded, when a partition name cannot be
// parsed, or when a drop statement fails. Underlying errors are wrapped with
// %w so callers can still inspect them with errors.Is / errors.As.
func dropOldPartition(ctx context.Context, is infoschema.InfoSchema,
	tbl *repositoryTable, now time.Time, retention int, sess sessionctx.Context) error {
	tbSchema, err := is.TableByName(ctx, workloadSchemaCIStr, ast.NewCIStr(tbl.destTable))
	if err != nil {
		return fmt.Errorf("workload repository could not find table `%s`: %w", tbl.destTable, err)
	}
	tbInfo := tbSchema.Meta()
	if tbInfo == nil {
		return fmt.Errorf("workload repository could not load information for '%s'", tbl.destTable)
	}
	pi := tbInfo.GetPartitionInfo()
	if pi == nil || pi.Definitions == nil {
		return fmt.Errorf("workload repository could not load partition information for '%s'", tbl.destTable)
	}

	// One builder is reused for every statement; it is reset before each use.
	sb := &strings.Builder{}
	for _, pt := range pi.Definitions {
		ot, err := parsePartitionName(pt.Name.L)
		if err != nil {
			return fmt.Errorf("workload repository cannot parse partition name (%s) for '%s': %w", pt.Name.L, tbl.destTable, err)
		}
		// Age is truncated to whole days; partitions younger than the
		// retention window are kept.
		if int(now.Sub(ot).Hours()/24) < retention {
			continue
		}
		sb.Reset()
		sqlescape.MustFormatSQL(sb, "ALTER TABLE %n.%n DROP PARTITION %n",
			WorkloadSchema, tbl.destTable, pt.Name.L)
		if _, err = execRetry(ctx, sess, sb.String()); err != nil {
			return fmt.Errorf("workload repository cannot drop partition (%s) on '%s': %w", pt.Name.L, tbl.destTable, err)
		}
	}

	return nil
}

func (w *worker) dropOldPartitions(ctx context.Context, sess sessionctx.Context, is infoschema.InfoSchema, now time.Time, retention int) error {
if retention == 0 {
// disabled housekeeping
return nil
}

sb := &strings.Builder{}
for _, tbl := range workloadTables {
tbSchema, err := is.TableByName(ctx, workloadSchemaCIStr, ast.NewCIStr(tbl.destTable))
if err != nil {
logutil.BgLogger().Info("workload repository cannot get table", zap.String("tbl", tbl.destTable), zap.NamedError("err", err))
continue
}
tbInfo := tbSchema.Meta()
for _, pt := range tbInfo.GetPartitionInfo().Definitions {
ot, err := time.Parse("p20060102", pt.Name.L)
if err != nil {
logutil.BgLogger().Info("workload repository cannot parse partition name", zap.String("part", pt.Name.L), zap.NamedError("err", err))
break
}
if int(now.Sub(ot).Hours()/24) < retention {
continue
}
sb.Reset()
sqlescape.MustFormatSQL(sb, "ALTER TABLE %s.%s DROP PARTITION %s",
WorkloadSchema, tbl.destTable, pt.Name.L)
_, err = execRetry(ctx, sess, sb.String())
if err != nil {
logutil.BgLogger().Info("workload repository cannot drop partition", zap.String("part", pt.Name.L), zap.NamedError("err", err))
break
}
var err error
for _, tbl := range w.workloadTables {
err2 := dropOldPartition(ctx, is, &tbl, now, retention, sess)
if err2 != nil {
logutil.BgLogger().Warn("workload repository could not drop partitions", zap.NamedError("err", err2))
err = errors.Join(err, err2)
}
}
return nil

return err
}

func (w *worker) startHouseKeeper(ctx context.Context) func() {
func (w *worker) getHouseKeeper(ctx context.Context, fn func(time.Time) time.Duration) func() {
return func() {
now := time.Now()
timer := time.NewTimer(calcNextTick(now))
timer := time.NewTimer(fn(now))
defer timer.Stop()

_sessctx := w.getSessionWithRetry()
defer w.sesspool.Put(_sessctx)
sess := _sessctx.(sessionctx.Context)

for {
select {
case <-ctx.Done():
Expand All @@ -125,21 +149,25 @@ func (w *worker) startHouseKeeper(ctx context.Context) func() {
is := sessiontxn.GetTxnManager(sess).GetTxnInfoSchema()

// create new partitions
if err := createAllPartitions(ctx, sess, is); err != nil {
if err := w.createAllPartitions(ctx, sess, is, now); err != nil {
continue
}

w.Lock()
retention := int(w.retentionDays)
w.Unlock()

// drop old partitions
if err := w.dropOldPartitions(ctx, sess, is, now); err != nil {
if err := w.dropOldPartitions(ctx, sess, is, now, retention); err != nil {
continue
}

// reschedule, drain channel first
if !timer.Stop() {
<-timer.C
}
timer.Reset(calcNextTick(now))
timer.Reset(fn(now))
}
}
}
}

// startHouseKeeper returns the housekeeping closure produced by
// getHouseKeeper, with calcNextTick supplied as the function that computes
// the delay used when the housekeeping timer is created and reset.
func (w *worker) startHouseKeeper(ctx context.Context) func() {
return w.getHouseKeeper(ctx, calcNextTick)
}
16 changes: 6 additions & 10 deletions pkg/util/workloadrepo/sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ func (w *worker) samplingTable(ctx context.Context, rt *repositoryTable) {

func (w *worker) startSample(ctx context.Context) func() {
return func() {
w.Lock()
w.samplingTicker = time.NewTicker(time.Duration(w.samplingInterval) * time.Second)
w.Unlock()
w.resetSamplingInterval(w.samplingInterval)

for {
select {
Expand All @@ -56,8 +54,8 @@ func (w *worker) startSample(ctx context.Context) func() {
// sample thread
var wg util.WaitGroupWrapper

for rtIdx := range workloadTables {
rt := &workloadTables[rtIdx]
for rtIdx := range w.workloadTables {
rt := &w.workloadTables[rtIdx]
if rt.tableType != samplingTable {
continue
}
Expand All @@ -73,10 +71,6 @@ func (w *worker) startSample(ctx context.Context) func() {
}

func (w *worker) resetSamplingInterval(newRate int32) {
if w.samplingTicker == nil {
return
}

if newRate == 0 {
w.samplingTicker.Stop()
} else {
Expand All @@ -95,7 +89,9 @@ func (w *worker) changeSamplingInterval(_ context.Context, d string) error {

if int32(n) != w.samplingInterval {
w.samplingInterval = int32(n)
w.resetSamplingInterval(w.samplingInterval)
if w.samplingTicker != nil {
w.resetSamplingInterval(w.samplingInterval)
}
}

return nil
Expand Down
Loading

0 comments on commit 2ae0b0a

Please sign in to comment.