Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workload/bank: batch initial row generation #39719

Merged
merged 1 commit into from
Aug 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func backupRestoreTestSetupWithParams(
if numAccounts == 0 {
splits = 0
}
bankData := bank.FromConfig(numAccounts, payloadSize, splits)
bankData := bank.FromConfig(numAccounts, numAccounts, payloadSize, splits)

sqlDB = sqlutils.MakeSQLRunner(tc.Conns[0])
sqlDB.Exec(t, `CREATE DATABASE data`)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/validations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestCatchupScanOrdering(t *testing.T) {
t.Run("bank", func(t *testing.T) {
ctx := context.Background()
const numRows, numRanges, payloadBytes, maxTransfer = 10, 10, 10, 999
gen := bank.FromConfig(numRows, payloadBytes, numRanges)
gen := bank.FromConfig(numRows, numRows, payloadBytes, numRanges)
var l workloadsql.InsertsDataLoader
if _, err := workloadsql.Setup(ctx, db, gen, l); err != nil {
t.Fatal(err)
Expand Down
11 changes: 6 additions & 5 deletions pkg/ccl/storageccl/export_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ func TestWorkloadStorage(t *testing.T) {
settings := cluster.MakeTestingClusterSettings()

rows, payloadBytes, ranges := 4, 12, 1
gen := bank.FromConfig(rows, payloadBytes, ranges)
gen := bank.FromConfig(rows, rows, payloadBytes, ranges)
bankTable := gen.Tables()[0]
bankURL := func(extraParams ...map[string]string) *url.URL {
params := url.Values{`version`: []string{gen.Meta().Version}}
Expand Down Expand Up @@ -559,14 +559,15 @@ func TestWorkloadStorage(t *testing.T) {
require.NoError(t, err)
require.Equal(t, strings.TrimSpace(`
0,0,initial-dTqn
1,0,initial-vOpi
2,0,initial-qMvo
3,0,initial-nKir
1,0,initial-Pkyk
2,0,initial-eJkM
3,0,initial-TlNb
`), strings.TrimSpace(string(bytes)))
}

{
params := map[string]string{`row-start`: `1`, `row-end`: `3`, `payload-bytes`: `14`}
params := map[string]string{
`row-start`: `1`, `row-end`: `3`, `payload-bytes`: `14`, `batch-size`: `1`}
s, err := ExportStorageFromURI(ctx, bankURL(params).String(), settings)
require.NoError(t, err)
r, err := s.ReadFile(ctx, ``)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/utilccl/sampledataccl/bankdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestToBackup(t *testing.T) {
for _, chunkBytes := range chunkBytesSizes {
t.Run(fmt.Sprintf("rows=%d/chunk=%d", rows, chunkBytes), func(t *testing.T) {
dir := fmt.Sprintf("%d-%d", rows, chunkBytes)
data := bank.FromConfig(rows, payloadBytes, ranges).Tables()[0]
data := bank.FromConfig(rows, rows, payloadBytes, ranges).Tables()[0]
backup, err := toBackup(t, data, filepath.Join(outerDir, dir), chunkBytes)
if err != nil {
t.Fatalf("%+v", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/workloadccl/allccl/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func TestDeterministicInitialData(t *testing.T) {
// TODO(dan): We're starting to accumulate these various lists, bigInitialData
// is another. Consider moving them to be properties on the workload.Meta.
fingerprintGoldens := map[string]uint64{
`bank`: 0x1603d103c14d0364,
`bank`: 0x7b4d519ed8bd07ce,
`bulkingest`: 0xcf3e4028ac084aea,
`indexes`: 0xcbf29ce484222325,
`intro`: 0x81c6a8cfd9c3452a,
Expand Down
59 changes: 42 additions & 17 deletions pkg/workload/bank/bank.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
)`

defaultRows = 1000
defaultBatchSize = 1000
defaultPayloadBytes = 100
defaultRanges = 10
maxTransfer = 999
Expand All @@ -45,8 +46,9 @@ type bank struct {
flags workload.Flags
connFlags *workload.ConnFlags

seed uint64
rows, payloadBytes, ranges int
seed uint64
rows, batchSize int
payloadBytes, ranges int
}

func init() {
Expand All @@ -61,8 +63,12 @@ var bankMeta = workload.Meta{
New: func() workload.Generator {
g := &bank{}
g.flags.FlagSet = pflag.NewFlagSet(`bank`, pflag.ContinueOnError)
g.flags.Meta = map[string]workload.FlagMeta{
`batch-size`: {RuntimeOnly: true},
}
g.flags.Uint64Var(&g.seed, `seed`, 1, `Key hash seed.`)
g.flags.IntVar(&g.rows, `rows`, defaultRows, `Initial number of accounts in bank table.`)
g.flags.IntVar(&g.batchSize, `batch-size`, defaultBatchSize, `Number of rows in each batch of initial data.`)
g.flags.IntVar(&g.payloadBytes, `payload-bytes`, defaultPayloadBytes, `Size of the payload field in each initial row.`)
g.flags.IntVar(&g.ranges, `ranges`, defaultRanges, `Initial number of ranges in bank table.`)
g.connFlags = workload.NewConnFlags(&g.flags)
Expand All @@ -73,18 +79,22 @@ var bankMeta = workload.Meta{
// FromRows returns Bank testdata with the given number of rows and default
// payload size and range count.
func FromRows(rows int) workload.Generator {
return FromConfig(rows, defaultPayloadBytes, defaultRanges)
return FromConfig(rows, 1, defaultPayloadBytes, defaultRanges)
}

// FromConfig returns a one table testdata with three columns: an `id INT
// PRIMARY KEY` representing an account number, a `balance` INT, and a `payload`
// BYTES to pad the size of the rows for various tests.
func FromConfig(rows int, payloadBytes int, ranges int) workload.Generator {
func FromConfig(rows int, batchSize int, payloadBytes int, ranges int) workload.Generator {
if ranges > rows {
ranges = rows
}
if batchSize <= 0 {
batchSize = defaultBatchSize
}
return workload.FromFlags(bankMeta,
fmt.Sprintf(`--rows=%d`, rows),
fmt.Sprintf(`--batch-size=%d`, batchSize),
fmt.Sprintf(`--payload-bytes=%d`, payloadBytes),
fmt.Sprintf(`--ranges=%d`, ranges),
)
Expand All @@ -105,6 +115,9 @@ func (b *bank) Hooks() workload.Hooks {
"Value of 'rows' (%d) must be greater than or equal to value of 'ranges' (%d)",
b.rows, b.ranges)
}
if b.batchSize <= 0 {
return errors.Errorf(`Value of batch-size must be greater than zero; was %d`, b.batchSize)
}
return nil
},
}
Expand All @@ -118,23 +131,35 @@ var bankColTypes = []coltypes.T{

// Tables implements the Generator interface.
func (b *bank) Tables() []workload.Table {
numBatches := (b.rows + b.batchSize - 1) / b.batchSize // ceil(b.rows/b.batchSize)
table := workload.Table{
Name: `bank`,
Schema: bankSchema,
InitialRows: workload.BatchedTuples{
NumBatches: b.rows,
FillBatch: func(rowIdx int, cb coldata.Batch, a *bufalloc.ByteAllocator) {
rng := rand.NewSource(b.seed + uint64(rowIdx))
var payload []byte
*a, payload = a.Alloc(b.payloadBytes, 0 /* extraCap */)
const initialPrefix = `initial-`
copy(payload[:len(initialPrefix)], []byte(initialPrefix))
randStringLetters(rng, payload[len(initialPrefix):])

cb.Reset(bankColTypes, 1)
cb.ColVec(0).Int64()[0] = int64(rowIdx) // id
cb.ColVec(1).Int64()[0] = 0 // balance
cb.ColVec(2).Bytes().Set(0, payload) // payload
NumBatches: numBatches,
FillBatch: func(batchIdx int, cb coldata.Batch, a *bufalloc.ByteAllocator) {
rng := rand.NewSource(b.seed + uint64(batchIdx))

rowBegin, rowEnd := batchIdx*b.batchSize, (batchIdx+1)*b.batchSize
if rowEnd > b.rows {
rowEnd = b.rows
}
cb.Reset(bankColTypes, rowEnd-rowBegin)
idCol := cb.ColVec(0).Int64()
balanceCol := cb.ColVec(1).Int64()
payloadCol := cb.ColVec(2).Bytes()
for rowIdx := rowBegin; rowIdx < rowEnd; rowIdx++ {
var payload []byte
*a, payload = a.Alloc(b.payloadBytes, 0 /* extraCap */)
const initialPrefix = `initial-`
copy(payload[:len(initialPrefix)], []byte(initialPrefix))
randStringLetters(rng, payload[len(initialPrefix):])

rowOffset := rowIdx - rowBegin
idCol[rowOffset] = int64(rowIdx)
balanceCol[rowOffset] = 0
payloadCol.Set(rowOffset, payload)
}
},
},
Splits: workload.Tuples(
Expand Down
2 changes: 1 addition & 1 deletion pkg/workload/bank/bank_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestBank(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `DROP TABLE IF EXISTS bank`)

bank := FromConfig(test.rows, defaultPayloadBytes, test.ranges)
bank := FromConfig(test.rows, test.rows, defaultPayloadBytes, test.ranges)
bankTable := bank.Tables()[0]
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE %s %s`, bankTable.Name, bankTable.Schema))

Expand Down
2 changes: 1 addition & 1 deletion pkg/workload/csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestHandleCSV(t *testing.T) {
0,0,initial-dTqnRurXztAPkykhZWvsCmeJkMwRNcJAvTlNbgUEYfagEQJaHmfPsquKZUBOGwpAjPtATpGXFJkrtQCEJODSlmQctvyh`,
},
{
`?rows=5&row-start=1&row-end=3`, `
`?rows=5&row-start=1&row-end=3&batch-size=1`, `
1,0,initial-vOpikzTTWxvMqnkpfEIVXgGyhZNDqvpVqpNnHawruAcIVltgbnIEIGmCDJcnkVkfVmAcutkMvRACFuUBPsZTemTDSfZT
2,0,initial-qMvoPeRiOBXvdVQxhZUfdmehETKPXyBaVWxzMqwiStIkxfoDFygYxIDyXiaVEarcwMboFhBlCAapvKijKAyjEAhRBNZz`,
},
Expand Down