Skip to content

Commit

Permalink
sql: enables distributed distsql queries for multi-tenant
Browse files Browse the repository at this point in the history
This change allows SQL queries to be distributed in multi-tenant
environments. The distribution algorithm randomly assigns spans to SQL
instances, but if only one instance ends up being used, the spans are
assigned to the gateway instance instead. Distribution does not yet take
locality into account; locality awareness will be implemented in a future PR.

This change also supports running execbuilder tests with the
3node-tenant configuration, which is under CCL. These tests can be run
with either of the following commands:

```
make test PKG=./pkg/ccl/logictestccl TESTS=TestTenantExecBuild
./dev test pkg/ccl/logictestccl -f=TestTenantExecBuild
```

Fixes: cockroachdb#80680

Release note: None
  • Loading branch information
rharding6373 committed May 4, 2022
1 parent 70d1c10 commit 7a1d2a2
Show file tree
Hide file tree
Showing 18 changed files with 198 additions and 51 deletions.
4 changes: 3 additions & 1 deletion pkg/ccl/logictestccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ go_test(
"main_test.go",
],
data = glob(["testdata/**"]) + [
"//pkg/sql/logictest:testdata",
"//c-deps:libgeos",
"//pkg/sql/logictest:testdata",
"//pkg/sql/opt/exec/execbuilder:testdata",
],
embed = [":logictestccl"],
deps = [
Expand All @@ -29,6 +30,7 @@ go_test(
"//pkg/sql/logictest",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/randutil",
Expand Down
22 changes: 22 additions & 0 deletions pkg/ccl/logictestccl/logic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl"
"github.com/cockroachdb/cockroach/pkg/sql/logictest"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)

Expand Down Expand Up @@ -54,3 +55,24 @@ func TestTenantSQLLiteLogic(t *testing.T) {
defer leaktest.AfterTest(t)()
logictest.RunSQLLiteLogicTest(t, "3node-tenant")
}

// TestTenantExecBuild executes the execbuilder test files using the
// 3node-tenant configuration. That configuration constructs a secondary tenant
// and runs each test within that secondary tenant's sandbox.
func TestTenantExecBuild(t *testing.T) {
	defer leaktest.AfterTest(t)()
	skip.UnderDeadlock(t, "times out and/or hangs")

	// Locate the execbuilder testdata: a path relative to this package by
	// default, or the Bazel runfiles location when the test was built with
	// Bazel.
	testdataDir := "../../sql/opt/exec/execbuilder/testdata/"
	if bazel.BuiltWithBazel() {
		var err error
		testdataDir, err = bazel.Runfile("pkg/sql/opt/exec/execbuilder/testdata/")
		if err != nil {
			t.Fatal(err)
		}
	}

	// The pattern selects entries whose names do not begin with a dot
	// (presumably to skip hidden files).
	testFiles := filepath.Join(testdataDir, "[^.]*")
	serverArgs := logictest.TestServerArgs{DisableWorkmemRandomization: true}
	logictest.RunLogicTestWithDefaultConfig(
		t, serverArgs, "3node-tenant", true /* runCCLConfigs */, testFiles,
	)
}
10 changes: 10 additions & 0 deletions pkg/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,10 +835,20 @@ func GetRowPrefixLength(key roachpb.Key) (int, error) {
}
sqlN := len(sqlKey)

// Check that the prefix contains a valid TableID.
if encoding.PeekType(sqlKey) != encoding.Int {
// Not a table key, so the row prefix is the entire key.
return n, nil
}
tableIDLen, err := encoding.GetUvarintLen(sqlKey)
if err != nil {
return 0, err
}
// Check that the prefix contains a valid IndexID after the TableID.
if tableIDLen >= sqlN || encoding.PeekType(sqlKey[tableIDLen:]) != encoding.Int {
return 0, errors.Errorf("%s: not a valid table key", key)
}

// The column family ID length is encoded as a varint and we take advantage
// of the fact that the column family ID itself will be encoded in 0-9 bytes
// and thus the length of the column family ID data will fit in a single
Expand Down
8 changes: 4 additions & 4 deletions pkg/keys/keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,21 +673,21 @@ func TestEnsureSafeSplitKey(t *testing.T) {
err string
}{
// Column ID suffix size is too large.
{es(1), "malformed table key"},
{es(1, 2), "malformed table key"},
// The table ID is invalid.
{es(200)[:1], "insufficient bytes to decode uvarint value"},
{es(200)[:1], "not a valid table key"},
// The index ID is invalid.
{es(1), "not a valid table key"},
{es(1, 200)[:2], "insufficient bytes to decode uvarint value"},
// The column ID suffix is invalid.
{es(1, 2, 200)[:3], "insufficient bytes to decode uvarint value"},
// Exercises a former overflow bug. We decode a uint(18446744073709551610) which, if cast
// to int carelessly, results in -6.
{encoding.EncodeVarintAscending(tenSysCodec.TablePrefix(999), 322434), "malformed table key"},
// Same test cases, but for tenant 5.
{e5(1), "malformed table key"},
{e5(1, 2), "malformed table key"},
{e5(200)[:3], "insufficient bytes to decode uvarint value"},
{e5(200)[:3], "not a valid table key"},
{e5(1), "not a valid table key"},
{e5(1, 200)[:4], "insufficient bytes to decode uvarint value"},
{e5(1, 2, 200)[:5], "insufficient bytes to decode uvarint value"},
{encoding.EncodeVarintAscending(ten5Codec.TablePrefix(999), 322434), "malformed table key"},
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func runPlanInsidePlan(params runParams, plan *planComponents, resultWriter rowR
)
distributeType := DistributionType(DistributionTypeNone)
if distributePlan.WillDistribute() {
distributeType = DistributionTypeSystemTenantOnly
distributeType = DistributionTypeAlways
}
planCtx := params.p.extendedEvalCtx.ExecCfg.DistSQLPlanner.NewPlanningCtx(
params.ctx, evalCtx, &plannerCopy, params.p.txn, distributeType)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1138,7 +1138,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
}
distribute := DistributionType(DistributionTypeNone)
if distributePlan.WillDistribute() {
distribute = DistributionTypeSystemTenantOnly
distribute = DistributionTypeAlways
}
ex.sessionTracing.TraceExecStart(ctx, "distributed")
stats, err := ex.execWithDistSQLEngine(
Expand Down
62 changes: 41 additions & 21 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,12 @@ func mustWrapValuesNode(planCtx *PlanningCtx, specifiedInQuery bool) bool {
func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
switch n := node.(type) {
// Keep these cases alphabetized, please!
case *createStatsNode:
if n.runAsJob {
return cannotDistribute, planNodeNotSupportedErr
}
return shouldDistribute, nil

case *distinctNode:
return checkSupportForPlanNode(n.plan)

Expand Down Expand Up @@ -651,12 +657,6 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
}
return shouldDistribute, nil

case *createStatsNode:
if n.runAsJob {
return cannotDistribute, planNodeNotSupportedErr
}
return shouldDistribute, nil

default:
return cannotDistribute, planNodeNotSupportedErr
}
Expand Down Expand Up @@ -1164,7 +1164,7 @@ func (dsp *DistSQLPlanner) partitionSpansTenant(
return partitions, nil
}
if dsp.sqlInstanceProvider == nil {
return nil, errors.New("sql instance provider not available in multi-tenant environment")
return nil, errors.AssertionFailedf("sql instance provider not available in multi-tenant environment")
}
// GetAllInstances only returns healthy instances.
instances, err := dsp.sqlInstanceProvider.GetAllInstances(ctx)
Expand All @@ -1182,11 +1182,24 @@ func (dsp *DistSQLPlanner) partitionSpansTenant(

// nodeMap maps a SQLInstanceID to an index inside the partitions array.
nodeMap := make(map[base.SQLInstanceID]int)
var lastKey roachpb.Key
var lastIdx int
for i := range spans {
span := spans[i]
if log.V(1) {
log.Infof(ctx, "partitioning span %s", span)
}
// Rows with column families may have been split into different spans. These
// spans should be assigned the same pod so that the pod can stitch together
// the rows correctly. Split rows are in adjacent spans.
if safeKey, err := keys.EnsureSafeSplitKey(span.Key); err == nil {
if safeKey.Equal(lastKey) {
partition := &partitions[lastIdx]
partition.Spans = append(partition.Spans, span)
continue
}
lastKey = safeKey
}
sqlInstanceID := instances[i%len(instances)].InstanceID
partitionIdx, inNodeMap := nodeMap[sqlInstanceID]
if !inNodeMap {
Expand All @@ -1196,6 +1209,13 @@ func (dsp *DistSQLPlanner) partitionSpansTenant(
}
partition := &partitions[partitionIdx]
partition.Spans = append(partition.Spans, span)
lastIdx = partitionIdx
}
// If spans were only assigned to one SQL instance, then assign them all to
// the gateway instance. The primary reason is to avoid an extra hop.
// TODO(harding): Don't do this if using an instance in another locality.
if len(partitions) == 1 && partitions[0].SQLInstanceID != dsp.gatewaySQLInstanceID {
partitions[0].SQLInstanceID = dsp.gatewaySQLInstanceID
}
return partitions, nil
}
Expand Down Expand Up @@ -2901,6 +2921,20 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode(

switch n := node.(type) {
// Keep these cases alphabetized, please!
case *createStatsNode:
if n.runAsJob {
plan, err = dsp.wrapPlan(ctx, planCtx, n, false /* allowPartialDistribution */)
} else {
// Create a job record but don't actually start the job.
var record *jobs.Record
record, err = n.makeJobRecord(ctx)
if err != nil {
return nil, err
}
plan, err = dsp.createPlanForCreateStats(ctx, planCtx, 0, /* jobID */
record.Details.(jobspb.CreateStatsDetails))
}

case *distinctNode:
plan, err = dsp.createPlanForDistinct(ctx, planCtx, n)

Expand Down Expand Up @@ -3021,20 +3055,6 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode(
case *zigzagJoinNode:
plan, err = dsp.createPlanForZigzagJoin(planCtx, n)

case *createStatsNode:
if n.runAsJob {
plan, err = dsp.wrapPlan(ctx, planCtx, n, false /* allowPartialDistribution */)
} else {
// Create a job record but don't actually start the job.
var record *jobs.Record
record, err = n.makeJobRecord(ctx)
if err != nil {
return nil, err
}
plan, err = dsp.createPlanForCreateStats(ctx, planCtx, 0, /* jobID */
record.Details.(jobspb.CreateStatsDetails))
}

default:
// Can't handle a node? We wrap it and continue on our way.
plan, err = dsp.wrapPlan(ctx, planCtx, n, false /* allowPartialDistribution */)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,7 +1255,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
).WillDistribute()
distribute := DistributionType(DistributionTypeNone)
if distributeSubquery {
distribute = DistributionTypeSystemTenantOnly
distribute = DistributionTypeAlways
}
subqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn,
distribute)
Expand Down Expand Up @@ -1602,7 +1602,7 @@ func (dsp *DistSQLPlanner) planAndRunPostquery(
).WillDistribute()
distribute := DistributionType(DistributionTypeNone)
if distributePostquery {
distribute = DistributionTypeSystemTenantOnly
distribute = DistributionTypeAlways
}
postqueryPlanCtx := dsp.NewPlanningCtx(ctx, evalCtx, planner, planner.txn,
distribute)
Expand Down
3 changes: 0 additions & 3 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1611,9 +1611,6 @@ func getPlanDistribution(
return physicalplan.LocalPlan
}

if _, singleTenant := nodeID.OptionalNodeID(); !singleTenant {
return physicalplan.LocalPlan
}
if distSQLMode == sessiondatapb.DistSQLOff {
return physicalplan.LocalPlan
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/explain_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func newPlanningCtxForExplainPurposes(
) *PlanningCtx {
distribute := DistributionType(DistributionTypeNone)
if distribution.WillDistribute() {
distribute = DistributionTypeSystemTenantOnly
distribute = DistributionTypeAlways
}
planCtx := distSQLPlanner.NewPlanningCtx(params.ctx, params.extendedEvalCtx,
params.p, params.p.txn, distribute)
Expand Down
11 changes: 6 additions & 5 deletions pkg/sql/logictest/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,10 +820,11 @@ var logicTestConfigs = []testClusterConfig{
// logictest command.
// To run a logic test with this config as a directive, run:
// make test PKG=./pkg/ccl/logictestccl TESTS=TestTenantLogic//<test_name>
name: threeNodeTenantConfigName,
numNodes: 3,
useTenant: true,
isCCLConfig: true,
name: threeNodeTenantConfigName,
numNodes: 3,
useTenant: true,
isCCLConfig: true,
overrideDistSQLMode: "on",
},
// Regions and zones below are named deliberately, and contain "-"'s to be reflective
// of the naming convention in public clouds. "-"'s are handled differently in SQL
Expand Down Expand Up @@ -3403,7 +3404,7 @@ func (t *logicTest) verifyError(
} else {
newErr := errors.Errorf("%s: %s\nexpected error code %q, but found success",
pos, sql, expectErrCode)
return (err != nil), newErr
return err != nil, newErr
}
}
return true, nil
Expand Down
7 changes: 2 additions & 5 deletions pkg/sql/logictest/testdata/logic_test/distsql_automatic_stats
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
# LogicTest: !metamorphic !3node-tenant

# Note: this test is disabled on 3node-tenant because it sometimes causes one of
# the UPDATE statements below (where we update more than 20% of the table) to be
# flaky. See comments there for details.
# LogicTest: !metamorphic

# Disable automatic stats
statement ok
Expand Down Expand Up @@ -79,6 +75,7 @@ UPDATE data SET d = 12 WHERE d = 10
# For some reason, 3node-tenant occasionally splits the UPDATE into 4 pieces,
# with each one affecting at most 88 rows. Since 88 < 205, the refresh is not
# guaranteed, making this test flaky.
skipif config 3node-tenant
query TTIII colnames,rowsort,retry
SELECT DISTINCT ON (column_names) statistics_name, column_names, row_count, distinct_count, null_count
FROM [SHOW STATISTICS FOR TABLE data] ORDER BY column_names ASC, created DESC
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/distsql_tenant
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# LogicTest: 3node-tenant

# Exercises queries against a table whose rows are stored in several column
# families while running under the 3node-tenant configuration.

# The table has three column families, so each row's columns are stored
# across more than one family.
statement ok
CREATE TABLE t (k INT PRIMARY KEY, v INT, w INT, x INT,
FAMILY fam_0 (k),
FAMILY fam_1 (x),
FAMILY fam_2 (v, w)
)

statement ok
INSERT INTO t VALUES (23, 1, 2, 3), (34, 3, 4, 8);

# The filter covers several disjoint primary-key ranges; only k = 23 falls
# inside one of them, and all of its columns must be returned as one row.
query IIII
SELECT * FROM t WHERE k < 10 OR (k > 20 AND k < 29) OR k > 40
----
23 1 2 3

# Point lookup that selects only the columns stored in fam_2.
query II
SELECT v, w FROM t WHERE k = 23
----
1 2

4 changes: 2 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/role
Original file line number Diff line number Diff line change
Expand Up @@ -292,15 +292,15 @@ role_name member is_admin
testrole testuser true
testrole testuser2 true

query TTB colnames
query TTB colnames,rowsort
SHOW GRANTS ON ROLE FOR testuser, testuser2
----
role_name member is_admin
admin testuser false
testrole testuser true
testrole testuser2 true

query TTB colnames
query TTB colnames,rowsort
SHOW GRANTS ON ROLE admin, testrole FOR root, testuser2
----
role_name member is_admin
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/opt/exec/execbuilder/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,9 @@ go_test(
"//pkg/util/randutil",
],
)

# Exposes the execbuilder testdata files as a target so that tests under
# //pkg/ccl/logictestccl can list them as a data dependency.
filegroup(
name = "testdata",
srcs = glob(["testdata/**"]),
visibility = ["//pkg/ccl/logictestccl:__subpackages__"],
)
Loading

0 comments on commit 7a1d2a2

Please sign in to comment.