From 40a9206ad943d77dd04c946102c6e4f2cb9a6d73 Mon Sep 17 00:00:00 2001
From: Chengxiong Ruan
Date: Mon, 17 Jan 2022 21:16:56 -0500
Subject: [PATCH] sql: pre-split hash sharded index ranges before
 DELETE_AND_WRITE_ONLY

Fixes #74558

Pre-split ranges on shard boundaries before the SchemaChanger moves a new
hash sharded index from DELETE_ONLY to DELETE_AND_WRITE_ONLY.

Release note (bug fix): When a hash sharded index was added to an existing
table, traffic could hit the index's single initial range hard before it was
split into more ranges for the shards as its size grew. The schema changer
now pre-splits ranges on shard boundaries before the index becomes writable.
The new cluster setting `sql.hash_sharded_range_pre_split.max` caps the
number of pre-split ranges; if the bucket count of the index is less than
the setting, the bucket count determines the number of pre-split ranges.
---
 docs/generated/settings/settings.html              |  1 +
 pkg/sql/exec_util.go                               |  7 ++
 .../logic_test/hash_sharded_index_presplit         | 40 +++++++++
 pkg/sql/schema_changer.go                          | 88 +++++++++++++++++++
 pkg/sql/schema_changer_helpers_test.go             | 51 +++++++++++
 pkg/sql/schema_changer_test.go                     | 73 +++++++++++++++
 6 files changed, 260 insertions(+)
 create mode 100644 pkg/sql/logictest/testdata/logic_test/hash_sharded_index_presplit

diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index cb5d818d2e16..0fa18190bdb7 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -130,6 +130,7 @@
 <tr><td><code>sql.distsql.temp_storage.workmem</code></td><td>byte size</td><td><code>64 MiB</code></td><td>maximum amount of memory in bytes a processor can use before falling back to temp storage</td></tr>
 <tr><td><code>sql.guardrails.max_row_size_err</code></td><td>byte size</td><td><code>512 MiB</code></td><td>maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an error is returned; use 0 to disable</td></tr>
 <tr><td><code>sql.guardrails.max_row_size_log</code></td><td>byte size</td><td><code>64 MiB</code></td><td>maximum size of row (or column family if multiple column families are in use) that SQL can write to the database, above which an event is logged to SQL_PERF (or SQL_INTERNAL_PERF if the mutating statement was internal); use 0 to disable</td></tr>
+<tr><td><code>sql.hash_sharded_range_pre_split.max</code></td><td>integer</td><td><code>16</code></td><td>max pre-split ranges to have when adding hash sharded index to an existing table</td></tr>
 <tr><td><code>sql.log.slow_query.experimental_full_table_scans.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when set to true, statements that perform a full table/index scan will be logged to the slow query log even if they do not meet the latency threshold. Must have the slow query log enabled for this setting to have any effect.</td></tr>
 <tr><td><code>sql.log.slow_query.internal_queries.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when set to true, internal queries which exceed the slow query log threshold are logged to a separate log. Must have the slow query log enabled for this setting to have any effect.</td></tr>
 <tr><td><code>sql.log.slow_query.latency_threshold</code></td><td>duration</td><td><code>0s</code></td><td>when set to non-zero, log statements whose service latency exceeds the threshold to a secondary logger on each node</td></tr>
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index 97874c4f9427..acd151f35949 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -297,6 +297,13 @@ var hashShardedIndexesEnabledClusterMode = settings.RegisterBoolSetting(
 	false,
 ).WithPublic()
 
+var maxHashShardedIndexRangePreSplit = settings.RegisterIntSetting(
+	"sql.hash_sharded_range_pre_split.max",
+	"max pre-split ranges to have when adding hash sharded index to an existing table",
+	16,
+	settings.PositiveInt,
+).WithPublic()
+
 var zigzagJoinClusterMode = settings.RegisterBoolSetting(
 	"sql.defaults.zigzag_join.enabled",
 	"default value for enable_zigzag_join session setting; allows use of zig-zag join by default",
diff --git a/pkg/sql/logictest/testdata/logic_test/hash_sharded_index_presplit b/pkg/sql/logictest/testdata/logic_test/hash_sharded_index_presplit
new file mode 100644
index 000000000000..13f5eac6ce3b
--- /dev/null
+++ b/pkg/sql/logictest/testdata/logic_test/hash_sharded_index_presplit
@@ -0,0 +1,40 @@
+# AdminSplit is not allowed in multi-tenant.
+# LogicTest: !3node-tenant
+
+statement ok
+SET experimental_enable_hash_sharded_indexes = on;
+CREATE TABLE t_hash_pre_split (
+  a INT PRIMARY KEY,
+  b INT
+);
+
+query TITTT retry
+SELECT t.name, r.table_id, r.index_name, r.start_pretty, r.end_pretty
+FROM crdb_internal.tables t
+JOIN crdb_internal.ranges r ON t.table_id = r.table_id
+WHERE t.name = 't_hash_pre_split'
+AND t.state = 'PUBLIC'
+AND r.split_enforced_until IS NOT NULL;
+----
+
+statement ok
+CREATE INDEX t_hash_pre_split_idx_b ON t_hash_pre_split (b) USING HASH WITH BUCKET_COUNT = 8;
+
+query TITTT colnames,retry
+SELECT t.name, r.table_id, r.index_name, r.start_pretty, r.end_pretty
+FROM crdb_internal.tables t
+JOIN crdb_internal.ranges r ON t.table_id = r.table_id
+WHERE t.name = 't_hash_pre_split'
+AND t.state = 'PUBLIC'
+AND r.split_enforced_until IS NOT NULL;
+----
+name              table_id  index_name              start_pretty   end_pretty
+t_hash_pre_split  92        t_hash_pre_split_idx_b  /Table/92/2    /Table/92/2/0
+t_hash_pre_split  92        t_hash_pre_split_idx_b  /Table/92/2/0  /Table/92/2/1
+t_hash_pre_split  92        t_hash_pre_split_idx_b  /Table/92/2/1  /Table/92/2/2
+t_hash_pre_split  92        t_hash_pre_split_idx_b  /Table/92/2/2  /Table/92/2/3
+t_hash_pre_split  92        t_hash_pre_split_idx_b  /Table/92/2/3  /Table/92/2/4
+t_hash_pre_split  92        t_hash_pre_split_idx_b  /Table/92/2/4  /Table/92/2/5
+t_hash_pre_split  92        t_hash_pre_split_idx_b  /Table/92/2/5  /Table/92/2/6
+t_hash_pre_split  92        t_hash_pre_split_idx_b  /Table/92/2/6  /Table/92/2/7
+t_hash_pre_split  92        t_hash_pre_split_idx_b  /Table/92/2/7  /Max
diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go
index d1e3da71bdcd..f152001d05ae 100644
--- a/pkg/sql/schema_changer.go
+++ b/pkg/sql/schema_changer.go
@@ -48,6 +48,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
+	"github.com/cockroachdb/cockroach/pkg/util/encoding"
 	"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -1465,6 +1466,13 @@ func (sc *SchemaChanger) runStateMachineAndBackfill(ctx context.Context) error {
 	if fn := sc.testingKnobs.RunBeforePublishWriteAndDelete; fn != nil {
 		fn()
 	}
+
+	if sc.execCfg.Codec.ForSystemTenant() {
+		if err := sc.preSplitHashShardedIndexRanges(ctx); err != nil {
+			return err
+		}
+	}
+
 	// Run through mutation state machine before backfill.
 	if err := sc.RunStateMachineBeforeBackfill(ctx); err != nil {
 		return err
@@ -1973,6 +1981,14 @@ type SchemaChangerTestingKnobs struct {
 	// TwoVersionLeaseViolation is called whenever a schema change transaction is
 	// unable to commit because it is violating the two version lease invariant.
 	TwoVersionLeaseViolation func()
+
+	// RunBeforeHashShardedIndexRangePreSplit is called before pre-splitting index
+	// ranges for a hash sharded index.
+	RunBeforeHashShardedIndexRangePreSplit func(tbl *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error
+
+	// RunAfterHashShardedIndexRangePreSplit is called after index range
+	// pre-splitting is done for a hash sharded index.
+	RunAfterHashShardedIndexRangePreSplit func(tbl *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error
 }
 
 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
@@ -2557,6 +2573,78 @@ func (sc *SchemaChanger) getDependentMutationsJobs(
 	return dependentJobs, nil
 }
 
+func (sc *SchemaChanger) preSplitHashShardedIndexRanges(ctx context.Context) error {
+	if err := sc.txn(ctx, func(
+		ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
+	) error {
+		// Let the split enforcement expire an hour from now.
+		hour := hlc.Timestamp{WallTime: timeutil.Now().Add(time.Hour).UnixNano()}
+		tableDesc, err := descsCol.GetMutableTableByID(
+			ctx, txn, sc.descID,
+			tree.ObjectLookupFlags{
+				CommonLookupFlags: tree.CommonLookupFlags{
+					IncludeOffline: true,
+					IncludeDropped: true,
+				},
+			},
+		)
+		if err != nil {
+			return err
+		}
+
+		if fn := sc.testingKnobs.RunBeforeHashShardedIndexRangePreSplit; fn != nil {
+			if err := fn(tableDesc, sc.db, sc.execCfg.Codec); err != nil {
+				return err
+			}
+		}
+
+		for _, m := range tableDesc.AllMutations() {
+			if m.MutationID() != sc.mutationID {
+				// Mutations are applied in a FIFO order. Only apply the first set of
+				// mutations if they have the mutation ID we're looking for.
+				break
+			}
+
+			if idx := m.AsIndex(); m.Adding() && m.DeleteOnly() && idx != nil {
+				if idx.IsSharded() {
+					splitAtShards := calculateSplitAtShards(maxHashShardedIndexRangePreSplit.Get(&sc.settings.SV), idx.GetSharded().ShardBuckets)
+					for _, shard := range splitAtShards {
+						keyPrefix := sc.execCfg.Codec.IndexPrefix(uint32(tableDesc.GetID()), uint32(idx.GetID()))
+						splitKey := encoding.EncodeVarintAscending(keyPrefix, shard)
+						if err := sc.db.SplitAndScatter(ctx, splitKey, hour); err != nil {
+							return err
+						}
+					}
+				}
+			}
+		}
+
+		if fn := sc.testingKnobs.RunAfterHashShardedIndexRangePreSplit; fn != nil {
+			if err := fn(tableDesc, sc.db, sc.execCfg.Codec); err != nil {
+				return err
+			}
+		}
+
+		return nil
+	}); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// calculateSplitAtShards returns a slice of min(maxSplit, shardBucketCount)
+// shard numbers. Shard numbers are sampled with a fixed step within the
+// [0, shardBucketCount) range.
+func calculateSplitAtShards(maxSplit int64, shardBucketCount int32) []int64 {
+	splitCount := int(math.Min(float64(maxSplit), float64(shardBucketCount)))
+	step := float64(shardBucketCount) / float64(splitCount)
+	splitAtShards := make([]int64, splitCount)
+	for i := 0; i < splitCount; i++ {
+		splitAtShards[i] = int64(math.Floor(float64(i) * step))
+	}
+	return splitAtShards
+}
+
 // isCurrentMutationDiscarded returns if the current column mutation is made irrelevant
 // by a later operation. The nextMutationIdx provides the index at which to check for
 // later mutation.
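
For illustration only (not part of the patch): a minimal standalone sketch of the
shard sampling performed by calculateSplitAtShards above. The function body is
copied from the hunk; the main() inputs are example values, including the
setting's default of 16 and the bucket counts exercised by the tests below.

	// sketch.go: standalone demo of the split-point sampling above.
	package main

	import (
		"fmt"
		"math"
	)

	// calculateSplitAtShards picks min(maxSplit, shardBucketCount) shard
	// numbers, evenly stepped across [0, shardBucketCount).
	func calculateSplitAtShards(maxSplit int64, shardBucketCount int32) []int64 {
		splitCount := int(math.Min(float64(maxSplit), float64(shardBucketCount)))
		step := float64(shardBucketCount) / float64(splitCount)
		splitAtShards := make([]int64, splitCount)
		for i := 0; i < splitCount; i++ {
			splitAtShards[i] = int64(math.Floor(float64(i) * step))
		}
		return splitAtShards
	}

	func main() {
		// Default setting (16) with an 8-bucket index: one split per shard,
		// matching the nine ranges in the logic test output above.
		fmt.Println(calculateSplitAtShards(16, 8)) // [0 1 2 3 4 5 6 7]
		// Setting capped below the bucket count: evenly sampled shards.
		fmt.Println(calculateSplitAtShards(8, 30)) // [0 3 7 11 15 18 22 26]
		fmt.Println(calculateSplitAtShards(4, 16)) // [0 4 8 12]
	}

Each returned shard number is then encoded onto the index's key prefix with
encoding.EncodeVarintAscending to form the split key handed to SplitAndScatter.
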
diff --git a/pkg/sql/schema_changer_helpers_test.go b/pkg/sql/schema_changer_helpers_test.go
index 43af4d03de10..b3ce0f0fc79f 100644
--- a/pkg/sql/schema_changer_helpers_test.go
+++ b/pkg/sql/schema_changer_helpers_test.go
@@ -12,11 +12,14 @@ package sql
 
 import (
 	"context"
+	"testing"
 
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/backfill"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/stretchr/testify/require"
 )
 
 // TestingDistIndexBackfill exposes the index backfill functionality for
@@ -35,3 +38,51 @@ func (sc *SchemaChanger) TestingDistIndexBackfill(
 func (sc *SchemaChanger) SetJob(job *jobs.Job) {
 	sc.job = job
 }
+
+func TestCalculateSplitAtShards(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	testCases := []struct {
+		testName    string
+		maxSplit    int64
+		bucketCount int32
+		expected    []int64
+	}{
+		{
+			testName:    "zero_buckets",
+			maxSplit:    8,
+			bucketCount: 0,
+			expected:    []int64{},
+		},
+		{
+			testName:    "buckets_less_than_max_split",
+			maxSplit:    8,
+			bucketCount: 5,
+			expected:    []int64{0, 1, 2, 3, 4},
+		},
+		{
+			testName:    "buckets_equal_to_max_split",
+			maxSplit:    8,
+			bucketCount: 8,
+			expected:    []int64{0, 1, 2, 3, 4, 5, 6, 7},
+		},
+		{
+			testName:    "buckets_greater_than_max_split_1",
+			maxSplit:    8,
+			bucketCount: 30,
+			expected:    []int64{0, 3, 7, 11, 15, 18, 22, 26},
+		},
+		{
+			testName:    "buckets_greater_than_max_split_2",
+			maxSplit:    8,
+			bucketCount: 1000,
+			expected:    []int64{0, 125, 250, 375, 500, 625, 750, 875},
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.testName, func(t *testing.T) {
+			shards := calculateSplitAtShards(tc.maxSplit, tc.bucketCount)
+			require.Equal(t, tc.expected, shards)
+		})
+	}
+}
diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go
index f33ef84ea5d9..e3545b94558f 100644
--- a/pkg/sql/schema_changer_test.go
+++ b/pkg/sql/schema_changer_test.go
@@ -40,6 +40,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/gcjob"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqltestutils"
@@ -7491,3 +7492,75 @@ func TestJobsWithoutMutationsAreCancelable(t *testing.T) {
 	).Scan(&id)
 	require.Equal(t, scJobID, id)
 }
+
+func TestHashShardedIndexRangePreSplit(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+
+	getShardedIndexRanges := func(tableDesc *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) ([]kv.KeyValue, error) {
+		indexSpan := tableDesc.IndexSpan(codec, descpb.IndexID(2))
+		ranges, err := kvDB.Scan(
+			ctx,
+			keys.RangeMetaKey(keys.MustAddr(indexSpan.Key)),
+			keys.RangeMetaKey(keys.MustAddr(indexSpan.EndKey)),
+			100,
+		)
+		if err != nil {
+			return nil, err
+		}
+		return ranges, nil
+	}
+
+	var runBeforePreSplitting func(tbl *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error
+	var runAfterPreSplitting func(tbl *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error
+	params, _ := tests.CreateTestServerParams()
+	params.Knobs = base.TestingKnobs{
+		SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
+			RunBeforeHashShardedIndexRangePreSplit: func(tbl *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error {
+				return runBeforePreSplitting(tbl, kvDB, codec)
+			},
+			RunAfterHashShardedIndexRangePreSplit: func(tbl *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error {
+				return runAfterPreSplitting(tbl, kvDB, codec)
+			},
+		},
+	}
+
+	s, sqlDB, _ := serverutils.StartServer(t, params)
+	defer s.Stopper().Stop(ctx)
+	tdb := sqlutils.MakeSQLRunner(sqlDB)
+
+	tdb.Exec(t, `
+CREATE DATABASE t;
+CREATE TABLE t.test_split(a INT PRIMARY KEY, b INT NOT NULL);
+`,
+	)
+
+	runBeforePreSplitting = func(tableDesc *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error {
+		ranges, err := getShardedIndexRanges(tableDesc, kvDB, codec)
+		if err != nil {
+			return err
+		}
+		if len(ranges) != 0 {
+			return errors.Newf("expected 0 ranges but found %d", len(ranges))
+		}
+		return nil
+	}
+
+	runAfterPreSplitting = func(tableDesc *tabledesc.Mutable, kvDB *kv.DB, codec keys.SQLCodec) error {
+		ranges, err := getShardedIndexRanges(tableDesc, kvDB, codec)
+		if err != nil {
+			return err
+		}
+		if len(ranges) != 8 {
+			return errors.Newf("expected 8 ranges but found %d", len(ranges))
+		}
+		return nil
+	}
+
+	tdb.Exec(t, `
+SET experimental_enable_hash_sharded_indexes = on;
+CREATE INDEX idx_test_split_b ON t.test_split (b) USING HASH WITH BUCKET_COUNT = 8;
+`)
+}
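
Usage note (not part of the patch): since `sql.hash_sharded_range_pre_split.max`
is registered as a public cluster setting, it can be lowered before building a
large index, e.g. `SET CLUSTER SETTING sql.hash_sharded_range_pre_split.max = 4;`.
With a 16-bucket index that would cap pre-splitting at shards 0, 4, 8 and 12
(per calculateSplitAtShards above); the remaining shard boundaries are still
split later by the usual size-based range splitting, as described in the
release note.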