Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
69486:     sql: support for temporary table clean up for tenants r=fqazi a=fqazi

Fixes: #67401
Previously, the temporary table clean-up did not execute for
tenants. This was inadequate that temporary tables would last
longer than the life span of user sessions. To address this,
this patch adds support for cleaning up on a single tenant
pod. Specifically removing checks for meta1 lease when under
a tenant, and support for listing sessions.

Release justification: low risk and fixes a tenant-related bug
Release note (bug fix): Temporary tables were not properly cleaned up for
tenants.

69789: sql: drop database cascade can fail resolving schemas r=fqazi a=fqazi

Fixes: #69713

Previously, drop database cascade would drop the schema first,
and then drop any objects under those schemas after. This was
inadequate because as we look up any objects under the schemas,
we may need to resolve types, which will may lead to a look up
on the schema. If the schema is dropped then we will fail while
resolving any types. To address this, this patch drops the objects
under the schema first followed by the database.

Release justification: Low risk bug fix for drop database cascade
Release note (bug fix): Drop database cascade can fail while resolving
a schema in a certain scenarios with the following error:
 "ERROR: error resolving referenced table ID <ID>: descriptor is
  being dropped"

69975: sql: fix interaction between stmt bundles and tracing r=yuzefovich a=yuzefovich

Previously, we wouldn't generate the bundle if the verbose tracing was
already enabled on the cluster because we wouldn't call
`instrumentationHelper.Finish` where we actually generate the bundle.
This would result in empty responses for `EXPLAIN ANALYZE (DEBUG)` as
well as the requests for stmt diagnostics being stuck in "waiting"
state.

Fixes: #69398.

Release note (bug fix): Previously, if the tracing
(`sql.trace.txn.enable_threshold` cluster setting) was enabled on the
cluster, the statement diagnostics collection (`EXPLAIN ANALYZE
(DEBUG)`) wouldn't work. This is now fixed.

Release justification: low-risk fix to a long-standing bug.

Co-authored-by: Faizan Qazi <faizan@cockroachlabs.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
3 people committed Sep 10, 2021
4 parents fd070d5 + 21b5854 + fd11f94 + 996db2a commit 6e91bed
Show file tree
Hide file tree
Showing 17 changed files with 351 additions and 52 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ sql.stats.persisted_rows.max integer 1000000 maximum number of rows of statement
sql.stats.post_events.enabled boolean false if set, an event is logged for every CREATE STATISTICS job
sql.telemetry.query_sampling.enabled boolean false when set to true, executed queries will emit an event on the telemetry logging channel
sql.temp_object_cleaner.cleanup_interval duration 30m0s how often to clean up orphaned temporary objects
sql.temp_object_cleaner.wait_interval duration 30m0s how long after creation a temporary object will be cleaned up
sql.trace.log_statement_execute boolean false set to true to enable logging of executed statements
sql.trace.session_eventlog.enabled boolean false set to true to enable session tracing. Note that enabling this may have a non-trivial negative performance impact.
sql.trace.stmt.enable_threshold duration 0s duration beyond which all statements are traced (set to 0 to disable). This applies to individual statements within a transaction and is therefore finer-grained than sql.trace.txn.enable_threshold.
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@
<tr><td><code>sql.stats.post_events.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, an event is logged for every CREATE STATISTICS job</td></tr>
<tr><td><code>sql.telemetry.query_sampling.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when set to true, executed queries will emit an event on the telemetry logging channel</td></tr>
<tr><td><code>sql.temp_object_cleaner.cleanup_interval</code></td><td>duration</td><td><code>30m0s</code></td><td>how often to clean up orphaned temporary objects</td></tr>
<tr><td><code>sql.temp_object_cleaner.wait_interval</code></td><td>duration</td><td><code>30m0s</code></td><td>how long after creation a temporary object will be cleaned up</td></tr>
<tr><td><code>sql.trace.log_statement_execute</code></td><td>boolean</td><td><code>false</code></td><td>set to true to enable logging of executed statements</td></tr>
<tr><td><code>sql.trace.session_eventlog.enabled</code></td><td>boolean</td><td><code>false</code></td><td>set to true to enable session tracing. Note that enabling this may have a non-trivial negative performance impact.</td></tr>
<tr><td><code>sql.trace.stmt.enable_threshold</code></td><td>duration</td><td><code>0s</code></td><td>duration beyond which all statements are traced (set to 0 to disable). This applies to individual statements within a transaction and is therefore finer-grained than sql.trace.txn.enable_threshold.</td></tr>
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (d *datadrivenTestState) addServer(
}
settings := cluster.MakeTestingClusterSettings()
sql.TempObjectCleanupInterval.Override(context.Background(), &settings.SV, duration)
sql.TempObjectWaitInterval.Override(context.Background(), &settings.SV, time.Millisecond)
params.ServerArgs.Settings = settings
}

