Merge pull request #10369 from vivekmenezes/backfill
sql: Use BackfillChunkSize testing knob in all tests
vivekmenezes authored Nov 2, 2016
2 parents 2709a72 + 3db9eb9 commit 0b3d611
Showing 4 changed files with 51 additions and 42 deletions.
24 changes: 12 additions & 12 deletions pkg/sql/backfill.go
@@ -35,27 +35,27 @@ const (
 	// TODO(vivek): Replace these constants with a runtime budget for the
 	// operation chunk involved.
 
-	// ColumnTruncateAndBackfillChunkSize is the maximum number of columns
+	// columnTruncateAndBackfillChunkSize is the maximum number of columns
 	// processed per chunk during column truncate or backfill.
-	ColumnTruncateAndBackfillChunkSize = 200
+	columnTruncateAndBackfillChunkSize = 200
 
-	// IndexTruncateChunkSize is the maximum number of index entries truncated
+	// indexTruncateChunkSize is the maximum number of index entries truncated
 	// per chunk during an index truncation. This value is larger than the
 	// other chunk constants because the operation involves only running a
 	// DeleteRange().
-	IndexTruncateChunkSize = 600
+	indexTruncateChunkSize = 600
 
-	// IndexBackfillChunkSize is the maximum number index entries backfilled
+	// indexBackfillChunkSize is the maximum number index entries backfilled
 	// per chunk during an index backfill. The index backfill involves a table
 	// scan, and a number of individual ops presented in a batch. This value
 	// is smaller than ColumnTruncateAndBackfillChunkSize, because it involves
 	// a number of individual index row updates that can be scattered over
 	// many ranges.
-	IndexBackfillChunkSize = 100
+	indexBackfillChunkSize = 100
 
-	// CheckpointInterval is the interval after which a checkpoint of the
+	// checkpointInterval is the interval after which a checkpoint of the
 	// schema change is posted.
-	CheckpointInterval = 10 * time.Second
+	checkpointInterval = 10 * time.Second
 )
 
 func makeColIDtoRowIndex(
@@ -257,7 +257,7 @@ func (sc *SchemaChanger) maybeWriteResumeSpan(
 	mutationIdx int,
 	lastCheckpoint *time.Time,
 ) error {
-	checkpointInterval := CheckpointInterval
+	checkpointInterval := checkpointInterval
 	if sc.testingKnobs.WriteCheckpointInterval > 0 {
 		checkpointInterval = sc.testingKnobs.WriteCheckpointInterval
 	}
@@ -308,7 +308,7 @@ func (sc *SchemaChanger) truncateAndBackfillColumns(
 	}
 
 	// Run through the entire table key space adding and deleting columns.
-	chunkSize := sc.getChunkSize(ColumnTruncateAndBackfillChunkSize)
+	chunkSize := sc.getChunkSize(columnTruncateAndBackfillChunkSize)
 	// Evaluate default values.
 	updateCols := append(added, dropped...)
 	updateValues := make(parser.DTuple, len(updateCols))
@@ -498,7 +498,7 @@ func (sc *SchemaChanger) truncateIndexes(
 	dropped []sqlbase.IndexDescriptor,
 	mutationIdx int,
 ) error {
-	chunkSize := sc.getChunkSize(IndexTruncateChunkSize)
+	chunkSize := sc.getChunkSize(indexTruncateChunkSize)
 	if sc.testingKnobs.BackfillChunkSize > 0 {
 		chunkSize = sc.testingKnobs.BackfillChunkSize
 	}
@@ -580,7 +580,7 @@ func (sc *SchemaChanger) backfillIndexes(
 	}
 
 	// Backfill the index entries for all the rows.
-	chunkSize := sc.getChunkSize(IndexBackfillChunkSize)
+	chunkSize := sc.getChunkSize(indexBackfillChunkSize)
 	lastCheckpoint := timeutil.Now()
 	for row, done := int64(0), false; !done; row += chunkSize {
 		// First extend the schema change lease.
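Two notes on the backfill.go changes. First, `checkpointInterval := checkpointInterval` in maybeWriteResumeSpan is valid Go: it declares a local that shadows the package constant, and the right-hand side still refers to the constant because the new name is not in scope until after the declaration. Second, `sc.getChunkSize` is not part of this diff; given the explicit knob override visible in truncateIndexes above, a minimal sketch of what it plausibly does (an assumption, not the committed implementation):

```go
// Sketch only: getChunkSize is assumed to prefer the BackfillChunkSize
// testing knob when set, falling back to the compile-time chunk-size
// constant passed by the caller.
func (sc *SchemaChanger) getChunkSize(chunkSize int64) int64 {
	if sc.testingKnobs.BackfillChunkSize > 0 {
		return sc.testingKnobs.BackfillChunkSize
	}
	return chunkSize
}
```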
16 changes: 14 additions & 2 deletions pkg/sql/drop_test.go
@@ -201,11 +201,17 @@ CREATE INDEX foo on t.kv (v);
 
 func TestDropIndex(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	const chunkSize = 200
 	params, _ := createTestServerParams()
+	params.Knobs = base.TestingKnobs{
+		SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
+			BackfillChunkSize: chunkSize,
+		},
+	}
 	s, sqlDB, kvDB := serverutils.StartServer(t, params)
 	defer s.Stopper().Stop()
 
-	numRows := 2*sql.IndexBackfillChunkSize + 1
+	numRows := 2*chunkSize + 1
 	createKVTable(t, sqlDB, numRows)
 
 	tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "kv")
@@ -272,11 +278,17 @@ CREATE INDEX intlv_idx ON intlv (k, n) INTERLEAVE IN PARENT kv (k);
 
 func TestDropIndexInterleaved(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	const chunkSize = 200
 	params, _ := createTestServerParams()
+	params.Knobs = base.TestingKnobs{
+		SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
+			BackfillChunkSize: chunkSize,
+		},
+	}
 	s, sqlDB, kvDB := serverutils.StartServer(t, params)
 	defer s.Stopper().Stop()
 
-	numRows := 2*sql.IndexBackfillChunkSize + 1
+	numRows := 2*chunkSize + 1
 	createKVInterleavedTable(t, sqlDB, numRows)
 
 	tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "kv")
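Note on the sizing: with the knob pinning the chunk size to 200, `numRows = 2*chunkSize + 1` presumably makes the index drop span two full chunks plus one leftover row, so the chunking loop's boundary handling is exercised without the tests depending on the now-unexported production constants.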
51 changes: 24 additions & 27 deletions pkg/sql/schema_changer_test.go
@@ -537,6 +537,16 @@ func runSchemaChangeWithOperations(
 	}
 }
 
+// bulkInsertIntoTable fills up table t.test with (maxValue + 1) rows.
+func bulkInsertIntoTable(sqlDB *gosql.DB, maxValue int) error {
+	inserts := make([]string, maxValue+1)
+	for i := 0; i < maxValue+1; i++ {
+		inserts[i] = fmt.Sprintf(`(%d, %d)`, i, maxValue-i)
+	}
+	_, err := sqlDB.Exec(`INSERT INTO t.test VALUES ` + strings.Join(inserts, ","))
+	return err
+}
+
 // Test schema change backfills are not affected by various operations
 // that run simultaneously.
 func TestRaceWithBackfill(t *testing.T) {
@@ -571,11 +581,7 @@ CREATE UNIQUE INDEX vidx ON t.test (v);
 
 	// Bulk insert.
 	maxValue := 4000
-	insert := fmt.Sprintf(`INSERT INTO t.test VALUES (%d, %d)`, 0, maxValue)
-	for i := 1; i <= maxValue; i++ {
-		insert += fmt.Sprintf(` ,(%d, %d)`, i, maxValue-i)
-	}
-	if _, err := sqlDB.Exec(insert); err != nil {
+	if err := bulkInsertIntoTable(sqlDB, maxValue); err != nil {
 		t.Fatal(err)
 	}
 
@@ -765,8 +771,7 @@ func TestAbortSchemaChangeBackfill(t *testing.T) {
 				<-commandsDone
 			},
 			AsyncExecNotification: asyncSchemaChangerDisabled,
-			// Set the backfill chunk size for all the backfill operations.
-			BackfillChunkSize: maxValue,
+			BackfillChunkSize:     maxValue,
 		},
 	}
 	server, sqlDB, kvDB := serverutils.StartServer(t, params)
@@ -934,11 +939,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
 
 	// Bulk insert.
 	maxValue := 5000
-	insert := fmt.Sprintf(`INSERT INTO t.test VALUES (%d, %d)`, 0, maxValue)
-	for i := 1; i <= maxValue; i++ {
-		insert += fmt.Sprintf(` ,(%d, %d)`, i, maxValue-i)
-	}
-	if _, err := sqlDB.Exec(insert); err != nil {
+	if err := bulkInsertIntoTable(sqlDB, maxValue); err != nil {
 		t.Fatal(err)
 	}
 
@@ -1034,6 +1035,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
 func TestSchemaChangePurgeFailure(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	params, _ := createTestServerParams()
+	const chunkSize = 200
 	// Disable the async schema changer.
 	var enableAsyncSchemaChanges uint32
 	attempts := 0
@@ -1061,7 +1063,8 @@ func TestSchemaChangePurgeFailure(t *testing.T) {
 			},
 			// Speed up evaluation of async schema changes so that it
 			// processes a purged schema change quickly.
-			AsyncExecQuickly: true,
+			AsyncExecQuickly:  true,
+			BackfillChunkSize: chunkSize,
 		},
 	}
 	server, sqlDB, kvDB := serverutils.StartServer(t, params)
@@ -1075,12 +1078,8 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
 	}
 
 	// Bulk insert.
-	maxValue := csql.IndexBackfillChunkSize + 1
-	insert := fmt.Sprintf(`INSERT INTO t.test VALUES (%d, %d)`, 0, maxValue)
-	for i := 1; i <= maxValue; i++ {
-		insert += fmt.Sprintf(` ,(%d, %d)`, i, maxValue-i)
-	}
-	if _, err := sqlDB.Exec(insert); err != nil {
+	const maxValue = chunkSize + 1
+	if err := bulkInsertIntoTable(sqlDB, maxValue); err != nil {
 		t.Fatal(err)
 	}
 
@@ -1125,7 +1124,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
 	// There is still some garbage index data that needs to be purged. All the
 	// rows from k = 0 to k = maxValue have index values. The k = maxValue + 1
 	// row with the conflict doesn't contain an index value.
-	numGarbageValues := csql.IndexBackfillChunkSize
+	numGarbageValues := chunkSize
 	tablePrefix := roachpb.Key(keys.MakeTablePrefix(uint32(tableDesc.ID)))
 	tableEnd := tablePrefix.PrefixEnd()
 	if kvs, err := kvDB.Scan(context.TODO(), tablePrefix, tableEnd, 0); err != nil {
@@ -1165,6 +1164,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
 func TestSchemaChangeReverseMutations(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	params, _ := createTestServerParams()
+	const chunkSize = 200
 	// Disable synchronous schema change processing so that the mutations get
 	// processed asynchronously.
 	var enableAsyncSchemaChanges uint32
@@ -1179,7 +1179,8 @@ func TestSchemaChangeReverseMutations(t *testing.T) {
 				}
 				return nil
 			},
-			AsyncExecQuickly: true,
+			AsyncExecQuickly:  true,
+			BackfillChunkSize: chunkSize,
 		},
 	}
 	s, sqlDB, kvDB := serverutils.StartServer(t, params)
@@ -1194,12 +1195,8 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
 	}
 
 	// Add some data
-	maxValue := csql.IndexBackfillChunkSize + 1
-	insert := fmt.Sprintf(`INSERT INTO t.test VALUES (%d, %d)`, 0, maxValue)
-	for i := 1; i <= maxValue; i++ {
-		insert += fmt.Sprintf(` ,(%d, %d)`, i, maxValue-i)
-	}
-	if _, err := sqlDB.Exec(insert); err != nil {
+	const maxValue = chunkSize + 1
+	if err := bulkInsertIntoTable(sqlDB, maxValue); err != nil {
 		t.Fatal(err)
 	}
 
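For reference, the knobs these tests exercise, collected in one place. This is a partial, inferred sketch of `SchemaChangerTestingKnobs`, not the struct's actual definition: only fields that appear in this diff are listed, and the field types are deduced from how they are used here.

```go
import "time"

// Partial, inferred sketch of the schema changer's testing knobs.
type SchemaChangerTestingKnobs struct {
	// BackfillChunkSize, when > 0, overrides the chunk-size constants in
	// backfill.go for backfill and truncate operations.
	BackfillChunkSize int64

	// WriteCheckpointInterval, when > 0, overrides checkpointInterval for
	// writing schema-change resume spans (see maybeWriteResumeSpan).
	WriteCheckpointInterval time.Duration

	// AsyncExecQuickly speeds up the async schema change executor, as used
	// by TestSchemaChangePurgeFailure and TestSchemaChangeReverseMutations.
	AsyncExecQuickly bool

	// AsyncExecNotification is assumed to be a hook consulted before async
	// execution; tests pass asyncSchemaChangerDisabled to turn it off.
	AsyncExecNotification func() error
}
```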
2 changes: 1 addition & 1 deletion pkg/sql/truncate.go
@@ -32,7 +32,7 @@ import (
 
 // TableTruncateChunkSize is the maximum number of keys deleted per chunk
 // during a table truncation.
-const TableTruncateChunkSize = IndexTruncateChunkSize
+const TableTruncateChunkSize = indexTruncateChunkSize
 
 // Truncate deletes all rows from a table.
 // Privileges: DROP on table.
