diff --git a/pkg/ccl/logictestccl/tests/3node-tenant-multiregion/BUILD.bazel b/pkg/ccl/logictestccl/tests/3node-tenant-multiregion/BUILD.bazel index c6c185b48666..bd6a4649f420 100644 --- a/pkg/ccl/logictestccl/tests/3node-tenant-multiregion/BUILD.bazel +++ b/pkg/ccl/logictestccl/tests/3node-tenant-multiregion/BUILD.bazel @@ -8,8 +8,9 @@ go_test( data = [ "//c-deps:libgeos", # keep "//pkg/sql/logictest:testdata", # keep + "//pkg/sql/opt/exec/execbuilder:testdata", # keep ], - shard_count = 2, + shard_count = 3, deps = [ "//pkg/build/bazel", "//pkg/ccl", @@ -17,6 +18,7 @@ go_test( "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", + "//pkg/sql", "//pkg/sql/logictest", "//pkg/testutils/serverutils", "//pkg/testutils/skip", diff --git a/pkg/ccl/logictestccl/tests/3node-tenant-multiregion/generated_test.go b/pkg/ccl/logictestccl/tests/3node-tenant-multiregion/generated_test.go index 48a9b1a34918..c550d7651f6f 100644 --- a/pkg/ccl/logictestccl/tests/3node-tenant-multiregion/generated_test.go +++ b/pkg/ccl/logictestccl/tests/3node-tenant-multiregion/generated_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/securityassets" "github.com/cockroachdb/cockroach/pkg/security/securitytest" "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/logictest" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" @@ -32,6 +33,7 @@ import ( const configIdx = 10 var logicTestDir string +var execBuildLogicTestDir string func init() { if bazel.BuiltWithBazel() { @@ -43,6 +45,15 @@ func init() { } else { logicTestDir = "../../../../sql/logictest/testdata/logic_test" } + if bazel.BuiltWithBazel() { + var err error + execBuildLogicTestDir, err = bazel.Runfile("pkg/sql/opt/exec/execbuilder/testdata") + if err != nil { + panic(err) + } + } else { + execBuildLogicTestDir = "../../../../sql/opt/exec/execbuilder/testdata" + } } func TestMain(m *testing.M) { @@ -58,6 +69,14 @@ func runLogicTest(t *testing.T, file string) { skip.UnderDeadlock(t, "times out and/or hangs") logictest.RunLogicTest(t, logictest.TestServerArgs{}, configIdx, filepath.Join(logicTestDir, file)) } +func runExecBuildLogicTest(t *testing.T, file string) { + defer sql.TestingOverrideExplainEnvVersion("CockroachDB execbuilder test version")() + skip.UnderDeadlock(t, "times out and/or hangs") + serverArgs := logictest.TestServerArgs{ + DisableWorkmemRandomization: true, + } + logictest.RunLogicTest(t, serverArgs, configIdx, filepath.Join(execBuildLogicTestDir, file)) +} func TestTenantLogic_distsql_tenant( t *testing.T, @@ -72,3 +91,10 @@ func TestTenantLogic_distsql_tenant_locality( defer leaktest.AfterTest(t)() runLogicTest(t, "distsql_tenant_locality") } + +func TestTenantExecBuild_distsql_tenant_locality( + t *testing.T, +) { + defer leaktest.AfterTest(t)() + runExecBuildLogicTest(t, "distsql_tenant_locality") +} diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index fe2dfbcc783b..dc7eedc3c4d1 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -468,6 +468,7 @@ go_library( "//pkg/util/mon", "//pkg/util/protoutil", "//pkg/util/quotapool", + "//pkg/util/randutil", "//pkg/util/retry", "//pkg/util/ring", "//pkg/util/stop", diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 2434d683fb5f..0730d557c0bd 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -54,6 
+54,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/quotapool" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" @@ -134,6 +135,8 @@ type DistSQLPlanner struct { codec keys.SQLCodec clock *hlc.Clock + + rng *rand.Rand } // DistributionType is an enum defining when a plan should be distributed. @@ -179,6 +182,7 @@ func NewDistSQLPlanner( sqlInstanceProvider sqlinstance.Provider, clock *hlc.Clock, ) *DistSQLPlanner { + rng, _ := randutil.NewPseudoRand() dsp := &DistSQLPlanner{ planVersion: planVersion, st: st, @@ -199,6 +203,7 @@ func NewDistSQLPlanner( sqlInstanceProvider: sqlInstanceProvider, codec: codec, clock: clock, + rng: rng, } dsp.parallelLocalScansSem = quotapool.NewIntPool("parallel local scans concurrency", @@ -1052,176 +1057,349 @@ func (dsp *DistSQLPlanner) PartitionSpans( return dsp.partitionSpansTenant(ctx, planCtx, spans) } -// partitionSpansSystem finds node owners for ranges touching the given spans -// for a system tenant. -func (dsp *DistSQLPlanner) partitionSpansSystem( - ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans, -) (partitions []SpanPartition, _ error) { - // nodeMap maps a SQLInstanceID to an index inside the partitions array. - nodeMap := make(map[base.SQLInstanceID]int) +// partitionSpan takes a single span and splits it up according to the owning +// nodes (if the span touches multiple ranges). +// +// - partitions is the set of SpanPartitions so far. The updated set is +// returned. +// - nodeMap maps a SQLInstanceID to an index inside the partitions array. If +// the SQL instance chosen for the span is not in this map, then a new +// SpanPartition is appended to partitions and nodeMap is updated accordingly. +// - getSQLInstanceIDForKVNodeID is a resolver from the KV node ID to the SQL +// instance ID. +// +// The updated array of SpanPartitions is returned as well as the index into +// that array pointing to the SpanPartition that included the last part of the +// span. +func (dsp *DistSQLPlanner) partitionSpan( + ctx context.Context, + planCtx *PlanningCtx, + span roachpb.Span, + partitions []SpanPartition, + nodeMap map[base.SQLInstanceID]int, + getSQLInstanceIDForKVNodeID func(roachpb.NodeID) base.SQLInstanceID, +) (_ []SpanPartition, lastPartitionIdx int, _ error) { it := planCtx.spanIter - for _, span := range spans { - // rSpan is the span we are currently partitioning. - rSpan, err := keys.SpanAddr(span) + // rSpan is the span we are currently partitioning. + rSpan, err := keys.SpanAddr(span) + if err != nil { + return nil, 0, err + } + + var lastSQLInstanceID base.SQLInstanceID + // lastKey maintains the EndKey of the last piece of `span`. + lastKey := rSpan.Key + if log.V(1) { + log.Infof(ctx, "partitioning span %s", span) + } + // We break up rSpan into its individual ranges (which may or may not be on + // separate nodes). We then create "partitioned spans" using the end keys of + // these individual ranges. + for it.Seek(ctx, span, kvcoord.Ascending); ; it.Next(ctx) { + if !it.Valid() { + return nil, 0, it.Error() + } + replDesc, err := it.ReplicaInfo(ctx) if err != nil { - return nil, err + return nil, 0, err } - - var lastSQLInstanceID base.SQLInstanceID - // lastKey maintains the EndKey of the last piece of `span`.
- lastKey := rSpan.Key + desc := it.Desc() if log.V(1) { - log.Infof(ctx, "partitioning span %s", span) - } - // We break up rSpan into its individual ranges (which may or - // may not be on separate nodes). We then create "partitioned - // spans" using the end keys of these individual ranges. - for it.Seek(ctx, span, kvcoord.Ascending); ; it.Next(ctx) { - if !it.Valid() { - return nil, it.Error() - } - replDesc, err := it.ReplicaInfo(ctx) - if err != nil { - return nil, err - } - desc := it.Desc() - if log.V(1) { - descCpy := desc // don't let desc escape - log.Infof(ctx, "lastKey: %s desc: %s", lastKey, &descCpy) - } - - if !desc.ContainsKey(lastKey) { - // This range must contain the last range's EndKey. - log.Fatalf( - ctx, "next range %v doesn't cover last end key %v. Partitions: %#v", - desc.RSpan(), lastKey, partitions, - ) - } + descCpy := desc // don't let desc escape + log.Infof(ctx, "lastKey: %s desc: %s", lastKey, &descCpy) + } - sqlInstanceID := base.SQLInstanceID(replDesc.NodeID) - partitionIdx, inNodeMap := nodeMap[sqlInstanceID] - if !inNodeMap { - // This is the first time we are seeing this sqlInstanceID for these - // spans. Check its health. - status := dsp.CheckInstanceHealthAndVersion(ctx, planCtx, sqlInstanceID) - // If the node is unhealthy or its DistSQL version is incompatible, use - // the gateway to process this span instead of the unhealthy host. - // An empty address indicates an unhealthy host. - if status != NodeOK { - log.Eventf(ctx, "not planning on node %d: %s", sqlInstanceID, status) - sqlInstanceID = dsp.gatewaySQLInstanceID - partitionIdx, inNodeMap = nodeMap[sqlInstanceID] - } + if !desc.ContainsKey(lastKey) { + // This range must contain the last range's EndKey. + log.Fatalf( + ctx, "next range %v doesn't cover last end key %v. Partitions: %#v", + desc.RSpan(), lastKey, partitions, + ) + } - if !inNodeMap { - partitionIdx = len(partitions) - partitions = append(partitions, SpanPartition{SQLInstanceID: sqlInstanceID}) - nodeMap[sqlInstanceID] = partitionIdx - } - } - partition := &partitions[partitionIdx] + sqlInstanceID := getSQLInstanceIDForKVNodeID(replDesc.NodeID) + partitionIdx, inNodeMap := nodeMap[sqlInstanceID] + if !inNodeMap { + partitionIdx = len(partitions) + partitions = append(partitions, SpanPartition{SQLInstanceID: sqlInstanceID}) + nodeMap[sqlInstanceID] = partitionIdx + } + partition := &partitions[partitionIdx] + lastPartitionIdx = partitionIdx + + if len(span.EndKey) == 0 { + // If we see a span to partition that has no end key, it means that + // we're going to do a point lookup on the start key of this span. + // Thus, we include the span into partition.Spans without trying to + // merge it with the last span. + partition.Spans = append(partition.Spans, span) + break + } - if len(span.EndKey) == 0 { - // If we see a span to partition that has no end key, it means - // that we're going to do a point lookup on the start key of - // this span. Thus, we include the span into partition.Spans - // without trying to merge it with the last span. - partition.Spans = append(partition.Spans, span) - break - } + // Limit the end key to the end of the span we are resolving. + endKey := desc.EndKey + if rSpan.EndKey.Less(endKey) { + endKey = rSpan.EndKey + } - // Limit the end key to the end of the span we are resolving. - endKey := desc.EndKey - if rSpan.EndKey.Less(endKey) { - endKey = rSpan.EndKey - } + if lastSQLInstanceID == sqlInstanceID { + // Two consecutive ranges on the same node, merge the spans. 
+ partition.Spans[len(partition.Spans)-1].EndKey = endKey.AsRawKey() + } else { + partition.Spans = append(partition.Spans, roachpb.Span{ + Key: lastKey.AsRawKey(), + EndKey: endKey.AsRawKey(), + }) + } - if lastSQLInstanceID == sqlInstanceID { - // Two consecutive ranges on the same node, merge the spans. - partition.Spans[len(partition.Spans)-1].EndKey = endKey.AsRawKey() - } else { - partition.Spans = append(partition.Spans, roachpb.Span{ - Key: lastKey.AsRawKey(), - EndKey: endKey.AsRawKey(), - }) - } + if !endKey.Less(rSpan.EndKey) { + // Done. + break + } - if !endKey.Less(rSpan.EndKey) { - // Done. - break - } + lastKey = endKey + lastSQLInstanceID = sqlInstanceID + } + return partitions, lastPartitionIdx, nil +} - lastKey = endKey - lastSQLInstanceID = sqlInstanceID +// partitionSpansSystem finds node owners for ranges touching the given spans +// for a system tenant. +func (dsp *DistSQLPlanner) partitionSpansSystem( + ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans, +) (partitions []SpanPartition, _ error) { + nodeMap := make(map[base.SQLInstanceID]int) + resolver := func(nodeID roachpb.NodeID) base.SQLInstanceID { + sqlInstanceID := base.SQLInstanceID(nodeID) + _, inNodeMap := nodeMap[sqlInstanceID] + // If this is the first time we are seeing this sqlInstanceID for these + // spans, then we check its health. + checkHealth := !inNodeMap + return dsp.getSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID, checkHealth) + } + for _, span := range spans { + var err error + partitions, _, err = dsp.partitionSpan( + ctx, planCtx, span, partitions, nodeMap, resolver, + ) + if err != nil { + return nil, err } } return partitions, nil } -// partitionSpansTenant assigns SQL instances in a tenant to spans. Currently -// assignments are made to all available instances in a round-robin fashion. +// partitionSpansTenant assigns SQL instances in a tenant to spans. It performs +// region-aware physical planning among all available SQL instances if the +// region information is available on at least some of the instances, and it +// falls back to naive round-robin assignment if not. func (dsp *DistSQLPlanner) partitionSpansTenant( ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans, ) (partitions []SpanPartition, _ error) { - if dsp.sqlInstanceProvider == nil { - return nil, errors.AssertionFailedf("sql instance provider not available in multi-tenant environment") - } - // GetAllInstances only returns healthy instances. - instances, err := dsp.sqlInstanceProvider.GetAllInstances(ctx) + resolver, instances, hasLocalitySet, err := dsp.makeSQLInstanceIDForKVNodeIDTenantResolver(ctx) if err != nil { return nil, err } - if len(instances) == 0 { - return nil, errors.New("no healthy sql instances available for planning") - } - // Randomize the order in which we assign partitions, so that work is - // allocated fairly across queries. - rand.Shuffle(len(instances), func(i, j int) { - instances[i], instances[j] = instances[j], instances[i] - }) - - // nodeMap maps a SQLInstanceID to an index inside the partitions array. nodeMap := make(map[base.SQLInstanceID]int) var lastKey roachpb.Key - var lastIdx int - for i, span := range spans { - if log.V(1) { - log.Infof(ctx, "partitioning span %s", span) - } - // Rows with column families may have been split into different spans. These - // spans should be assigned the same pod so that the pod can stitch together - // the rows correctly. Split rows are in adjacent spans. 
+ var lastPartitionIdx int + for _, span := range spans { + // Rows with column families may have been split into different spans. + // These spans should be assigned the same pod so that the pod can + // stitch together the rows correctly. Split rows are in adjacent spans. if safeKey, err := keys.EnsureSafeSplitKey(span.Key); err == nil { if safeKey.Equal(lastKey) { - partition := &partitions[lastIdx] + if log.V(1) { + log.Infof(ctx, "partitioning span %s", span) + } + partition := &partitions[lastPartitionIdx] partition.Spans = append(partition.Spans, span) continue } lastKey = safeKey } - sqlInstanceID := instances[i%len(instances)].InstanceID - partitionIdx, inNodeMap := nodeMap[sqlInstanceID] - if !inNodeMap { - partitionIdx = len(partitions) - partitions = append(partitions, SpanPartition{SQLInstanceID: sqlInstanceID}) - nodeMap[sqlInstanceID] = partitionIdx + partitions, lastPartitionIdx, err = dsp.partitionSpan( + ctx, planCtx, span, partitions, nodeMap, resolver, + ) + if err != nil { + return nil, err } - partition := &partitions[partitionIdx] - partition.Spans = append(partition.Spans, span) - lastIdx = partitionIdx } - // If spans were only assigned to one SQL instance, then assign them all to - // the gateway instance. The primary reason is to avoid an extra hop. - // TODO(harding): Don't do this if using an instance in another locality. - if len(partitions) == 1 && partitions[0].SQLInstanceID != dsp.gatewaySQLInstanceID { - partitions[0].SQLInstanceID = dsp.gatewaySQLInstanceID + if err = dsp.maybeReassignToGatewaySQLInstance(partitions, instances, hasLocalitySet); err != nil { + return nil, err } return partitions, nil } -// getInstanceIDForScan retrieves the SQL Instance ID where the single table reader -// should reside for a limited scan. Ideally this is the lease holder for the -// first range in the specified spans. But if that node is unhealthy or +// getSQLInstanceIDForKVNodeIDSystem returns the SQL instance ID that should +// handle the range with the given node ID when planning is done on behalf of +// the system tenant. It ensures that the chosen SQL instance is healthy and of +// the compatible DistSQL version. +func (dsp *DistSQLPlanner) getSQLInstanceIDForKVNodeIDSystem( + ctx context.Context, planCtx *PlanningCtx, nodeID roachpb.NodeID, checkHealth bool, +) base.SQLInstanceID { + sqlInstanceID := base.SQLInstanceID(nodeID) + if checkHealth { + status := dsp.CheckInstanceHealthAndVersion(ctx, planCtx, sqlInstanceID) + // If the node is unhealthy or its DistSQL version is incompatible, use + // the gateway to process this span instead of the unhealthy host. An + // empty address indicates an unhealthy host. + if status != NodeOK { + log.Eventf(ctx, "not planning on node %d: %s", sqlInstanceID, status) + sqlInstanceID = dsp.gatewaySQLInstanceID + } + } + return sqlInstanceID +} + +// makeSQLInstanceIDForKVNodeIDTenantResolver returns a function that can choose +// the SQL instance ID for a provided node ID on behalf of a tenant. It also +// returns a list of all healthy instances for the current tenant as well as a +// boolean indicating whether the locality information is available for at least +// some of those instances. 
+func (dsp *DistSQLPlanner) makeSQLInstanceIDForKVNodeIDTenantResolver( + ctx context.Context, +) ( + resolver func(roachpb.NodeID) base.SQLInstanceID, + _ []sqlinstance.InstanceInfo, + hasLocalitySet bool, + _ error, +) { + if dsp.sqlInstanceProvider == nil { + return nil, nil, false, errors.AssertionFailedf("sql instance provider not available in multi-tenant environment") + } + // GetAllInstances only returns healthy instances. + // TODO(yuzefovich): confirm that all instances are of compatible version. + instances, err := dsp.sqlInstanceProvider.GetAllInstances(ctx) + if err != nil { + return nil, nil, false, err + } + if len(instances) == 0 { + return nil, nil, false, errors.New("no healthy sql instances available for planning") + } + + // Populate a map from the region string to all healthy SQL instances in + // that region. + regionToSQLInstanceIDs := make(map[string][]base.SQLInstanceID) + for _, instance := range instances { + region, ok := instance.Locality.Find("region") + if !ok { + // If we can't determine the region of this instance, don't use it + // for planning. + log.Eventf(ctx, "could not find region for SQL instance %s", instance) + continue + } + instancesInRegion := regionToSQLInstanceIDs[region] + instancesInRegion = append(instancesInRegion, instance.InstanceID) + regionToSQLInstanceIDs[region] = instancesInRegion + } + + if len(regionToSQLInstanceIDs) > 0 { + // If we were able to determine the region information at least for some + // instances, use the region-aware resolver. + hasLocalitySet = true + resolver = func(nodeID roachpb.NodeID) base.SQLInstanceID { + nodeDesc, err := dsp.nodeDescs.GetNodeDescriptor(nodeID) + if err != nil { + log.Eventf(ctx, "unable to get node descriptor for KV node %s", nodeID) + return dsp.gatewaySQLInstanceID + } + region, ok := nodeDesc.Locality.Find("region") + if !ok { + log.Eventf(ctx, "could not find region for KV node %s", nodeDesc) + return dsp.gatewaySQLInstanceID + } + instancesInRegion, ok := regionToSQLInstanceIDs[region] + if !ok { + // There are no instances in this region, so just use the + // gateway. + // TODO(yuzefovich): we should instead pick the closest instance + // in a different region. + return dsp.gatewaySQLInstanceID + } + // Pick a random instance in this region in order to spread the + // load. + // TODO(yuzefovich): consider using a different probability + // distribution for the "local" region (i.e. where the gateway is) + // where the gateway instance is favored. Also, if we had the + // information about latencies between different instances, we could + // favor those that are closer to the gateway. However, we need to + // be careful since non-query code paths (like CDC and BulkIO) do + // benefit from the even spread of the spans. + return instancesInRegion[dsp.rng.Intn(len(instancesInRegion))] + } + } else { + // If it just so happens that we couldn't determine the region for any + // of the SQL instances, we'll use the naive round-robin strategy that is + // completely locality-ignorant. + hasLocalitySet = false + // Randomize the order in which we choose instances so that work is + // allocated fairly across queries.
+ dsp.rng.Shuffle(len(instances), func(i, j int) { + instances[i], instances[j] = instances[j], instances[i] + }) + var i int + resolver = func(roachpb.NodeID) base.SQLInstanceID { + id := instances[i%len(instances)].InstanceID + i++ + return id + } + } + return resolver, instances, hasLocalitySet, nil +} + +// maybeReassignToGatewaySQLInstance checks whether the span partitioning is +// such that it contains only a single SQL instance that is different from the +// gateway, yet the gateway instance is in the same region as the assigned one. +// If that is the case, then all spans are reassigned to the gateway instance in +// order to avoid an extra hop needed when setting up the distributed plan. If +// the locality information isn't available for the instances, then we assume +// the assigned instance to be in the same region as the gateway. +func (dsp *DistSQLPlanner) maybeReassignToGatewaySQLInstance( + partitions []SpanPartition, instances []sqlinstance.InstanceInfo, hasLocalitySet bool, +) error { + if len(partitions) != 1 || partitions[0].SQLInstanceID == dsp.gatewaySQLInstanceID { + // Keep the existing partitioning if more than one instance is used or + // the gateway is already used as the single instance. + return nil + } + var gatewayRegion, assignedRegion string + if hasLocalitySet { + assignedInstance := partitions[0].SQLInstanceID + var ok bool + for _, instance := range instances { + if instance.InstanceID == dsp.gatewaySQLInstanceID { + gatewayRegion, ok = instance.Locality.Find("region") + if !ok { + // If we can't determine the region of the gateway, keep the + // spans assigned to the other instance. + break + } + } else if instance.InstanceID == assignedInstance { + assignedRegion, ok = instance.Locality.Find("region") + if !ok { + // We couldn't determine the region of the assigned instance + // but it shouldn't be possible since we wouldn't have used + // the instance in the planning (since we wouldn't include + // it into regionToSQLInstanceIDs map in + // makeSQLInstanceIDForKVNodeIDTenantResolver). + return errors.AssertionFailedf( + "unexpectedly planned all spans on a SQL instance %s "+ + "which we could not find region for", instance, + ) + } + } + } + } + if gatewayRegion == assignedRegion { + partitions[0].SQLInstanceID = dsp.gatewaySQLInstanceID + } + return nil +} + +// getInstanceIDForScan retrieves the SQL Instance ID where the single table +// reader should reside for a limited scan. Ideally this is the lease holder for +// the first range in the specified spans. But if that node is unhealthy or // incompatible, we use the gateway node instead. 
func (dsp *DistSQLPlanner) getInstanceIDForScan( ctx context.Context, planCtx *PlanningCtx, spans []roachpb.Span, reverse bool, @@ -1245,13 +1423,16 @@ func (dsp *DistSQLPlanner) getInstanceIDForScan( return 0, err } - sqlInstanceID := base.SQLInstanceID(replDesc.NodeID) - status := dsp.CheckInstanceHealthAndVersion(ctx, planCtx, sqlInstanceID) - if status != NodeOK { - log.Eventf(ctx, "not planning on node %d: %s", sqlInstanceID, status) - return dsp.gatewaySQLInstanceID, nil + if dsp.codec.ForSystemTenant() { + return dsp.getSQLInstanceIDForKVNodeIDSystem( + ctx, planCtx, replDesc.NodeID, true, /* checkHealth */ + ), nil + } + resolver, _, _, err := dsp.makeSQLInstanceIDForKVNodeIDTenantResolver(ctx) + if err != nil { + return 0, err } - return sqlInstanceID, nil + return resolver(replDesc.NodeID), nil } // convertOrdering maps the columns in props.ordering to the output columns of a diff --git a/pkg/sql/logictest/testdata/logic_test/distsql_tenant_locality b/pkg/sql/logictest/testdata/logic_test/distsql_tenant_locality index da2197da5eab..543411a18b3e 100644 --- a/pkg/sql/logictest/testdata/logic_test/distsql_tenant_locality +++ b/pkg/sql/logictest/testdata/logic_test/distsql_tenant_locality @@ -1,42 +1,57 @@ # LogicTest: 3node-tenant-multiregion +# tenant-cluster-setting-override-opt: allow-zone-configs-for-secondary-tenants allow-multi-region-abstractions-for-secondary-tenants # Create a table on the secondary tenant. statement ok -CREATE TABLE t (k INT PRIMARY KEY, v INT) +CREATE TABLE t (k INT PRIMARY KEY, v INT, FAMILY (k, v)); +INSERT INTO t SELECT i, i FROM generate_series(1, 6) AS g(i) # Split the ranges in the table. statement ok -ALTER TABLE t SPLIT AT VALUES (1), (2), (3) +ALTER TABLE t SPLIT AT SELECT generate_series(1, 6) -# Relocate ranges in the admin tenant based on node locality. +# Relocate ranges in the admin tenant so that +# - [1-2) and [2-3) are on node 2 +# - [3-4) and [4-5) are on node 3 +# - [5-6) and [6-7) are on node 1. user host-cluster-root statement ok -ALTER RANGE RELOCATE LEASE TO 1 FOR SELECT range_id FROM crdb_internal.ranges WHERE start_pretty LIKE '%Tenant%1' +ALTER RANGE RELOCATE LEASE TO 2 FOR SELECT range_id FROM crdb_internal.ranges WHERE start_pretty LIKE '%Tenant%1'; +ALTER RANGE RELOCATE LEASE TO 2 FOR SELECT range_id FROM crdb_internal.ranges WHERE start_pretty LIKE '%Tenant%2'; +ALTER RANGE RELOCATE LEASE TO 3 FOR SELECT range_id FROM crdb_internal.ranges WHERE start_pretty LIKE '%Tenant%3'; +ALTER RANGE RELOCATE LEASE TO 3 FOR SELECT range_id FROM crdb_internal.ranges WHERE start_pretty LIKE '%Tenant%4'; +ALTER RANGE RELOCATE LEASE TO 1 FOR SELECT range_id FROM crdb_internal.ranges WHERE start_pretty LIKE '%Tenant%5'; +ALTER RANGE RELOCATE LEASE TO 1 FOR SELECT range_id FROM crdb_internal.ranges WHERE start_pretty LIKE '%Tenant%6'; -statement ok -ALTER RANGE RELOCATE LEASE TO 2 FOR SELECT range_id FROM crdb_internal.ranges WHERE start_pretty LIKE '%Tenant%2' +user root + # Populate the range cache. statement ok -ALTER RANGE RELOCATE LEASE TO 3 FOR SELECT range_id FROM crdb_internal.ranges WHERE start_pretty LIKE '%Tenant%3' +SELECT * FROM t -# Check range lease holders in the admin tenant. -query TI -SELECT start_pretty, lease_holder FROM crdb_internal.ranges WHERE start_pretty LIKE '%Tenant%' +# Run a query that involves all 3 SQL instances.
+query II rowsort +SELECT * FROM t WHERE k IN (1, 3, 5) ---- -/Tenant/10 1 -/Tenant/10/Table/106/1/1 1 -/Tenant/10/Table/106/1/2 2 -/Tenant/10/Table/106/1/3 3 +1 1 +3 3 +5 5 -# TODO(harding): Once locality-aware distribution is implemented, run queries in -# the secondary tenant. -user root +# Run a query with a scan only on the third SQL instance. +query II rowsort +SELECT * FROM t WHERE k >= 3 AND k < 5 +---- +3 3 +4 4 -# Check sql instance locality in the secondary tenant. -query IT -SELECT id, locality FROM system.sql_instances +# Run a query with a scan with LIMIT that is executed on the second SQL instance. +query II rowsort +SELECT * FROM t WHERE k >= 1 LIMIT 10 ---- -1 {"Tiers": "region=test"} -2 {"Tiers": "region=test1"} -3 {"Tiers": "region=test2"} +1 1 +2 2 +3 3 +4 4 +5 5 +6 6 diff --git a/pkg/sql/opt/exec/execbuilder/testdata/distsql_tenant_locality b/pkg/sql/opt/exec/execbuilder/testdata/distsql_tenant_locality new file mode 100644 index 000000000000..cb67b059a519 --- /dev/null +++ b/pkg/sql/opt/exec/execbuilder/testdata/distsql_tenant_locality @@ -0,0 +1,100 @@ +# LogicTest: 3node-tenant-multiregion +# tenant-cluster-setting-override-opt: allow-zone-configs-for-secondary-tenants allow-multi-region-abstractions-for-secondary-tenants + +# Create a table on the secondary tenant. +statement ok +CREATE TABLE t (k INT PRIMARY KEY, v INT, FAMILY (k, v)) + +# Split the ranges in the table. +statement ok +ALTER TABLE t SPLIT AT SELECT generate_series(1, 6) + +# Relocate ranges in the admin tenant so that +# - [1-2) and [2-3) are on node 2 +# - [3-4) and [4-5) are on node 3 +# - [5-6) and [6-7) are on node 1. +user host-cluster-root + +statement ok +ALTER RANGE RELOCATE LEASE TO 2 FOR SELECT range_id FROM crdb_internal.ranges WHERE start_pretty LIKE '%Tenant%1'; +ALTER RANGE RELOCATE LEASE TO 2 FOR SELECT range_id FROM crdb_internal.ranges WHERE start_pretty LIKE '%Tenant%2'; +ALTER RANGE RELOCATE LEASE TO 3 FOR SELECT range_id FROM crdb_internal.ranges WHERE start_pretty LIKE '%Tenant%3'; +ALTER RANGE RELOCATE LEASE TO 3 FOR SELECT range_id FROM crdb_internal.ranges WHERE start_pretty LIKE '%Tenant%4'; +ALTER RANGE RELOCATE LEASE TO 1 FOR SELECT range_id FROM crdb_internal.ranges WHERE start_pretty LIKE '%Tenant%5'; +ALTER RANGE RELOCATE LEASE TO 1 FOR SELECT range_id FROM crdb_internal.ranges WHERE start_pretty LIKE '%Tenant%6'; + +# Check range lease holders in the admin tenant. +query TI rowsort,retry +SELECT start_pretty, lease_holder FROM crdb_internal.ranges WHERE start_pretty LIKE '%Tenant%Table%/1/%' +---- +/Tenant/10/Table/106/1/1 2 +/Tenant/10/Table/106/1/2 2 +/Tenant/10/Table/106/1/3 3 +/Tenant/10/Table/106/1/4 3 +/Tenant/10/Table/106/1/5 1 +/Tenant/10/Table/106/1/6 1 + +user root + +# Populate the range cache. +statement ok +SELECT * FROM t + +# Check sql instance locality in the secondary tenant. +query IT +SELECT id, locality FROM system.sql_instances +---- +1 {"Tiers": "region=test"} +2 {"Tiers": "region=test1"} +3 {"Tiers": "region=test2"} + +# Ensure that we plan TableReaders in the regions according to the leaseholder +# of each range, namely we want +# - TableReader on SQL Instance 2 to scan Span /106/1/1/0 +# - TableReader on SQL Instance 3 to scan Span /106/1/3/0 +# - TableReader on SQL Instance 1 to scan Span /106/1/5/0.
+query T +EXPLAIN (DISTSQL) SELECT * FROM t WHERE k IN (1, 3, 5) +---- +distribution: full +vectorized: true +· +• scan + missing stats + table: t@t_pkey + spans: [/1 - /1] [/3 - /3] [/5 - /5] +· +Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJykkU9Lw0AQxe9-imVOVkaSTayHnJQaMVDbmhQUJEjMDqU0zcbdjSgl312yOdSWKrEe58-b9-PNBvRbAQGET7PxdTRhpzdRMk8exgOWhONwNGdn7Dae3jPDHu_COGQr1i5xZD6y4QAQSilokq1JQ_AMHBA8QPAhRaiUzElrqdrRxi5G4gMCF2FZVrVp2ylCLhVBsAGzNAVBAPPstaCYMkHKaW8JMtmysOfNlXmpVvQJCCNZ1OtSB2yF7B0QkiprK4e7lw53ho4LaYMga7M10iZbEAS8wR9gtgx1KZUgRWLHP20O4E7kuawcf2_xsLW3Y8375-AemQPvnYPXH4YfCeMf9ZQDMDHpSpaaemXutk8jsaDuw1rWKqeZkrm16cqp1dmGIG26Ke-KqLQjC_hdzH8VX-yI3X2x9x9n_0_itDn5CgAA__-lhT3K + +# Ensure that a single scan that touches multiple ranges is represented by a +# single span after physical planning. We expect to only have a single +# TableReader on node 3. +query T +EXPLAIN (DISTSQL) SELECT * FROM t WHERE k >= 3 AND k < 5 +---- +distribution: full +vectorized: true +· +• scan + missing stats + table: t@t_pkey + spans: [/3 - /4] +· +Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJyMkNFK80AUhO__p1jmqv3Zkk1DvVgQLG3EQE1rUlDQIDE5lNI0G7MbUULeXZKItoLVu8yc803mbA39nEHCvVstpp7PBnMvXIc3iyEL3YU7W7P_7DJYXjPDbq_cwGWDHXuohHDonDlDNvXnn07CJkNw5ColP96ThryHDQ4HEUdRqoS0VmVr192Sl75CCo5tXlSmtSOORJUEWcNsTUaQ8NVIFVabkpKJt1m31nCoynxB2sQbghw3_CDYPh28jp8yCihOqbTEUTzMhXksdvQGjpnKqn2uJdtx9gKOsIhbZdnizLItZ_TxMcFPneyjTr8cG5AuVK7pT9eKJuKgdEP9g2pVlQmtSpV0v-nlsuM6IyVt-um4F17ej9qCh7B9Ehbf4Kj59x4AAP__gJu1Iw== + +# Ensure that a query with a LIMIT is planned on the instance that is the +# "leaseholder" of the first range touched by the scan. We expect to only have a +# single TableReader on node 2. +query T +EXPLAIN (DISTSQL) SELECT * FROM t WHERE k >= 1 LIMIT 10 +---- +distribution: full +vectorized: true +· +• scan + missing stats + table: t@t_pkey + spans: [/1 - ] + limit: 10 +· +Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJyMUF9LvEAUff99iuE8_YpZnDHoYSAoNiPB_ZMKBSVhellkXcecMQrxu4f60G7Q1tucc-85c87tYF5LKHgP6-DKX7L_134UR3fBCYu8wJvH7JTdhKsFs-z-1gs9tmVPrRBndMEkC_yFHzMpwFHpnJbpjgzUIyQ4XCQcdaMzMkY3A92NS37-DiU4iqpu7UAnHJluCKqDLWxJUFjqma6dwSUnmxbluNZz6NZ-iYxNNwTl9nzPWB43jtOXkkJKc2occWAPe2mf6y19gGOuy3ZXGcW2nL2BI6rTATlSnDvSkbPx4YIjKHaFHfr_FE4ehPuldUim1pWhP9UWfcJB-YamyxrdNhmtG52N30xwNepGIidjp6k7Ab-aRkPAfbE8KhbfxEn_7zMAAP__RR61bw==
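For readers following along, the assignment strategy introduced by this patch can be summarized by the standalone Go sketch below (not part of the diff). The types and names here are simplified stand-ins: InstanceInfo replaces sqlinstance.InstanceInfo, gatewayID replaces dsp.gatewaySQLInstanceID, and the kvNodeRegion argument replaces the node-descriptor region lookup done inside makeSQLInstanceIDForKVNodeIDTenantResolver; health checks and error handling are omitted.

package main

import (
	"fmt"
	"math/rand"
)

// InstanceInfo is a simplified stand-in for sqlinstance.InstanceInfo.
type InstanceInfo struct {
	ID     int
	Region string // empty when the locality of the instance is unknown
}

// makeResolver mirrors the shape of makeSQLInstanceIDForKVNodeIDTenantResolver:
// it returns a function that maps the region of a range's leaseholder KV node
// to the SQL instance that should scan that range. instances is assumed to be
// non-empty (the real code returns an error otherwise).
func makeResolver(rng *rand.Rand, gatewayID int, instances []InstanceInfo) func(kvNodeRegion string) int {
	// Group the instances that reported a region.
	byRegion := make(map[string][]int)
	for _, inst := range instances {
		if inst.Region != "" {
			byRegion[inst.Region] = append(byRegion[inst.Region], inst.ID)
		}
	}
	if len(byRegion) == 0 {
		// No locality information at all: fall back to round-robin over a
		// shuffled copy of the instance list, like the locality-ignorant
		// branch in the patch.
		shuffled := append([]InstanceInfo(nil), instances...)
		rng.Shuffle(len(shuffled), func(i, j int) {
			shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
		})
		var next int
		return func(string) int {
			id := shuffled[next%len(shuffled)].ID
			next++
			return id
		}
	}
	return func(kvNodeRegion string) int {
		ids, ok := byRegion[kvNodeRegion]
		if !ok {
			// No instance in the leaseholder's region: use the gateway.
			return gatewayID
		}
		// Pick a random instance within the region to spread the load.
		return ids[rng.Intn(len(ids))]
	}
}

func main() {
	rng := rand.New(rand.NewSource(42))
	// Mirrors the localities in the logic test: three instances, one region each.
	instances := []InstanceInfo{
		{ID: 1, Region: "test"},
		{ID: 2, Region: "test1"},
		{ID: 3, Region: "test2"},
	}
	resolve := makeResolver(rng, 1 /* gatewayID */, instances)
	for _, region := range []string{"test1", "test2", "unknown-region"} {
		fmt.Printf("range led by a node in %q -> SQL instance %d\n", region, resolve(region))
	}
}

In the patch itself there is one additional step not shown in this sketch: maybeReassignToGatewaySQLInstance moves a partitioning that landed entirely on a single non-gateway instance back to the gateway when both instances are in the same region (or when no locality information is available), to avoid the extra hop when setting up the distributed plan.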