Skip to content

Commit

Permalink
sql: inject tenant ID in sqlServerArgs, pass through ExecutorConfig
Browse files Browse the repository at this point in the history
Fixes #47903.

Also known as "the grand plumbing", this commit replaces a few instances
of `TODOSQLCodec` in `pkg/sql/sqlbase/index_encoding.go` and watches the
house of cards fall apart. It then glues the world back together, this
time using a properly injected tenant-bound SQLCodec to encode and
decode all SQL table keys.

A tenant ID field is added to `sqlServerArgs`. This is used to construct
a tenant-bound `keys.SQLCodec` during server creation. This codec
morally lives on the `sql.ExecutorConfig`. In practice, it is also
copied onto `tree.EvalContext` and `execinfra.ServerConfig` to help
carry it around. SQL code is adapted to use this codec whenever it
needs to encode or decode keys.

If all tests pass after this refactor, there is a good chance it got
things right. This is because any use of an uninitialized SQLCodec will
panic immediately when the codec is first used. This was helpful in
ensuring that it was properly plumbed everywhere.
  • Loading branch information
nvanbenschoten committed Apr 30, 2020
1 parent f1fae7e commit 164f82b
Show file tree
Hide file tree
Showing 113 changed files with 616 additions and 337 deletions.
8 changes: 4 additions & 4 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ type tableAndIndex struct {
// spansForAllTableIndexes returns non-overlapping spans for every index and
// table passed in. They would normally overlap if any of them are interleaved.
func spansForAllTableIndexes(
tables []*sqlbase.TableDescriptor, revs []BackupManifest_DescriptorRevision,
codec keys.SQLCodec, tables []*sqlbase.TableDescriptor, revs []BackupManifest_DescriptorRevision,
) []roachpb.Span {

added := make(map[tableAndIndex]bool, len(tables))
sstIntervalTree := interval.NewTree(interval.ExclusiveOverlapper)
for _, table := range tables {
for _, index := range table.AllNonDropIndexes() {
if err := sstIntervalTree.Insert(intervalSpan(table.IndexSpan(index.ID)), false); err != nil {
if err := sstIntervalTree.Insert(intervalSpan(table.IndexSpan(codec, index.ID)), false); err != nil {
panic(errors.NewAssertionErrorWithWrappedErrf(err, "IndexSpan"))
}
added[tableAndIndex{tableID: table.ID, indexID: index.ID}] = true
Expand All @@ -108,7 +108,7 @@ func spansForAllTableIndexes(
for _, idx := range tbl.AllNonDropIndexes() {
key := tableAndIndex{tableID: tbl.ID, indexID: idx.ID}
if !added[key] {
if err := sstIntervalTree.Insert(intervalSpan(tbl.IndexSpan(idx.ID)), false); err != nil {
if err := sstIntervalTree.Insert(intervalSpan(tbl.IndexSpan(codec, idx.ID)), false); err != nil {
panic(errors.NewAssertionErrorWithWrappedErrf(err, "IndexSpan"))
}
added[key] = true
Expand Down Expand Up @@ -528,7 +528,7 @@ func backupPlanHook(
}
}

spans := spansForAllTableIndexes(tables, revs)
spans := spansForAllTableIndexes(p.ExecCfg().Codec, tables, revs)

if len(prevBackups) > 0 {
tablesInPrev := make(map[sqlbase.ID]struct{})
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,7 @@ func TestBackupRestoreResume(t *testing.T) {

t.Run("backup", func(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(outerDB.DB)
backupStartKey := backupTableDesc.PrimaryIndexSpan().Key
backupStartKey := backupTableDesc.PrimaryIndexSpan(keys.SystemSQLCodec).Key
backupEndKey, err := sqlbase.TestingMakePrimaryIndexKey(backupTableDesc, numAccounts/2)
if err != nil {
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,7 @@ func createImportingTables(

// We get the spans of the restoring tables _as they appear in the backup_,
// that is, in the 'old' keyspace, before we reassign the table IDs.
spans := spansForAllTableIndexes(tables, nil)
spans := spansForAllTableIndexes(p.ExecCfg().Codec, tables, nil)

log.Eventf(ctx, "starting restore for %d tables", len(tables))

Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/backupccl/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -100,8 +101,8 @@ func TestShowBackup(t *testing.T) {

details1Desc := sqlbase.GetTableDescriptor(tc.Server(0).DB(), "data", "details1")
details2Desc := sqlbase.GetTableDescriptor(tc.Server(0).DB(), "data", "details2")
details1Key := roachpb.Key(sqlbase.MakeIndexKeyPrefix(details1Desc, details1Desc.PrimaryIndex.ID))
details2Key := roachpb.Key(sqlbase.MakeIndexKeyPrefix(details2Desc, details2Desc.PrimaryIndex.ID))
details1Key := roachpb.Key(sqlbase.MakeIndexKeyPrefix(keys.SystemSQLCodec, details1Desc, details1Desc.PrimaryIndex.ID))
details2Key := roachpb.Key(sqlbase.MakeIndexKeyPrefix(keys.SystemSQLCodec, details2Desc, details2Desc.PrimaryIndex.ID))

sqlDB.CheckQueryResults(t, fmt.Sprintf(`SHOW BACKUP RANGES '%s'`, details), [][]string{
{"/Table/56/1", "/Table/56/2", string(details1Key), string(details1Key.PrefixEnd())},
Expand Down
6 changes: 4 additions & 2 deletions pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -184,7 +185,7 @@ func createBenchmarkChangefeed(
database, table string,
) (*benchSink, func() error, error) {
tableDesc := sqlbase.GetTableDescriptor(s.DB(), database, table)
spans := []roachpb.Span{tableDesc.PrimaryIndexSpan()}
spans := []roachpb.Span{tableDesc.PrimaryIndexSpan(keys.SystemSQLCodec)}
details := jobspb.ChangefeedDetails{
Targets: jobspb.ChangefeedTargets{tableDesc.ID: jobspb.ChangefeedTarget{
StatementTimeName: tableDesc.Name,
Expand Down Expand Up @@ -232,7 +233,8 @@ func createBenchmarkChangefeed(
NeedsInitialScan: needsInitialScan,
}

rowsFn := kvsToRows(s.LeaseManager().(*sql.LeaseManager), details, buf.Get)
rowsFn := kvsToRows(s.ExecutorConfig().(sql.ExecutorConfig).Codec,
s.LeaseManager().(*sql.LeaseManager), details, buf.Get)
sf := span.MakeFrontier(spans...)
tickFn := emitEntries(s.ClusterSettings(), details, hlc.Timestamp{}, sf,
encoder, sink, rowsFn, TestingKnobs{}, metrics)
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ type emitEntry struct {
// returns a closure that may be repeatedly called to advance the changefeed.
// The returned closure is not threadsafe.
func kvsToRows(
codec keys.SQLCodec,
leaseMgr *sql.LeaseManager,
details jobspb.ChangefeedDetails,
inputFn func(context.Context) (kvfeed.Event, error),
) func(context.Context) ([]emitEntry, error) {
_, withDiff := details.Opts[changefeedbase.OptDiff]
rfCache := newRowFetcherCache(leaseMgr)
rfCache := newRowFetcherCache(codec, leaseMgr)

var kvs row.SpanKVFetcher
appendEmitEntryForKV := func(
Expand Down
11 changes: 8 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -99,7 +100,7 @@ func distChangefeedFlow(
}

execCfg := phs.ExecCfg()
trackedSpans, err := fetchSpansForTargets(ctx, execCfg.DB, details.Targets, spansTS)
trackedSpans, err := fetchSpansForTargets(ctx, execCfg.DB, execCfg.Codec, details.Targets, spansTS)
if err != nil {
return err
}
Expand Down Expand Up @@ -211,7 +212,11 @@ func distChangefeedFlow(
}

func fetchSpansForTargets(
ctx context.Context, db *kv.DB, targets jobspb.ChangefeedTargets, ts hlc.Timestamp,
ctx context.Context,
db *kv.DB,
codec keys.SQLCodec,
targets jobspb.ChangefeedTargets,
ts hlc.Timestamp,
) ([]roachpb.Span, error) {
var spans []roachpb.Span
err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
Expand All @@ -223,7 +228,7 @@ func fetchSpansForTargets(
if err != nil {
return err
}
spans = append(spans, tableDesc.PrimaryIndexSpan())
spans = append(spans, tableDesc.PrimaryIndexSpan(codec))
}
return nil
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
_, withDiff := ca.spec.Feed.Opts[changefeedbase.OptDiff]
kvfeedCfg := makeKVFeedCfg(ca.flowCtx.Cfg, leaseMgr, ca.kvFeedMemMon, ca.spec,
spans, withDiff, buf, metrics)
rowsFn := kvsToRows(leaseMgr, ca.spec.Feed, buf.Get)
rowsFn := kvsToRows(ca.flowCtx.Codec(), leaseMgr, ca.spec.Feed, buf.Get)
ca.tickFn = emitEntries(ca.flowCtx.Cfg.Settings, ca.spec.Feed,
kvfeedCfg.InitialHighWater, sf, ca.encoder, ca.sink, rowsFn, knobs, metrics)
ca.startKVFeed(ctx, kvfeedCfg)
Expand Down
9 changes: 6 additions & 3 deletions pkg/ccl/changefeedccl/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ import (
// StartScanFrom can be used to turn that key (or all the keys making up the
// column families of one row) into a row.
type rowFetcherCache struct {
codec keys.SQLCodec
leaseMgr *sql.LeaseManager
fetchers map[*sqlbase.ImmutableTableDescriptor]*row.Fetcher

a sqlbase.DatumAlloc
}

func newRowFetcherCache(leaseMgr *sql.LeaseManager) *rowFetcherCache {
func newRowFetcherCache(codec keys.SQLCodec, leaseMgr *sql.LeaseManager) *rowFetcherCache {
return &rowFetcherCache{
codec: codec,
leaseMgr: leaseMgr,
fetchers: make(map[*sqlbase.ImmutableTableDescriptor]*row.Fetcher),
}
Expand All @@ -44,7 +46,7 @@ func (c *rowFetcherCache) TableDescForKey(
ctx context.Context, key roachpb.Key, ts hlc.Timestamp,
) (*sqlbase.ImmutableTableDescriptor, error) {
var tableDesc *sqlbase.ImmutableTableDescriptor
key, err := keys.TODOSQLCodec.StripTenantPrefix(key)
key, err := c.codec.StripTenantPrefix(key)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -103,13 +105,14 @@ func (c *rowFetcherCache) RowFetcherForTableDesc(

var rf row.Fetcher
if err := rf.Init(
c.codec,
false, /* reverse */
sqlbase.ScanLockingStrength_FOR_NONE,
false, /* returnRangeInfo */
false, /* isCheck */
&c.a,
row.FetcherTableArgs{
Spans: tableDesc.AllIndexSpans(),
Spans: tableDesc.AllIndexSpans(c.codec),
Desc: tableDesc,
Index: &tableDesc.PrimaryIndex,
ColIdxMap: colIdxMap,
Expand Down
14 changes: 8 additions & 6 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,9 +710,10 @@ func importPlanHook(

// Prepare the protected timestamp record.
var spansToProtect []roachpb.Span
codec := p.(sql.PlanHookState).ExecCfg().Codec
for i := range tableDetails {
if td := &tableDetails[i]; !td.IsNew {
spansToProtect = append(spansToProtect, td.Desc.TableSpan())
spansToProtect = append(spansToProtect, td.Desc.TableSpan(codec))
}
}
if len(spansToProtect) > 0 {
Expand Down Expand Up @@ -1260,9 +1261,8 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, phs interface{}) err
telemetry.Count("import.total.failed")

cfg := phs.(sql.PlanHookState).ExecCfg()
jr := cfg.JobRegistry
return cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := r.dropTables(ctx, jr, txn); err != nil {
if err := r.dropTables(ctx, txn, cfg); err != nil {
return err
}
return r.releaseProtectedTimestamp(ctx, txn, cfg.ProtectedTimestampProvider)
Expand All @@ -1289,7 +1289,9 @@ func (r *importResumer) releaseProtectedTimestamp(
}

// dropTables implements the OnFailOrCancel logic.
func (r *importResumer) dropTables(ctx context.Context, jr *jobs.Registry, txn *kv.Txn) error {
func (r *importResumer) dropTables(
ctx context.Context, txn *kv.Txn, execCfg *sql.ExecutorConfig,
) error {
details := r.job.Details().(jobspb.ImportDetails)

// Needed to trigger the schema change manager.
Expand Down Expand Up @@ -1325,7 +1327,7 @@ func (r *importResumer) dropTables(ctx context.Context, jr *jobs.Registry, txn *
return errors.Errorf("invalid pre-IMPORT time to rollback")
}
ts := hlc.Timestamp{WallTime: details.Walltime}.Prev()
if err := sql.RevertTables(ctx, txn.DB(), revert, ts, sql.RevertTableDefaultBatchSize); err != nil {
if err := sql.RevertTables(ctx, txn.DB(), execCfg, revert, ts, sql.RevertTableDefaultBatchSize); err != nil {
return errors.Wrap(err, "rolling back partially completed IMPORT")
}
}
Expand Down Expand Up @@ -1382,7 +1384,7 @@ func (r *importResumer) dropTables(ctx context.Context, jr *jobs.Registry, txn *
Progress: jobspb.SchemaChangeGCProgress{},
NonCancelable: true,
}
if _, err := jr.CreateJobWithTxn(ctx, gcJobRecord, txn); err != nil {
if _, err := execCfg.JobRegistry.CreateJobWithTxn(ctx, gcJobRecord, txn); err != nil {
return err
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func Load(
evalCtx := &tree.EvalContext{}
evalCtx.SetTxnTimestamp(curTime)
evalCtx.SetStmtTimestamp(curTime)
evalCtx.Codec = keys.TODOSQLCodec

blobClientFactory := blobs.TestBlobServiceClient(writeToDir)
conf, err := cloud.ExternalStorageConfFromURI(uri)
Expand Down Expand Up @@ -223,7 +224,7 @@ func Load(
}

ri, err = row.MakeInserter(
ctx, nil, tableDesc, tableDesc.Columns, row.SkipFKs, nil /* fkTables */, &sqlbase.DatumAlloc{},
ctx, nil, evalCtx.Codec, tableDesc, tableDesc.Columns, row.SkipFKs, nil /* fkTables */, &sqlbase.DatumAlloc{},
)
if err != nil {
return backupccl.BackupManifest{}, errors.Wrap(err, "make row inserter")
Expand Down
7 changes: 3 additions & 4 deletions pkg/ccl/importccl/read_import_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/lex"
Expand Down Expand Up @@ -332,7 +331,7 @@ func readMysqlCreateTable(
if match != "" && !found {
return nil, errors.Errorf("table %q not found in file (found tables: %s)", match, strings.Join(names, ", "))
}
if err := addDelayedFKs(ctx, fkDefs, fks.resolver, evalCtx.Settings); err != nil {
if err := addDelayedFKs(ctx, fkDefs, fks.resolver, evalCtx); err != nil {
return nil, err
}
return ret, nil
Expand Down Expand Up @@ -540,11 +539,11 @@ type delayedFK struct {
}

func addDelayedFKs(
ctx context.Context, defs []delayedFK, resolver fkResolver, settings *cluster.Settings,
ctx context.Context, defs []delayedFK, resolver fkResolver, evalCtx *tree.EvalContext,
) error {
for _, def := range defs {
if err := sql.ResolveFK(
ctx, nil, resolver, def.tbl, def.def, map[sqlbase.ID]*sqlbase.MutableTableDescriptor{}, sql.NewTable, tree.ValidationDefault, settings,
ctx, nil, resolver, def.tbl, def.def, map[sqlbase.ID]*sqlbase.MutableTableDescriptor{}, sql.NewTable, tree.ValidationDefault, evalCtx,
); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func readPostgresCreateTable(
}
for _, constraint := range constraints {
if err := sql.ResolveFK(
evalCtx.Ctx(), nil /* txn */, fks.resolver, desc, constraint, backrefs, sql.NewTable, tree.ValidationDefault, p.ExecCfg().Settings,
evalCtx.Ctx(), nil /* txn */, fks.resolver, desc, constraint, backrefs, sql.NewTable, tree.ValidationDefault, evalCtx,
); err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/importccl/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ var testEvalCtx = &tree.EvalContext{
},
StmtTimestamp: timeutil.Unix(100000000, 0),
Settings: cluster.MakeTestingClusterSettings(),
Codec: keys.SystemSQLCodec,
}

// Value generator represents a value of some data at specified row/col.
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/partitionccl/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/gcjob"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
Expand Down Expand Up @@ -71,7 +72,7 @@ func TestDropIndexWithZoneConfigCCL(t *testing.T) {
if err != nil {
t.Fatal(err)
}
indexSpan := tableDesc.IndexSpan(indexDesc.ID)
indexSpan := tableDesc.IndexSpan(keys.SystemSQLCodec, indexDesc.ID)
tests.CheckKeyCount(t, kvDB, indexSpan, numRows)

// Set zone configs on the primary index, secondary index, and one partition
Expand Down
9 changes: 5 additions & 4 deletions pkg/ccl/partitionccl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func selectPartitionExprs(
if err := tableDesc.ForeachNonDropIndex(func(idxDesc *sqlbase.IndexDescriptor) error {
genExpr := true
return selectPartitionExprsByName(
a, tableDesc, idxDesc, &idxDesc.Partitioning, prefixDatums, exprsByPartName, genExpr)
a, evalCtx, tableDesc, idxDesc, &idxDesc.Partitioning, prefixDatums, exprsByPartName, genExpr)
}); err != nil {
return nil, err
}
Expand Down Expand Up @@ -322,6 +322,7 @@ func selectPartitionExprs(
// that the requested partitions are all valid).
func selectPartitionExprsByName(
a *sqlbase.DatumAlloc,
evalCtx *tree.EvalContext,
tableDesc *sqlbase.TableDescriptor,
idxDesc *sqlbase.IndexDescriptor,
partDesc *sqlbase.PartitioningDescriptor,
Expand All @@ -340,7 +341,7 @@ func selectPartitionExprsByName(
exprsByPartName[l.Name] = tree.DBoolFalse
var fakeDatums tree.Datums
if err := selectPartitionExprsByName(
a, tableDesc, idxDesc, &l.Subpartitioning, fakeDatums, exprsByPartName, genExpr,
a, evalCtx, tableDesc, idxDesc, &l.Subpartitioning, fakeDatums, exprsByPartName, genExpr,
); err != nil {
return err
}
Expand Down Expand Up @@ -380,7 +381,7 @@ func selectPartitionExprsByName(
for _, l := range partDesc.List {
for _, valueEncBuf := range l.Values {
t, _, err := sqlbase.DecodePartitionTuple(
a, tableDesc, idxDesc, partDesc, valueEncBuf, prefixDatums)
a, evalCtx.Codec, tableDesc, idxDesc, partDesc, valueEncBuf, prefixDatums)
if err != nil {
return err
}
Expand Down Expand Up @@ -414,7 +415,7 @@ func selectPartitionExprsByName(
genExpr = false
}
if err := selectPartitionExprsByName(
a, tableDesc, idxDesc, &l.Subpartitioning, allDatums, exprsByPartName, genExpr,
a, evalCtx, tableDesc, idxDesc, &l.Subpartitioning, allDatums, exprsByPartName, genExpr,
); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 164f82b

Please sign in to comment.