sql: pre-split hash sharded indexes ranges before DELETE_AND_WRITE_ONLY
Fixes cockroachdb#74558

Pre-split ranges on shard boundaries before the SchemaChanger moves
new hash sharded indexes from the DELETE_ONLY to the DELETE_AND_WRITE_ONLY
state.

Release note (bug fix): When a hash sharded index is added to an existing
table, traffic could hit the index's single initial range hard before it
was split into more ranges for the shards as the range size grew. With
this change, the schema changer pre-splits ranges on shard boundaries
before the index becomes writable. The new cluster setting
`sql.hash_sharded_range_pre_split.max` lets users set an upper bound on
the number of pre-split ranges. If the bucket count of the index being
created is less than the cluster setting, the bucket count determines
the number of pre-split ranges.
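
For example, a session might tune the cap before building the index. This is
a hypothetical sketch: the `events` table, `user_id` column, and
`idx_events_user` index name are illustrative, not from this change.

-- Cap pre-splitting at 8 ranges instead of the default 16 (illustrative value).
SET CLUSTER SETTING sql.hash_sharded_range_pre_split.max = 8;

SET experimental_enable_hash_sharded_indexes = on;

-- BUCKET_COUNT = 8 is at or below the cap, so all 8 shard boundaries
-- should be pre-split before the index starts accepting writes.
CREATE INDEX idx_events_user ON events (user_id) USING HASH WITH BUCKET_COUNT = 8;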
chengxiong-ruan committed Jan 26, 2022
1 parent f479a04 commit 08504a3
Showing 7 changed files with 261 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -125,6 +125,7 @@ sql.distsql.max_running_flows integer 500 maximum number of concurrent flows tha
sql.distsql.temp_storage.workmem byte size 64 MiB maximum amount of memory in bytes a processor can use before falling back to temp storage
sql.guardrails.max_row_size_err byte size 512 MiB maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an error is returned; use 0 to disable
sql.guardrails.max_row_size_log byte size 64 MiB maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an event is logged to SQL_PERF (or SQL_INTERNAL_PERF if the mutating statement was internal); use 0 to disable
sql.hash_sharded_range_pre_split.max integer 16 max pre-split ranges to have when adding hash sharded index to an existing table
sql.log.slow_query.experimental_full_table_scans.enabled boolean false when set to true, statements that perform a full table/index scan will be logged to the slow query log even if they do not meet the latency threshold. Must have the slow query log enabled for this setting to have any effect.
sql.log.slow_query.internal_queries.enabled boolean false when set to true, internal queries which exceed the slow query log threshold are logged to a separate log. Must have the slow query log enabled for this setting to have any effect.
sql.log.slow_query.latency_threshold duration 0s when set to non-zero, log statements whose service latency exceeds the threshold to a secondary logger on each node
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -130,6 +130,7 @@
<tr><td><code>sql.distsql.temp_storage.workmem</code></td><td>byte size</td><td><code>64 MiB</code></td><td>maximum amount of memory in bytes a processor can use before falling back to temp storage</td></tr>
<tr><td><code>sql.guardrails.max_row_size_err</code></td><td>byte size</td><td><code>512 MiB</code></td><td>maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an error is returned; use 0 to disable</td></tr>
<tr><td><code>sql.guardrails.max_row_size_log</code></td><td>byte size</td><td><code>64 MiB</code></td><td>maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an event is logged to SQL_PERF (or SQL_INTERNAL_PERF if the mutating statement was internal); use 0 to disable</td></tr>
<tr><td><code>sql.hash_sharded_range_pre_split.max</code></td><td>integer</td><td><code>16</code></td><td>max pre-split ranges to have when adding hash sharded index to an existing table</td></tr>
<tr><td><code>sql.log.slow_query.experimental_full_table_scans.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when set to true, statements that perform a full table/index scan will be logged to the slow query log even if they do not meet the latency threshold. Must have the slow query log enabled for this setting to have any effect.</td></tr>
<tr><td><code>sql.log.slow_query.internal_queries.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when set to true, internal queries which exceed the slow query log threshold are logged to a separate log. Must have the slow query log enabled for this setting to have any effect.</td></tr>
<tr><td><code>sql.log.slow_query.latency_threshold</code></td><td>duration</td><td><code>0s</code></td><td>when set to non-zero, log statements whose service latency exceeds the threshold to a secondary logger on each node</td></tr>
7 changes: 7 additions & 0 deletions pkg/sql/exec_util.go
@@ -297,6 +297,13 @@ var hashShardedIndexesEnabledClusterMode = settings.RegisterBoolSetting(
false,
).WithPublic()

var maxHashShardedIndexRangePreSplit = settings.RegisterIntSetting(
"sql.hash_sharded_range_pre_split.max",
"max pre-split ranges to have when adding hash sharded index to an existing table",
16,
settings.PositiveInt,
).WithPublic()

var zigzagJoinClusterMode = settings.RegisterBoolSetting(
"sql.defaults.zigzag_join.enabled",
"default value for enable_zigzag_join session setting; allows use of zig-zag join by default",
40 changes: 40 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/hash_sharded_index_presplit
@@ -0,0 +1,40 @@
# AdminSplit is not allowed in multi-tenant.
# LogicTest: !3node-tenant

statement ok
SET experimental_enable_hash_sharded_indexes = on;
CREATE TABLE t_hash_pre_split (
a INT PRIMARY KEY,
b INT
);

query TITTT retry
SELECT t.name, r.table_id, r.index_name, r.start_pretty, r.end_pretty
FROM crdb_internal.tables t
JOIN crdb_internal.ranges r ON t.table_id = r.table_id
WHERE t.name = 't_hash_pre_split'
AND t.state = 'PUBLIC'
AND r.split_enforced_until IS NOT NULL;
----

statement ok
CREATE INDEX t_hash_pre_split_idx_b ON t_hash_pre_split (b) USING HASH WITH BUCKET_COUNT = 8;

query TITTT colnames,retry
SELECT t.name, r.table_id, r.index_name, r.start_pretty, r.end_pretty
FROM crdb_internal.tables t
JOIN crdb_internal.ranges r ON t.table_id = r.table_id
WHERE t.name = 't_hash_pre_split'
AND t.state = 'PUBLIC'
AND r.split_enforced_until IS NOT NULL;
----
name table_id index_name start_pretty end_pretty
t_hash_pre_split 53 t_hash_pre_split_idx_b /Table/53/2 /Table/53/2/0
t_hash_pre_split 53 t_hash_pre_split_idx_b /Table/53/2/0 /Table/53/2/1
t_hash_pre_split 53 t_hash_pre_split_idx_b /Table/53/2/1 /Table/53/2/2
t_hash_pre_split 53 t_hash_pre_split_idx_b /Table/53/2/2 /Table/53/2/3
t_hash_pre_split 53 t_hash_pre_split_idx_b /Table/53/2/3 /Table/53/2/4
t_hash_pre_split 53 t_hash_pre_split_idx_b /Table/53/2/4 /Table/53/2/5
t_hash_pre_split 53 t_hash_pre_split_idx_b /Table/53/2/5 /Table/53/2/6
t_hash_pre_split 53 t_hash_pre_split_idx_b /Table/53/2/6 /Table/53/2/7
t_hash_pre_split 53 t_hash_pre_split_idx_b /Table/53/2/7 /Max
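
Outside the logic test framework, the same boundaries can be inspected
directly. A minimal sketch, assuming a CockroachDB version whose SHOW RANGES
supports the FROM INDEX form (output columns vary across versions):

-- Inspect the pre-split ranges of the new sharded index.
SHOW RANGES FROM INDEX t_hash_pre_split@t_hash_pre_split_idx_b;

Note the output above has nine ranges: one per shard bucket, plus the range
covering the index prefix before shard 0.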
88 changes: 88 additions & 0 deletions pkg/sql/schema_changer.go
@@ -48,6 +48,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -1465,6 +1466,13 @@ func (sc *SchemaChanger) runStateMachineAndBackfill(ctx context.Context) error {
if fn := sc.testingKnobs.RunBeforePublishWriteAndDelete; fn != nil {
fn()
}

if sc.execCfg.Codec.ForSystemTenant() {
if err := sc.preSplitHashShardedIndexRanges(ctx); err != nil {
return err
}
}

// Run through mutation state machine before backfill.
if err := sc.RunStateMachineBeforeBackfill(ctx); err != nil {
return err
@@ -1973,6 +1981,14 @@ type SchemaChangerTestingKnobs struct {
// TwoVersionLeaseViolation is called whenever a schema change transaction is
// unable to commit because it is violating the two version lease invariant.
TwoVersionLeaseViolation func()

// RunBeforeHashShardedIndexRangePreSplit is called before pre-splitting index
// ranges for hash sharded index.
RunBeforeHashShardedIndexRangePreSplit func(tbl *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error

// RunAfterHashShardedIndexRangePreSplit is called after index ranges
// pre-splitting is done for hash sharded index.
RunAfterHashShardedIndexRangePreSplit func(tbl *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
@@ -2557,6 +2573,78 @@ func (sc *SchemaChanger) getDependentMutationsJobs(
return dependentJobs, nil
}

func (sc *SchemaChanger) preSplitHashShardedIndexRanges(ctx context.Context) error {
if err := sc.txn(ctx, func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
) error {
hour := hlc.Timestamp{WallTime: timeutil.Now().Add(time.Hour).UnixNano()}
tableDesc, err := descsCol.GetMutableTableByID(
ctx, txn, sc.descID,
tree.ObjectLookupFlags{
CommonLookupFlags: tree.CommonLookupFlags{
IncludeOffline: true,
IncludeDropped: true,
},
},
)
if err != nil {
return err
}

if fn := sc.testingKnobs.RunBeforeHashShardedIndexRangePreSplit; fn != nil {
if err := fn(tableDesc, sc.db, sc.execCfg.Codec); err != nil {
return err
}
}

for _, m := range tableDesc.AllMutations() {
if m.MutationID() != sc.mutationID {
// Mutations are applied in a FIFO order. Only apply the first set of
// mutations if they have the mutation ID we're looking for.
break
}

if idx := m.AsIndex(); m.Adding() && m.DeleteOnly() && idx != nil {
if idx.IsSharded() {
splitAtShards := calculateSplitAtShards(maxHashShardedIndexRangePreSplit.Get(&sc.settings.SV), idx.GetSharded().ShardBuckets)
for _, shard := range splitAtShards {
keyPrefix := sc.execCfg.Codec.IndexPrefix(uint32(tableDesc.GetID()), uint32(idx.GetID()))
splitKey := encoding.EncodeVarintAscending(keyPrefix, shard)
if err := sc.db.SplitAndScatter(ctx, splitKey, hour); err != nil {
return err
}
}
}
}
}

if fn := sc.testingKnobs.RunAfterHashShardedIndexRangePreSplit; fn != nil {
if err := fn(tableDesc, sc.db, sc.execCfg.Codec); err != nil {
return err
}
}

return nil
}); err != nil {
return err
}

return nil
}

// calculateSplitAtShards returns a slice of min(maxSplit, shardBucketCount)
shard numbers. Shard numbers are sampled with a fixed step within the
[0, shardBucketCount) range.
func calculateSplitAtShards(maxSplit int64, shardBucketCount int32) []int64 {
splitCount := int(math.Min(float64(maxSplit), float64(shardBucketCount)))
step := float64(shardBucketCount) / float64(splitCount)
splitAtShards := make([]int64, splitCount)
for i := 0; i < splitCount; i++ {
splitAtShards[i] = int64(math.Floor(float64(i) * step))
}
return splitAtShards
}

// isCurrentMutationDiscarded returns if the current column mutation is made irrelevant
// by a later operation. The nextMutationIdx provides the index at which to check for
// later mutation.
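
The sampling in the calculateSplitAtShards helper above is easiest to see
with concrete numbers. The following standalone program re-derives the
helper outside the package purely for illustration; it is not the shipped
code path:

package main

import (
	"fmt"
	"math"
)

// calculateSplitAtShards mirrors the helper in schema_changer.go: it
// samples min(maxSplit, shardBucketCount) shard numbers at a fixed
// step across the [0, shardBucketCount) range.
func calculateSplitAtShards(maxSplit int64, shardBucketCount int32) []int64 {
	splitCount := int(math.Min(float64(maxSplit), float64(shardBucketCount)))
	step := float64(shardBucketCount) / float64(splitCount)
	splitAtShards := make([]int64, splitCount)
	for i := 0; i < splitCount; i++ {
		splitAtShards[i] = int64(math.Floor(float64(i) * step))
	}
	return splitAtShards
}

func main() {
	// Default cap of 16 with 8 buckets: every bucket boundary is split.
	fmt.Println(calculateSplitAtShards(16, 8)) // [0 1 2 3 4 5 6 7]
	// 30 buckets capped at 8 splits: shards sampled at a step of 3.75.
	fmt.Println(calculateSplitAtShards(8, 30)) // [0 3 7 11 15 18 22 26]
}

Each sampled shard then becomes a split key of the form
/Table/<table>/<index>/<shard>, built by appending the shard number to the
index prefix with encoding.EncodeVarintAscending, matching the range
boundaries shown in the logic test above.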
51 changes: 51 additions & 0 deletions pkg/sql/schema_changer_helpers_test.go
@@ -12,11 +12,14 @@ package sql

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/backfill"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

// TestingDistIndexBackfill exposes the index backfill functionality for
@@ -35,3 +38,51 @@ func (sc *SchemaChanger) TestingDistIndexBackfill(
func (sc *SchemaChanger) SetJob(job *jobs.Job) {
sc.job = job
}

func TestCalculateSplitAtShards(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
testName string
maxSplit int64
bucketCount int32
expected []int64
}{
{
testName: "buckets_less_than_max_split",
maxSplit: 8,
bucketCount: 0,
expected: []int64{},
},
{
testName: "buckets_less_than_max_split",
maxSplit: 8,
bucketCount: 5,
expected: []int64{0, 1, 2, 3, 4},
},
{
testName: "buckets_equal_to_max_split",
maxSplit: 8,
bucketCount: 8,
expected: []int64{0, 1, 2, 3, 4, 5, 6, 7},
},
{
testName: "buckets_greater_than_max_split_1",
maxSplit: 8,
bucketCount: 30,
expected: []int64{0, 3, 7, 11, 15, 18, 22, 26},
},
{
testName: "buckets_greater_than_max_split_2",
maxSplit: 8,
bucketCount: 1000,
expected: []int64{0, 125, 250, 375, 500, 625, 750, 875},
},
}
for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
shards := calculateSplitAtShards(tc.maxSplit, tc.bucketCount)
require.Equal(t, tc.expected, shards)
})
}
}
73 changes: 73 additions & 0 deletions pkg/sql/schema_changer_test.go
@@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob"
"github.com/cockroachdb/cockroach/pkg/sql/sqltestutils"
@@ -7491,3 +7492,75 @@ func TestJobsWithoutMutationsAreCancelable(t *testing.T) {
).Scan(&id)
require.Equal(t, scJobID, id)
}

func TestHashShardedIndexRangePreSplit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

getShardedIndexRanges := func(tableDesc *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) ([]kv.KeyValue, error) {
indexSpan := tableDesc.IndexSpan(codec, descpb.IndexID(2))
ranges, err := kvDB.Scan(
ctx,
keys.RangeMetaKey(keys.MustAddr(indexSpan.Key)),
keys.RangeMetaKey(keys.MustAddr(indexSpan.EndKey)),
100,
)
if err != nil {
return nil, err
}
return ranges, nil
}

var runBeforePreSplitting func(tbl *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error
var runAfterPreSplitting func(tbl *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error
params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
RunBeforeHashShardedIndexRangePreSplit: func(tbl *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error {
return runBeforePreSplitting(tbl, kvDB, codec)
},
RunAfterHashShardedIndexRangePreSplit: func(tbl *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error {
return runAfterPreSplitting(tbl, kvDB, codec)
},
},
}

s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
tdb := sqlutils.MakeSQLRunner(sqlDB)

tdb.Exec(t, `
CREATE DATABASE t;
CREATE TABLE t.test_split(a INT PRIMARY KEY, b INT NOT NULL);
`,
)

runBeforePreSplitting = func(tableDesc *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error {
ranges, err := getShardedIndexRanges(tableDesc, kvDB, codec)
if err != nil {
return err
}
if len(ranges) != 0 {
return errors.Newf("expected 0 ranges but found %d", len(ranges))
}
return nil
}

runAfterPreSplitting = func(tableDesc *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error {
ranges, err := getShardedIndexRanges(tableDesc, kvDB, codec)
if err != nil {
return err
}
if len(ranges) != 8 {
return errors.Newf("expected 8 ranges but found %d", len(ranges))
}
return nil
}

tdb.Exec(t, `
SET experimental_enable_hash_sharded_indexes = on;
CREATE INDEX idx_test_split_b ON t.test_split (b) USING HASH WITH BUCKET_COUNT = 8;
`)
}
