Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNM] sql: caching descriptors in distsql #53747

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/sql/catalog/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type TableDescriptor interface {
IndexSpan(codec keys.SQLCodec, id descpb.IndexID) roachpb.Span
FindIndexByID(id descpb.IndexID) (*descpb.IndexDescriptor, error)
FindIndexByName(name string) (_ *descpb.IndexDescriptor, dropped bool, _ error)
FindIndexByIndexIdx(idx int) (*descpb.IndexDescriptor, bool, error)
FindIndexesWithPartition(name string) []*descpb.IndexDescriptor
GetIndexMutationCapabilities(id descpb.IndexID) (isMutation, isWriteOnly bool)
KeysPerRow(id descpb.IndexID) (int, error)
Expand All @@ -125,6 +126,7 @@ type TableDescriptor interface {
PrimaryKeyString() string

GetPublicColumns() []descpb.ColumnDescriptor
GetReadableColumns() []descpb.ColumnDescriptor
ForeachPublicColumn(f func(col *descpb.ColumnDescriptor) error) error
ForeachNonDropColumn(f func(col *descpb.ColumnDescriptor) error) error
NamesForColumnIDs(ids descpb.ColumnIDs) ([]string, error)
Expand Down Expand Up @@ -172,6 +174,8 @@ type TableDescriptor interface {
ForeachInboundFK(f func(fk *descpb.ForeignKeyConstraint) error) error
FindActiveColumnByName(s string) (*descpb.ColumnDescriptor, error)
WritableColumns() []descpb.ColumnDescriptor
ColumnTypes() []*types.T
ColumnTypesWithMutations(mutations bool) []*types.T
}

// TypeDescriptor will eventually be called typedesc.Descriptor.
Expand Down
58 changes: 47 additions & 11 deletions pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -536,7 +537,7 @@ func (tc *Collection) getUserDefinedSchemaVersion(
// the database which doesn't reflect the changes to the schema. But this
// isn't a problem for correctness; it can only happen on other sessions
// before the schema change has returned results.
desc, err := tc.getDescriptorVersionByID(ctx, txn, schemaInfo.ID, flags.CommonLookupFlags, true /* setTxnDeadline */)
desc, err := tc.getDescriptorVersionByID(ctx, txn, txn.ReadTimestamp(), schemaInfo.ID, flags.CommonLookupFlags, true /* setTxnDeadline */)
if err != nil {
if errors.Is(err, catalog.ErrDescriptorNotFound) {
return nil, sqlerrors.NewUndefinedSchemaError(schemaName)
Expand Down Expand Up @@ -853,7 +854,7 @@ func (tc *Collection) GetDatabaseVersionByID(
return tc.deprecatedDatabaseCache().GetDatabaseDescByID(ctx, txn, dbID)
}

desc, err := tc.getDescriptorVersionByID(ctx, txn, dbID, flags.CommonLookupFlags, true /* setTxnDeadline */)
desc, err := tc.getDescriptorVersionByID(ctx, txn, txn.ReadTimestamp(), dbID, flags.CommonLookupFlags, true /* setTxnDeadline */)
if err != nil {
if errors.Is(err, catalog.ErrDescriptorNotFound) {
return nil, sqlerrors.NewUndefinedDatabaseError(fmt.Sprintf("[%d]", dbID))
Expand All @@ -867,11 +868,34 @@ func (tc *Collection) GetDatabaseVersionByID(
return db, nil
}

// GetTableVersionByID is a by-ID variant of GetTableVersion (i.e. uses same cache).
func (tc *Collection) GetTableVersionByID(
ctx context.Context, txn *kv.Txn, tableID descpb.ID, flags tree.ObjectLookupFlags,
func (tc *Collection) GetTableVersionByIDWithMinVersion(
ctx context.Context,
minTimestamp hlc.Timestamp,
txn *kv.Txn,
tableID descpb.ID,
version descpb.DescriptorVersion,
flags tree.ObjectLookupFlags,
) (*tabledesc.Immutable, error) {
desc, err := tc.getTableVersionByIDImpl(ctx, txn, minTimestamp, tableID, flags)
if err != nil {
return nil, err
}
if desc.GetVersion() != version {
if err := tc.leaseMgr.AcquireFreshestFromStore(ctx, desc.GetID()); err != nil {
return nil, err
}
}
return tc.getTableVersionByIDImpl(ctx, txn, minTimestamp, tableID, flags)
}

func (tc *Collection) getTableVersionByIDImpl(
ctx context.Context,
txn *kv.Txn,
readTimestamp hlc.Timestamp,
tableID descpb.ID,
flags tree.ObjectLookupFlags,
) (*tabledesc.Immutable, error) {
desc, err := tc.getDescriptorVersionByID(ctx, txn, tableID, flags.CommonLookupFlags, true /* setTxnDeadline */)
desc, err := tc.getDescriptorVersionByID(ctx, txn, readTimestamp, tableID, flags.CommonLookupFlags, txn != nil)
if err != nil {
if errors.Is(err, catalog.ErrDescriptorNotFound) {
return nil, sqlerrors.NewUndefinedRelationError(
Expand All @@ -891,8 +915,20 @@ func (tc *Collection) GetTableVersionByID(
return hydrated.(*tabledesc.Immutable), nil
}

// GetTableVersionByID is a by-ID variant of GetTableVersion (i.e. uses same cache).
func (tc *Collection) GetTableVersionByID(
ctx context.Context, txn *kv.Txn, tableID descpb.ID, flags tree.ObjectLookupFlags,
) (*tabledesc.Immutable, error) {
return tc.getTableVersionByIDImpl(ctx, txn, txn.ReadTimestamp(), tableID, flags)
}

func (tc *Collection) getDescriptorVersionByID(
ctx context.Context, txn *kv.Txn, id descpb.ID, flags tree.CommonLookupFlags, setTxnDeadline bool,
ctx context.Context,
txn *kv.Txn,
readTimestamp hlc.Timestamp,
id descpb.ID,
flags tree.CommonLookupFlags,
setTxnDeadline bool,
) (catalog.Descriptor, error) {
if flags.AvoidCached || lease.TestingTableLeasesAreDisabled() {
desc, err := catalogkv.GetDescriptorByID(ctx, txn, tc.codec(), id, catalogkv.Immutable,
Expand Down Expand Up @@ -926,7 +962,6 @@ func (tc *Collection) getDescriptorVersionByID(
return desc, nil
}

readTimestamp := txn.ReadTimestamp()
desc, expiration, err := tc.leaseMgr.Acquire(ctx, readTimestamp, id)
if err != nil {
return nil, err
Expand All @@ -939,7 +974,7 @@ func (tc *Collection) getDescriptorVersionByID(
tc.leasedDescriptors.add(desc)
log.VEventf(ctx, 2, "added descriptor %q to collection", desc.GetName())

if setTxnDeadline {
if setTxnDeadline && txn != nil {
// If the descriptor we just acquired expires before the txn's deadline,
// reduce the deadline. We use ReadTimestamp() that doesn't return the commit
// timestamp, so we need to set a deadline on the transaction to prevent it
Expand Down Expand Up @@ -1018,7 +1053,7 @@ func (tc *Collection) ResolveSchemaByID(

// Otherwise, fall back to looking up the descriptor with the desired ID.
desc, err := tc.getDescriptorVersionByID(
ctx, txn, schemaID, tree.CommonLookupFlags{Required: true}, true /* setTxnDeadline */)
ctx, txn, txn.ReadTimestamp(), schemaID, tree.CommonLookupFlags{Required: true}, true /* setTxnDeadline */)
if err != nil {
return catalog.ResolvedSchema{}, err
}
Expand Down Expand Up @@ -1319,7 +1354,7 @@ func (tc *Collection) GetTypeVersion(
func (tc *Collection) GetTypeVersionByID(
ctx context.Context, txn *kv.Txn, typeID descpb.ID, flags tree.ObjectLookupFlags,
) (*typedesc.Immutable, error) {
desc, err := tc.getDescriptorVersionByID(ctx, txn, typeID, flags.CommonLookupFlags, true /* setTxnDeadline */)
desc, err := tc.getDescriptorVersionByID(ctx, txn, txn.ReadTimestamp(), typeID, flags.CommonLookupFlags, true /* setTxnDeadline */)
if err != nil {
if errors.Is(err, catalog.ErrDescriptorNotFound) {
return nil, pgerror.Newf(
Expand Down Expand Up @@ -1701,6 +1736,7 @@ func (dt *DistSQLTypeResolver) GetTypeDescriptor(
desc, err := dt.descriptors.getDescriptorVersionByID(
ctx,
dt.txn,
dt.txn.ReadTimestamp(),
id,
tree.CommonLookupFlags{Required: true},
false, /* setTxnDeadline */
Expand Down
7 changes: 4 additions & 3 deletions pkg/sql/catalog/tabledesc/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,12 @@ func MakeImmutable(tbl descpb.TableDescriptor) Immutable {
}

// Iterate through all mutation columns.
for _, c := range publicAndNonPublicCols[len(tbl.Columns):] {
for i := range publicAndNonPublicCols[len(tbl.Columns):] {
// Mutation column may need to be fetched, but may not be completely backfilled
// and have be null values (even though they may be configured as NOT NULL).
c := &publicAndNonPublicCols[i]
c.Nullable = true
readableCols = append(readableCols, c)
readableCols = append(readableCols, *c)
}
}

Expand Down Expand Up @@ -2472,7 +2473,7 @@ func (desc *Immutable) FindColumnMutationByName(name tree.Name) *descpb.Descript
}

// ColumnIdxMap returns a map from Column ID to the ordinal position of that
// column.
// column. It must not be modified as it is shared.
func (desc *Immutable) ColumnIdxMap() map[descpb.ColumnID]int {
return desc.ColumnIdxMapWithMutations(false)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/catalog/tabledesc/table_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Immutable struct {
// are all set to nullable while column backfilling is still in
// progress, as mutation columns may have NULL values.
ReadableColumns []descpb.ColumnDescriptor
// columnIdxMap map[descpb.ColumnID]int

// columnsWithUDTs is a set of indexes into publicAndNonPublicCols containing
// indexes of columns that contain user defined types.
Expand Down Expand Up @@ -123,3 +124,7 @@ func (desc *Mutable) IsUncommittedVersion() bool {
func (desc *Mutable) SetDrainingNames(names []descpb.NameInfo) {
desc.DrainingNames = names
}

func (desc *Immutable) GetReadableColumns() []descpb.ColumnDescriptor {
return desc.ReadableColumns
}
63 changes: 46 additions & 17 deletions pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
Expand Down Expand Up @@ -137,30 +138,58 @@ func NewColBatchScan(
// indicates that we're probably doing this wrong. Instead we should be
// just seting the ID and Version in the spec or something like that and
// retrieving the hydrated Immutable from cache.
table := tabledesc.NewImmutable(spec.Table)
var table catalog.TableDescriptor
if spec.TableIsUncommittedVersion {
var err error
table, err = flowCtx.TypeResolverFactory.Descriptors.GetTableVersionByIDWithMinVersion(
flowCtx.EvalCtx.Ctx(),
spec.Table.ModificationTime,
flowCtx.EvalCtx.Txn,
spec.Table.ID, spec.Table.Version, tree.ObjectLookupFlagsWithRequired())
if err != nil {
return nil, err
}
} else {
table = tabledesc.NewImmutable(spec.Table)
}

typs := table.ColumnTypesWithMutations(returnMutations)
columnIdxMap := table.ColumnIdxMapWithMutations(returnMutations)

// Add all requested system columns to the output.
sysColTypes, sysColDescs, err := colinfo.GetSystemColumnTypesAndDescriptors(spec.SystemColumns)
if err != nil {
return nil, err
}
typs = append(typs, sysColTypes...)
for i := range sysColDescs {
columnIdxMap[sysColDescs[i].ID] = len(columnIdxMap)
var sysColDescs []descpb.ColumnDescriptor
if len(spec.SystemColumns) > 0 {
columnIdxMapCpy := make(map[descpb.ColumnID]int, len(columnIdxMap)+len(spec.SystemColumns))
for id, idx := range columnIdxMap {
columnIdxMapCpy[id] = idx
}
columnIdxMap = columnIdxMapCpy
var sysColTypes []*types.T
var err error
sysColTypes, sysColDescs, err = colinfo.GetSystemColumnTypesAndDescriptors(spec.SystemColumns)
if err != nil {
return nil, err
}
typs = append(typs, sysColTypes...)
for i := range sysColDescs {
columnIdxMap[sysColDescs[i].ID] = len(columnIdxMap)
}
}

semaCtx := tree.MakeSemaContext()
evalCtx := flowCtx.NewEvalCtx()
// Before we can safely use types from the table descriptor, we need to
// make sure they are hydrated. In row execution engine it is done during
// the processor initialization, but neither ColBatchScan nor cFetcher are
// processors, so we need to do the hydration ourselves.
resolver := flowCtx.TypeResolverFactory.NewTypeResolver(evalCtx.Txn)
semaCtx.TypeResolver = resolver
if err := resolver.HydrateTypeSlice(evalCtx.Context, typs); err != nil {
return nil, err
if spec.TableIsUncommittedVersion {
// Before we can safely use types from the table descriptor, we need to
// make sure they are hydrated. In row execution engine it is done during
// the processor initialization, but neither ColBatchScan nor cFetcher are
// processors, so we need to do the hydration ourselves.
resolver := flowCtx.TypeResolverFactory.NewTypeResolver(evalCtx.Txn)
semaCtx.TypeResolver = resolver
if err := resolver.HydrateTypeSlice(evalCtx.Context, typs); err != nil {
return nil, err
}
}

helper := execinfra.ProcOutputHelper{}
if err := helper.Init(
post,
Expand Down Expand Up @@ -227,7 +256,7 @@ func initCRowFetcher(

cols := immutDesc.Columns
if scanVisibility == execinfra.ScanVisibilityPublicAndNotPublic {
cols = immutDesc.ReadableColumns
cols = immutDesc.GetReadableColumns()
}
// Add on any requested system columns. We slice cols to avoid modifying
// the underlying table descriptor.
Expand Down
55 changes: 31 additions & 24 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,12 +941,13 @@ func initTableReaderSpec(
) (*execinfrapb.TableReaderSpec, execinfrapb.PostProcessSpec, error) {
s := physicalplan.NewTableReaderSpec()
*s = execinfrapb.TableReaderSpec{
Table: *n.desc.TableDesc(),
Reverse: n.reverse,
IsCheck: n.isCheck,
Visibility: n.colCfg.visibility,
LockingStrength: n.lockingStrength,
LockingWaitPolicy: n.lockingWaitPolicy,
Table: *n.desc.TableDesc(),
TableIsUncommittedVersion: n.desc.IsUncommittedVersion(),
Reverse: n.reverse,
IsCheck: n.isCheck,
Visibility: n.colCfg.visibility,
LockingStrength: n.lockingStrength,
LockingWaitPolicy: n.lockingWaitPolicy,

// Retain the capacity of the spans slice.
Spans: s.Spans[:0],
Expand Down Expand Up @@ -1940,13 +1941,14 @@ func (dsp *DistSQLPlanner) createPlanForIndexJoin(
plan.AddProjection(pkCols)

joinReaderSpec := execinfrapb.JoinReaderSpec{
Table: *n.table.desc.TableDesc(),
IndexIdx: 0,
Visibility: n.table.colCfg.visibility,
LockingStrength: n.table.lockingStrength,
LockingWaitPolicy: n.table.lockingWaitPolicy,
MaintainOrdering: len(n.reqOrdering) > 0,
SystemColumns: n.table.systemColumns,
Table: *n.table.desc.TableDesc(),
TableIsUncommittedVersion: n.table.desc.IsUncommittedVersion(),
IndexIdx: 0,
Visibility: n.table.colCfg.visibility,
LockingStrength: n.table.lockingStrength,
LockingWaitPolicy: n.table.lockingWaitPolicy,
MaintainOrdering: len(n.reqOrdering) > 0,
SystemColumns: n.table.systemColumns,
}

post := execinfrapb.PostProcessSpec{
Expand Down Expand Up @@ -1998,13 +2000,14 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin(
}

joinReaderSpec := execinfrapb.JoinReaderSpec{
Table: *n.table.desc.TableDesc(),
Type: n.joinType,
Visibility: n.table.colCfg.visibility,
LockingStrength: n.table.lockingStrength,
LockingWaitPolicy: n.table.lockingWaitPolicy,
MaintainOrdering: len(n.reqOrdering) > 0,
SystemColumns: n.table.systemColumns,
Table: *n.table.desc.TableDesc(),
TableIsUncommittedVersion: n.table.desc.IsUncommittedVersion(),
Type: n.joinType,
Visibility: n.table.colCfg.visibility,
LockingStrength: n.table.lockingStrength,
LockingWaitPolicy: n.table.lockingWaitPolicy,
MaintainOrdering: len(n.reqOrdering) > 0,
SystemColumns: n.table.systemColumns,
}
joinReaderSpec.IndexIdx, err = getIndexIdx(n.table.index, n.table.desc)
if err != nil {
Expand Down Expand Up @@ -2180,11 +2183,14 @@ func (dsp *DistSQLPlanner) createPlanForZigzagJoin(
plan = &p

tables := make([]descpb.TableDescriptor, len(n.sides))
tableIsUncommittedVersions := make([]bool, len(n.sides))
indexOrdinals := make([]uint32, len(n.sides))
cols := make([]execinfrapb.Columns, len(n.sides))

numStreamCols := 0
for i, side := range n.sides {
tables[i] = *side.scan.desc.TableDesc()
tableIsUncommittedVersions[i] = side.scan.desc.IsUncommittedVersion()
indexOrdinals[i], err = getIndexIdx(side.scan.index, side.scan.desc)
if err != nil {
return nil, err
Expand All @@ -2201,10 +2207,11 @@ func (dsp *DistSQLPlanner) createPlanForZigzagJoin(
// The zigzag join node only represents inner joins, so hardcode Type to
// InnerJoin.
zigzagJoinerSpec := execinfrapb.ZigzagJoinerSpec{
Tables: tables,
IndexOrdinals: indexOrdinals,
EqColumns: cols,
Type: descpb.InnerJoin,
Tables: tables,
TableIsUncommittedVersion: tableIsUncommittedVersions,
IndexOrdinals: indexOrdinals,
EqColumns: cols,
Type: descpb.InnerJoin,
}
zigzagJoinerSpec.FixedValues = make([]*execinfrapb.ValuesCoreSpec, len(n.sides))

Expand Down
9 changes: 5 additions & 4 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,11 @@ func (e *distSQLSpecExecFactory) ConstructScan(
colsToTableOrdinalMap := toTableOrdinals(cols, tabDesc, colCfg.visibility)
trSpec := physicalplan.NewTableReaderSpec()
*trSpec = execinfrapb.TableReaderSpec{
Table: *tabDesc.TableDesc(),
Reverse: params.Reverse,
IsCheck: false,
Visibility: colCfg.visibility,
Table: *tabDesc.TableDesc(),
TableIsUncommittedVersion: tabDesc.IsUncommittedVersion(),
Reverse: params.Reverse,
IsCheck: false,
Visibility: colCfg.visibility,
// Retain the capacity of the spans slice.
Spans: trSpec.Spans[:0],
SystemColumns: systemColumns,
Expand Down
Loading