
Commit 57274fc

craig[bot] and nvb committed
Merge #33242

33242: workload/tpcc: split, partition, and scatter during initialization r=nvanbenschoten a=nvanbenschoten

Fixes #32135.

This PR pushes splitting, partitioning, and scattering logic into `tpcc`'s `PostLoad` hook, removing it from its `Ops` method.

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
2 parents 8426a2c + 8876c27 commit 57274fc
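For context on the mechanism this merge relies on, here is a minimal sketch of the PostLoad hook pattern: a generator implements the Hookser interface and returns a PostLoad closure, which the fixture loader runs once the data has been imported or restored, so splitting, partitioning, and scattering can live with the workload rather than in Ops. The Hooks struct layout, the exampleGen type, and the single SCATTER statement are illustrative assumptions; only workload.Hookser, Hooks(), and PostLoad(*gosql.DB) error are visible in the diffs below.

	package example

	import (
		gosql "database/sql"

		"github.com/pkg/errors"
	)

	// Hooks mirrors the shape implied by `h.Hooks().PostLoad(sqlDB)` in
	// fixture.go below; the real workload.Hooks struct may carry more fields.
	type Hooks struct {
		PostLoad func(*gosql.DB) error
	}

	// Hookser is the interface the fixture loader type-asserts against before
	// running post-load steps.
	type Hookser interface {
		Hooks() Hooks
	}

	// exampleGen is a stand-in for a workload generator such as tpcc.
	type exampleGen struct{}

	var _ Hookser = exampleGen{} // compile-time check that exampleGen provides hooks

	// Hooks returns a PostLoad closure that runs after import/restore finishes.
	// Here it only scatters one table's ranges; the actual tpcc hook also
	// drives splitting and partitioning.
	func (exampleGen) Hooks() Hooks {
		return Hooks{
			PostLoad: func(db *gosql.DB) error {
				if _, err := db.Exec(`ALTER TABLE warehouse SCATTER`); err != nil {
					return errors.Wrap(err, "scattering warehouse")
				}
				return nil
			},
		}
	}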

File tree

6 files changed, +215 −130 lines changed

pkg/ccl/workloadccl/fixture.go

Lines changed: 18 additions & 10 deletions
@@ -473,8 +473,9 @@ func ImportFixture(
 	if err := g.Wait(); err != nil {
 		return 0, err
 	}
-	// TODO(dan): This needs to do splits and run PostLoad hooks. Unify this,
-	// RestoreFixture, and workload.Setup.
+	if err := runPostLoadSteps(ctx, sqlDB, gen); err != nil {
+		return 0, err
+	}
 	return atomic.LoadInt64(&bytesAtomic), nil
 }
 
@@ -607,20 +608,27 @@ func RestoreFixture(
 	if err := g.Wait(); err != nil {
 		return 0, err
 	}
-	const splitConcurrency = 384 // TODO(dan): Don't hardcode this.
-	for _, table := range fixture.Generator.Tables() {
-		if err := workload.Split(ctx, sqlDB, table, splitConcurrency); err != nil {
-			return 0, errors.Wrapf(err, `splitting %s`, table.Name)
-		}
+	if err := runPostLoadSteps(ctx, sqlDB, fixture.Generator); err != nil {
+		return 0, err
 	}
-	if h, ok := fixture.Generator.(workload.Hookser); ok {
+	return atomic.LoadInt64(&bytesAtomic), nil
+}
+
+func runPostLoadSteps(ctx context.Context, sqlDB *gosql.DB, gen workload.Generator) error {
+	if h, ok := gen.(workload.Hookser); ok {
 		if hooks := h.Hooks(); hooks.PostLoad != nil {
 			if err := hooks.PostLoad(sqlDB); err != nil {
-				return 0, errors.Wrap(err, `PostLoad hook`)
+				return errors.Wrap(err, `PostLoad hook`)
 			}
 		}
 	}
-	return atomic.LoadInt64(&bytesAtomic), nil
+	const splitConcurrency = 384 // TODO(dan): Don't hardcode this.
+	for _, table := range gen.Tables() {
+		if err := workload.Split(ctx, sqlDB, table, splitConcurrency); err != nil {
+			return errors.Wrapf(err, `splitting %s`, table.Name)
+		}
+	}
+	return nil
 }
 
 // ListFixtures returns the object paths to all fixtures stored in a FixtureConfig.

pkg/cmd/roachtest/tpcc.go

Lines changed: 26 additions & 24 deletions
@@ -63,7 +63,7 @@ type tpccOptions struct {
 // tpccFixturesCmd generates the command string to load tpcc data for the
 // specified warehouse count into a cluster using either `fixtures import`
 // or `fixtures load` depending on the cloud.
-func tpccFixturesCmd(t *test, cloud string, warehouses int, checks bool) string {
+func tpccFixturesCmd(t *test, cloud string, warehouses int, extraArgs string) string {
 	var action string
 	switch cloud {
 	case "gce":
@@ -87,8 +87,8 @@ func tpccFixturesCmd(t *test, cloud string, warehouses int, checks bool) string
 	default:
 		t.Fatalf("unknown cloud: %q", cloud)
 	}
-	return fmt.Sprintf("./workload fixtures %s tpcc --checks=%v --warehouses=%d {pgurl:1}",
-		action, checks, warehouses)
+	return fmt.Sprintf("./workload fixtures %s tpcc --warehouses=%d %s {pgurl:1}",
+		action, warehouses, extraArgs)
 }
 
 func runTPCC(ctx context.Context, t *test, c *cluster, opts tpccOptions) {
@@ -145,7 +145,7 @@ func runTPCC(ctx context.Context, t *test, c *cluster, opts tpccOptions) {
 			t.Status("loading dataset")
 			c.Start(ctx, t, crdbNodes)
 
-			c.Run(ctx, workloadNode, tpccFixturesCmd(t, cloud, opts.Warehouses, true /* checks */))
+			c.Run(ctx, workloadNode, tpccFixturesCmd(t, cloud, opts.Warehouses, ""))
 			c.Stop(ctx, crdbNodes)
 
 			c.Run(ctx, crdbNodes, "test -e /sbin/zfs && sudo zfs snapshot data1@pristine")
@@ -155,7 +155,7 @@ func runTPCC(ctx context.Context, t *test, c *cluster, opts tpccOptions) {
 			c.Start(ctx, t, crdbNodes)
 		} else {
 			c.Start(ctx, t, crdbNodes)
-			c.Run(ctx, workloadNode, tpccFixturesCmd(t, cloud, opts.Warehouses, true /* checks */))
+			c.Run(ctx, workloadNode, tpccFixturesCmd(t, cloud, opts.Warehouses, ""))
 		}
 	}()
 	t.Status("waiting")
@@ -583,31 +583,33 @@ func loadTPCCBench(
 		t.Fatal(err)
 	}
 
-	// Load the corresponding fixture.
-	t.l.Printf("restoring tpcc fixture\n")
-	cmd := tpccFixturesCmd(t, cloud, b.LoadWarehouses, false /* checks */)
-	if err := c.RunE(ctx, loadNode, cmd); err != nil {
-		return err
-	}
-
-	partArgs := ""
-	rebalanceWait := time.Duration(b.LoadWarehouses/100) * time.Minute
+	var loadArgs string
+	var rebalanceWait time.Duration
 	switch b.LoadConfig {
 	case singleLoadgen:
-		t.l.Printf("splitting and scattering\n")
+		loadArgs = `--split --scatter --checks=false`
+		rebalanceWait = time.Duration(b.LoadWarehouses/100) * time.Minute
 	case singlePartitionedLoadgen:
-		t.l.Printf("splitting, scattering, and partitioning\n")
-		partArgs = fmt.Sprintf(`--partitions=%d`, b.partitions())
+		loadArgs = fmt.Sprintf(`--split --scatter --checks=false --partitions=%d`, b.partitions())
 		rebalanceWait = time.Duration(b.LoadWarehouses/50) * time.Minute
 	case multiLoadgen:
-		t.l.Printf("splitting, scattering, and partitioning\n")
-		partArgs = fmt.Sprintf(`--partitions=%d --zones="%s" --partition-affinity=0`,
+		loadArgs = fmt.Sprintf(`--split --scatter --checks=false --partitions=%d --zones="%s"`,
 			b.partitions(), strings.Join(b.Distribution.zones(), ","))
 		rebalanceWait = time.Duration(b.LoadWarehouses/20) * time.Minute
 	default:
 		panic("unexpected")
 	}
 
+	// Load the corresponding fixture.
+	t.l.Printf("restoring tpcc fixture\n")
+	cmd := tpccFixturesCmd(t, cloud, b.LoadWarehouses, loadArgs)
+	if err := c.RunE(ctx, loadNode, cmd); err != nil {
+		return err
+	}
+	if rebalanceWait == 0 {
+		return nil
+	}
+
 	t.l.Printf("waiting %v for rebalancing\n", rebalanceWait)
 	_, err := db.ExecContext(ctx, `SET CLUSTER SETTING kv.snapshot_rebalance.max_rate='64MiB'`)
 	if err != nil {
@@ -617,9 +619,9 @@ func loadTPCCBench(
 	// Split and scatter the tables. Ramp up to the expected load in the desired
 	// distribution. This should allow for load-based rebalancing to help
 	// distribute load. Optionally pass some load configuration-specific flags.
-	cmd = fmt.Sprintf("./workload run tpcc --warehouses=%d --workers=%d --split --scatter "+
-		"--wait=false --duration=%s --tolerate-errors %s {pgurl%s}",
-		b.LoadWarehouses, b.LoadWarehouses, rebalanceWait, partArgs, roachNodes)
+	cmd = fmt.Sprintf("./workload run tpcc --warehouses=%d --workers=%d "+
+		"--wait=false --duration=%s --tolerate-errors {pgurl%s}",
+		b.LoadWarehouses, b.LoadWarehouses, rebalanceWait, roachNodes)
 	if out, err := c.RunWithBuffer(ctx, c.l, loadNode, cmd); err != nil {
 		return errors.Wrapf(err, "failed with output %q", string(out))
 	}
@@ -770,9 +772,9 @@ func runTPCCBench(ctx context.Context, t *test, c *cluster, b tpccBenchSpec) {
			case singleLoadgen:
				// Nothing.
			case singlePartitionedLoadgen:
-				extraFlags = fmt.Sprintf(` --partitions=%d --split`, b.partitions())
+				extraFlags = fmt.Sprintf(` --partitions=%d`, b.partitions())
			case multiLoadgen:
-				extraFlags = fmt.Sprintf(" --partitions=%d --partition-affinity=%d --split",
+				extraFlags = fmt.Sprintf(` --partitions=%d --partition-affinity=%d`,
					b.partitions(), groupIdx)
				activeWarehouses = warehouses / numLoadGroups
			default:

pkg/workload/cli/run.go

Lines changed: 11 additions & 3 deletions
@@ -309,6 +309,7 @@ func runRun(gen workload.Generator, urls []string, dbName string) error {
 			if !*tolerateErrors {
 				return err
 			}
+			log.Infof(ctx, "retrying after error during init: %v", err)
 		}
 	}
 
@@ -324,9 +325,16 @@ func runRun(gen workload.Generator, urls []string, dbName string) error {
 		return errors.Errorf(`no operations defined for %s`, gen.Meta().Name)
 	}
 	reg := histogram.NewRegistry()
-	ops, err := o.Ops(urls, reg)
-	if err != nil {
-		return err
+	var ops workload.QueryLoad
+	for {
+		ops, err = o.Ops(urls, reg)
+		if err == nil {
+			break
+		}
+		if !*tolerateErrors {
+			return err
+		}
+		log.Infof(ctx, "retrying after error while creating load: %v", err)
 	}
 
 	const splitConcurrency = 384 // TODO(dan): Don't hardcode this.

pkg/workload/tpcc/ddls.go

Lines changed: 3 additions & 5 deletions
@@ -15,9 +15,9 @@
 package tpcc
 
 import (
+	gosql "database/sql"
 	"fmt"
 
-	"github.com/jackc/pgx"
 	"github.com/pkg/errors"
 	"golang.org/x/sync/errgroup"
 )
@@ -157,7 +157,7 @@ const (
 	tpccOrderLineSchemaInterleave = ` interleave in parent "order" (ol_w_id, ol_d_id, ol_o_id)`
 )
 
-func scatterRanges(db *pgx.ConnPool) {
+func scatterRanges(db *gosql.DB) error {
 	tables := []string{
 		`customer`,
 		`district`,
@@ -180,7 +180,5 @@ func scatterRanges(db *pgx.ConnPool) {
 			return nil
 		})
 	}
-	if err := g.Wait(); err != nil {
-		panic(err)
-	}
+	return g.Wait()
 }