Expand Down Expand Up @@ -8039,11 +8040,14 @@ func TestFullClusterTemporaryBackupAndRestore(t *testing.T) {

numNodes := 4
// Start a new server that shares the data directory.
settings := cluster.MakeTestingClusterSettings()
sql.TempObjectWaitInterval.Override(context.Background(), &settings.SV, time.Microsecond*0)
dir, dirCleanupFn := testutils.TempDir(t)
defer dirCleanupFn()
params := base.TestClusterArgs{}
params.ServerArgs.ExternalIODir = dir
params.ServerArgs.UseDatabase = "defaultdb"
params.ServerArgs.Settings = settings
knobs := base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
DisableTempObjectsCleanupOnSessionExit: true,
Expand Down Expand Up @@ -8097,6 +8101,7 @@ func TestFullClusterTemporaryBackupAndRestore(t *testing.T) {
},
}
params.ServerArgs.Knobs = knobs
params.ServerArgs.Settings = settings
_, _, sqlDBRestore, cleanupRestore := backupRestoreTestSetupEmpty(t, singleNode, dir, InitManualReplication,
params)
defer cleanupRestore()
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/testccl/sqlccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@ go_test(
srcs = [
"main_test.go",
"run_control_test.go",
"temp_table_clean_test.go",
],
deps = [
"//pkg/base",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/sqlliveness/slinstance",
"//pkg/sql/sqltestutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
Expand All @@ -20,6 +24,8 @@ go_test(
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/stop",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
Expand Down
127 changes: 127 additions & 0 deletions pkg/ccl/testccl/sqlccl/temp_table_clean_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package sqlccl_test

import (
"context"
"reflect"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

func TestTenantTempTableCleanup(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
t.Helper()
settings := cluster.MakeTestingClusterSettings()
sql.TempObjectCleanupInterval.Override(ctx, &settings.SV, time.Second)
sql.TempObjectWaitInterval.Override(ctx, &settings.SV, time.Second*0)
// Set up sessions to expire within 5 seconds of a
// nodes death.
slinstance.DefaultTTL.Override(ctx, &settings.SV, 5*time.Second)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, time.Second)

tc := serverutils.StartNewTestCluster(
t, 3 /* numNodes */, base.TestClusterArgs{ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: settings,
}},
)
log.TestingClearServerIdentifiers()
tenantStoppers := []*stop.Stopper{stop.NewStopper(), stop.NewStopper()}
_, tenantPrimaryDB := serverutils.StartTenant(t, tc.Server(0),
base.TestTenantArgs{
TenantID: serverutils.TestTenantID(),
Settings: settings,
Stopper: tenantStoppers[0],
})
tenantSQL := sqlutils.MakeSQLRunner(tenantPrimaryDB)

// Sanity: Temporary table clean up works fine, we are going
// to start two nodes and close the kill one of
// them.
tenantSQL.Exec(t, "SET experimental_enable_temp_tables = 'on'")
tenantSQL.Exec(t, "set cluster setting sql.temp_object_cleaner.cleanup_interval='1 seconds'")
tenantSQL.Exec(t, "CREATE TEMP TABLE temp_table (x INT PRIMARY KEY, y INT);")
// Prevent a logging assertion that the server ID is initialized multiple times.
log.TestingClearServerIdentifiers()
_, tenantSecondDB := serverutils.StartTenant(t, tc.Server(1),
base.TestTenantArgs{
Existing: true,
TenantID: serverutils.TestTenantID(),
Settings: settings,
Stopper: tenantStoppers[1],
})
log.TestingClearServerIdentifiers()
tenantSecondSQL := sqlutils.MakeSQLRunner(tenantSecondDB)
tenantSecondSQL.CheckQueryResults(t, "SELECT table_name FROM [SHOW TABLES]",
[][]string{
{"temp_table"},
})
tenantSecondSQL.Exec(t, "SET experimental_enable_temp_tables = 'on'")
tenantSecondSQL.Exec(t, "CREATE TEMP TABLE temp_table2 (x INT PRIMARY KEY, y INT);")
tenantSecondSQL.CheckQueryResults(t, "SELECT table_name FROM [SHOW TABLES]",
[][]string{
{"temp_table"},
{"temp_table2"},
})
// Stop the second node, we should one be left with a single temp table.
tenantStoppers[1].Stop(ctx)
// Session should expire in 5 seconds, but
// we will wait for up to 15 seconds.
{
tEnd := timeutil.Now().Add(time.Second * 15)
found := false
lastResult := [][]string{}
expectedResult := [][]string{
{"temp_table"},
}
for tEnd.After(timeutil.Now()) {
lastResult = tenantSQL.QueryStr(t, "SELECT table_name FROM [SHOW TABLES]")
if reflect.DeepEqual(lastResult, expectedResult) {
found = true
break
}
}
if !found {
log.Fatalf(ctx, "temporary table was not correctly dropped, expected %v and got %v", expectedResult, lastResult)
}
}

// Close the primary DB, there should be no temporary
// tables left.
tenantPrimaryDB.Close()
// Prevent a logging assertion that the server ID is initialized multiple times.
log.TestingClearServerIdentifiers()
// Once we restart the tenant, no sessions should exist
// so all temporary tables should be cleaned up.
_, tenantPrimaryDB = serverutils.StartTenant(t, tc.Server(0),
base.TestTenantArgs{
Existing: true,
TenantID: serverutils.TestTenantID(),
Settings: settings,
Stopper: tenantStoppers[0]})
tenantSQL = sqlutils.MakeSQLRunner(tenantPrimaryDB)
tenantSQL.CheckQueryResultsRetry(t, "SELECT table_name FROM [SHOW TABLES]",
[][]string{})
tenantStoppers[0].Stop(ctx)
tc.Stopper().Stop(ctx)
}
3 changes: 3 additions & 0 deletions pkg/rpc/auth_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ func (a tenantAuthorizer) authorize(
case "/cockroach.server.serverpb.Status/ListSessions":
return a.authTenant(tenID)

case "/cockroach.server.serverpb.Status/ListLocalSessions":
return a.authTenant(tenID)

case "/cockroach.roachpb.Internal/GetSpanConfigs":
return a.authGetSpanConfigs(tenID, req.(*roachpb.GetSpanConfigsRequest))

Expand Down
21 changes: 13 additions & 8 deletions pkg/server/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,23 +137,28 @@ func StartTenant(
pgLAddr := pgL.Addr().String()
httpLAddr := httpL.Addr().String()
args.advertiseAddr = baseCfg.AdvertiseAddr
s, err := newSQLServer(ctx, args)
if err != nil {
return nil, "", "", err
}

// The tenantStatusServer needs access to the sqlServer,
// but we also need the same object to set up the sqlServer.
// So construct the tenant status server with a nil sqlServer,
// and then assign it once an SQL server gets created. We are
// going to assume that the tenant status server won't require
// the SQL server object.
tenantStatusServer := newTenantStatusServer(
baseCfg.AmbientCtx, &adminPrivilegeChecker{ie: args.circularInternalExecutor},
args.sessionRegistry, args.contentionRegistry, args.flowScheduler, baseCfg.Settings, s,
args.sessionRegistry, args.contentionRegistry, args.flowScheduler, baseCfg.Settings, nil,
args.rpcContext, args.stopper,
)
args.sqlStatusServer = tenantStatusServer
s, err := newSQLServer(ctx, args)
tenantStatusServer.sqlServer = s

s.execCfg.SQLStatusServer = tenantStatusServer
if err != nil {
return nil, "", "", err
}

// TODO(asubiotto): remove this. Right now it is needed to initialize the
// SpanResolver.
s.execCfg.DistSQLPlanner.SetNodeInfo(roachpb.NodeDescriptor{NodeID: 0})

workersCtx := tenantStatusServer.AnnotateCtx(context.Background())

// Register and start gRPC service on pod. This is separate from the
Expand Down
70 changes: 61 additions & 9 deletions pkg/server/tenant_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,60 @@ func newTenantStatusServer(
}
}

// dialCallback used to dial specific pods when
// iterating nodes.
func (t *tenantStatusServer) dialCallback(
ctx context.Context, instanceID base.SQLInstanceID, addr string,
) (interface{}, error) {
client, err := t.dialPod(ctx, instanceID, addr)
return client, err
}

func (t *tenantStatusServer) ListSessions(
ctx context.Context, request *serverpb.ListSessionsRequest,
ctx context.Context, req *serverpb.ListSessionsRequest,
) (*serverpb.ListSessionsResponse, error) {
return t.ListLocalSessions(ctx, request)
ctx = propagateGatewayMetadata(ctx)
ctx = t.AnnotateCtx(ctx)

if _, err := t.privilegeChecker.requireViewActivityPermission(ctx); err != nil {
return nil, err
}
if t.sqlServer.SQLInstanceID() == 0 {
return nil, status.Errorf(codes.Unavailable, "instanceID not set")
}

response := &serverpb.ListSessionsResponse{}
nodeStatement := func(ctx context.Context, client interface{}, instanceID base.SQLInstanceID) (interface{}, error) {
statusClient := client.(serverpb.StatusClient)
localResponse, err := statusClient.ListLocalSessions(ctx, req)
if localResponse == nil {
log.Errorf(ctx, "listing local sessions on %d produced a nil result with error %v",
instanceID,
err)
}
return localResponse, err
}
if err := t.iteratePods(ctx, "sessions for nodes",
t.dialCallback,
nodeStatement,
func(instanceID base.SQLInstanceID, resp interface{}) {
sessionResp := resp.(*serverpb.ListSessionsResponse)
response.Sessions = append(response.Sessions, sessionResp.Sessions...)
response.Errors = append(response.Errors, sessionResp.Errors...)
},
func(instanceID base.SQLInstanceID, err error) {
// Log any errors related to the failures.
log.Warningf(ctx, "fan out statements request recorded error from node %d: %v", instanceID, err)
response.Errors = append(response.Errors,
serverpb.ListSessionsError{
Message: err.Error(),
NodeID: roachpb.NodeID(instanceID),
})
},
); err != nil {
return nil, err
}
return response, nil
}

func (t *tenantStatusServer) ListLocalSessions(
Expand Down Expand Up @@ -227,17 +277,19 @@ func (t *tenantStatusServer) Statements(
return statusClient.Statements(ctx, localReq)
}

dialFn := func(ctx context.Context, instanceID base.SQLInstanceID, addr string) (interface{}, error) {
client, err := t.dialPod(ctx, instanceID, addr)
return client, err
}
nodeStatement := func(ctx context.Context, client interface{}, _ base.SQLInstanceID) (interface{}, error) {
nodeStatement := func(ctx context.Context, client interface{}, instanceID base.SQLInstanceID) (interface{}, error) {
statusClient := client.(serverpb.StatusClient)
return statusClient.Statements(ctx, localReq)
localResponse, err := statusClient.Statements(ctx, localReq)
if localResponse == nil {
log.Errorf(ctx, "listing statements on %d produced a nil result with err: %v",
instanceID,
err)
}
return localResponse, err
}

if err := t.iteratePods(ctx, fmt.Sprintf("statement statistics for node %s", req.NodeID),
dialFn,
t.dialCallback,
nodeStatement,
func(instanceID base.SQLInstanceID, resp interface{}) {
statementsResp := resp.(*serverpb.StatementsResponse)
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/catalog/descs/kv_descriptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,14 @@ func (kd *kvDescriptors) getSchemasForDatabase(
}
if _, ok := kd.allSchemasForDatabase[dbID]; !ok {
var err error
kd.allSchemasForDatabase[dbID], err = resolver.GetForDatabase(ctx, txn, kd.codec, dbID)
allSchemas, err := resolver.GetForDatabase(ctx, txn, kd.codec, dbID)
if err != nil {
return nil, err
}
kd.allSchemasForDatabase[dbID] = make(map[descpb.ID]string)
for id, entry := range allSchemas {
kd.allSchemasForDatabase[dbID][id] = entry.Name
}
}
return kd.allSchemasForDatabase[dbID], nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/catalog/resolver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlerrors",
"//pkg/util/hlc",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
],
Expand Down
Loading

0 comments on commit 6e91bed

Please sign in to comment.