Skip to content

Commit 09abc26

Browse files
committed
workload: Allow multi-region TPC-C to use IMPORT
Previously, multi-region TPC-C could only load data using INSERT. This commit enables the IMPORT path by adding the pre-create step to the workload Setup. The commit also works around a problem in IMPORT for multi-region databases: if tables of certain types (REGIONAL BY ROW, REGIONAL BY TABLE IN) are created while an IMPORT is running, the IMPORT fails because the crdb_internal_region type is modified under the covers (to install back references). To work around this problem, we pre-create the tables before running an IMPORT INTO. Release note (bug fix): Fixes IMPORT in tpcc workload.
1 parent d7d6a17 commit 09abc26

File tree

4 files changed

+65
-26
lines changed

4 files changed

+65
-26
lines changed

pkg/ccl/workloadccl/fixture.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,19 @@ func ImportFixture(
401401
pathPrefix = `workload://`
402402
}
403403

404+
// Pre-create tables. It's required that we pre-create the tables before we
405+
// parallelize the IMPORT because for multi-region setups, the create table
406+
// will end up modifying the crdb_internal_region type (to install back
407+
// references). If create table is done in parallel with IMPORT, some IMPORT
408+
// jobs may fail because the type is being modified concurrently with the
409+
// IMPORT. Removing the need to pre-create is being tracked with #70987.
410+
for _, table := range tables {
411+
err := createFixtureTable(sqlDB, dbName, table)
412+
if err != nil {
413+
return 0, errors.Wrapf(err, `creating table %s`, table.Name)
414+
}
415+
}
416+
404417
for _, t := range tables {
405418
table := t
406419
paths := csvServerPaths(pathPrefix, gen, table, numNodes*filesPerNode)
@@ -417,6 +430,16 @@ func ImportFixture(
417430
return atomic.LoadInt64(&bytesAtomic), nil
418431
}
419432

433+
func createFixtureTable(sqlDB *gosql.DB, dbName string, table workload.Table) error {
434+
qualifiedTableName := makeQualifiedTableName(dbName, &table)
435+
createTable := fmt.Sprintf(
436+
`CREATE TABLE IF NOT EXISTS %s %s`,
437+
qualifiedTableName,
438+
table.Schema)
439+
_, err := sqlDB.Exec(createTable)
440+
return err
441+
}
442+
420443
func importFixtureTable(
421444
ctx context.Context,
422445
sqlDB *gosql.DB,
@@ -429,8 +452,9 @@ func importFixtureTable(
429452
start := timeutil.Now()
430453
var buf bytes.Buffer
431454
var params []interface{}
455+
432456
qualifiedTableName := makeQualifiedTableName(dbName, &table)
433-
fmt.Fprintf(&buf, `IMPORT TABLE %s %s CSV DATA (`, qualifiedTableName, table.Schema)
457+
fmt.Fprintf(&buf, `IMPORT INTO %s CSV DATA (`, qualifiedTableName)
434458
// Generate $1,...,$N-1, where N is the number of csv paths.
435459
for _, path := range paths {
436460
params = append(params, path)

pkg/ccl/workloadccl/fixture_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,9 +196,9 @@ func TestImportFixture(t *testing.T) {
196196

197197
// Since we did not inject stats, the IMPORT should have triggered
198198
// automatic stats collection.
199-
sqlDB.CheckQueryResultsRetry(t,
200-
`SELECT statistics_name, column_names, row_count, distinct_count, null_count
201-
FROM [SHOW STATISTICS FOR TABLE ingest.fx]`,
199+
statsQuery := fmt.Sprintf(`SELECT statistics_name, column_names, row_count, distinct_count, null_count
200+
FROM [SHOW STATISTICS FOR TABLE ingest.fx] WHERE row_count = %d`, fixtureTestGenRows)
201+
sqlDB.CheckQueryResultsRetry(t, statsQuery,
202202
[][]string{
203203
{"__auto__", "{key}", "10", "10", "0"},
204204
{"__auto__", "{value}", "10", "1", "0"},

pkg/workload/tpcc/tpcc.go

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,31 @@ var tpccMeta = workload.Meta{
230230
},
231231
}
232232

233+
// queryDatabaseRegions returns the set of region names produced by
// `SELECT region FROM [SHOW REGIONS FROM DATABASE]` on db, keyed by region
// name.
//
// On any failure (issuing the query, scanning a row, or iterating the result
// set) it returns a nil map and the underlying error.
func queryDatabaseRegions(db *gosql.DB) (map[string]struct{}, error) {
	rows, err := db.Query(`SELECT region FROM [SHOW REGIONS FROM DATABASE]`)
	if err != nil {
		return nil, err
	}
	defer func() {
		// The close error is deliberately ignored: iteration errors are
		// surfaced through rows.Err() below.
		_ = rows.Close()
	}()

	regions := make(map[string]struct{})
	for rows.Next() {
		var region string
		if err := rows.Scan(&region); err != nil {
			return nil, err
		}
		regions[region] = struct{}{}
	}
	// rows.Next() returns false both at the end of the result set and when
	// iteration fails, so rows.Err() must be consulted (and its value, not the
	// already-nil query error, returned) to tell the two apart.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return regions, nil
}
257+
233258
// Meta implements the Generator interface.
234259
func (*tpcc) Meta() workload.Meta { return tpccMeta }
235260

@@ -354,27 +379,10 @@ func (w *tpcc) Hooks() workload.Hooks {
354379
return nil
355380
}
356381

357-
regions := make(map[string]struct{})
358-
rows, err := db.Query(`SELECT region FROM [SHOW REGIONS FROM DATABASE]`)
382+
regions, err := queryDatabaseRegions(db)
359383
if err != nil {
360384
return err
361385
}
362-
defer func() {
363-
_ = rows.Close()
364-
}()
365-
for rows.Next() {
366-
if rows.Err() != nil {
367-
return err
368-
}
369-
var region string
370-
if err := rows.Scan(&region); err != nil {
371-
return err
372-
}
373-
regions[region] = struct{}{}
374-
}
375-
if rows.Err() != nil {
376-
return err
377-
}
378386

379387
var dbName string
380388
if err := db.QueryRow(`SHOW DATABASE`).Scan(&dbName); err != nil {

pkg/workload/workloadsql/workloadsql.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,17 @@ import (
3636
func Setup(
3737
ctx context.Context, db *gosql.DB, gen workload.Generator, l workload.InitialDataLoader,
3838
) (int64, error) {
39+
var hooks workload.Hooks
40+
if h, ok := gen.(workload.Hookser); ok {
41+
hooks = h.Hooks()
42+
}
43+
44+
if hooks.PreCreate != nil {
45+
if err := hooks.PreCreate(db); err != nil {
46+
return 0, errors.Wrapf(err, "Could not pre-create")
47+
}
48+
}
49+
3950
bytes, err := l.InitialDataLoad(ctx, db, gen)
4051
if err != nil {
4152
return 0, err
@@ -48,10 +59,6 @@ func Setup(
4859
}
4960
}
5061

51-
var hooks workload.Hooks
52-
if h, ok := gen.(workload.Hookser); ok {
53-
hooks = h.Hooks()
54-
}
5562
if hooks.PostLoad != nil {
5663
if err := hooks.PostLoad(db); err != nil {
5764
return 0, errors.Wrapf(err, "Could not postload")

0 commit comments

Comments
 (0)