Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: remove a bunch of allocations from name and type resolution paths #57201

Merged
merged 5 commits into from
Dec 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 47 additions & 34 deletions pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ func (tc *Collection) getLeasedDescriptorByName(
// continue to use N to refer to X even if N is renamed during the
// transaction.
if desc = tc.leasedDescriptors.getByName(parentID, parentSchemaID, name); desc != nil {
log.VEventf(ctx, 2, "found descriptor in collection for '%s'", name)
if log.V(2) {
log.Eventf(ctx, "found descriptor in collection for '%s'", name)
}
return desc, false, nil
}

Expand All @@ -236,7 +238,9 @@ func (tc *Collection) getLeasedDescriptorByName(
}

tc.leasedDescriptors.add(desc)
log.VEventf(ctx, 2, "added descriptor '%s' to collection: %+v", name, desc)
if log.V(2) {
log.Eventf(ctx, "added descriptor '%s' to collection: %+v", name, desc)
}

// If the descriptor we just acquired expires before the txn's deadline,
// reduce the deadline. We use ReadTimestamp() that doesn't return the commit
Expand Down Expand Up @@ -290,7 +294,7 @@ func (tc *Collection) GetMutableDatabaseDescriptor(
func (tc *Collection) GetMutableTableDescriptor(
ctx context.Context, txn *kv.Txn, tn *tree.TableName, flags tree.ObjectLookupFlags,
) (*tabledesc.Mutable, error) {
desc, err := tc.getMutableObjectDescriptor(ctx, txn, tn, flags)
desc, err := tc.getMutableObjectDescriptor(ctx, txn, tn.Catalog(), tn.Schema(), tn.Object(), flags)
if err != nil {
return nil, err
}
Expand All @@ -306,14 +310,17 @@ func (tc *Collection) GetMutableTableDescriptor(
}

func (tc *Collection) getMutableObjectDescriptor(
ctx context.Context, txn *kv.Txn, name tree.ObjectName, flags tree.ObjectLookupFlags,
ctx context.Context,
txn *kv.Txn,
catalogName, schemaName, objectName string,
flags tree.ObjectLookupFlags,
) (catalog.MutableDescriptor, error) {
if log.V(2) {
log.Infof(ctx, "reading mutable descriptor on '%s'", name)
log.Infof(ctx, "reading mutable descriptor on '%s.%s.%s'", catalogName, schemaName, objectName)
}

// Resolve the database.
db, err := tc.GetDatabaseVersion(ctx, txn, name.Catalog(),
db, err := tc.GetDatabaseVersion(ctx, txn, catalogName,
tree.DatabaseLookupFlags{
Required: flags.Required,
AvoidCached: flags.AvoidCached,
Expand All @@ -326,7 +333,7 @@ func (tc *Collection) getMutableObjectDescriptor(
dbID := db.GetID()

// Resolve the schema to the ID of the schema.
foundSchema, resolvedSchema, err := tc.ResolveSchema(ctx, txn, dbID, name.Schema(),
foundSchema, resolvedSchema, err := tc.ResolveSchema(ctx, txn, dbID, schemaName,
tree.SchemaLookupFlags{
Required: flags.Required,
AvoidCached: flags.AvoidCached,
Expand All @@ -340,7 +347,7 @@ func (tc *Collection) getMutableObjectDescriptor(
if refuseFurtherLookup, desc, err := tc.getUncommittedDescriptor(
dbID,
resolvedSchema.ID,
name.Object(),
objectName,
flags.CommonLookupFlags,
); refuseFurtherLookup || err != nil {
return nil, err
Expand All @@ -354,9 +361,9 @@ func (tc *Collection) getMutableObjectDescriptor(
txn,
tc.settings,
tc.codec(),
name.Catalog(),
name.Schema(),
name.Object(),
catalogName,
schemaName,
objectName,
flags,
)
if err != nil || obj == nil {
Expand Down Expand Up @@ -621,14 +628,19 @@ func (tc *Collection) GetDatabaseVersion(
func (tc *Collection) GetTableVersion(
ctx context.Context, txn *kv.Txn, tn *tree.TableName, flags tree.ObjectLookupFlags,
) (*tabledesc.Immutable, error) {
desc, err := tc.getObjectVersion(ctx, txn, tn, flags)
desc, err := tc.getObjectVersion(ctx, txn, tn.Catalog(), tn.Schema(), tn.Object(), flags)
if err != nil {
return nil, err
}
table, ok := desc.(*tabledesc.Immutable)
if !ok {
if flags.Required {
return nil, sqlerrors.NewUndefinedRelationError(tn)
// Copy the input TableName to avoid allocations:
// NewUndefinedRelationError requires that we promote TableName to a
// NodeFormatter, which causes the input TableName to get heap allocated
// even in cases where it wouldn't otherwise.
errorTn := *tn
return nil, sqlerrors.NewUndefinedRelationError(&errorTn)
}
return nil, nil
}
Expand All @@ -640,23 +652,26 @@ func (tc *Collection) GetTableVersion(
}

func (tc *Collection) getObjectVersion(
ctx context.Context, txn *kv.Txn, name tree.ObjectName, flags tree.ObjectLookupFlags,
ctx context.Context,
txn *kv.Txn,
catalogName, schemaName, objectName string,
flags tree.ObjectLookupFlags,
) (catalog.Descriptor, error) {
readObjectFromStore := func() (catalog.Descriptor, error) {
return getObjectDesc(
ctx,
txn,
tc.settings,
tc.codec(),
name.Catalog(),
name.Schema(),
name.Object(),
catalogName,
schemaName,
objectName,
flags,
)
}

// Resolve the database.
db, err := tc.GetDatabaseVersion(ctx, txn, name.Catalog(),
db, err := tc.GetDatabaseVersion(ctx, txn, catalogName,
tree.DatabaseLookupFlags{
Required: flags.Required,
AvoidCached: flags.AvoidCached,
Expand All @@ -669,7 +684,7 @@ func (tc *Collection) getObjectVersion(
dbID := db.GetID()

// Resolve the schema to the ID of the schema.
foundSchema, resolvedSchema, err := tc.ResolveSchema(ctx, txn, dbID, name.Schema(),
foundSchema, resolvedSchema, err := tc.ResolveSchema(ctx, txn, dbID, schemaName,
tree.SchemaLookupFlags{
Required: flags.Required,
AvoidCached: flags.AvoidCached,
Expand All @@ -689,12 +704,12 @@ func (tc *Collection) getObjectVersion(
// system.users. For now we're sticking to disabling caching of
// all system descriptors except the role-members-desc.
avoidCache := flags.AvoidCached || lease.TestingTableLeasesAreDisabled() ||
(name.Catalog() == systemschema.SystemDatabaseName && name.Object() != systemschema.RoleMembersTable.Name)
(catalogName == systemschema.SystemDatabaseName && objectName != systemschema.RoleMembersTable.Name)

if refuseFurtherLookup, desc, err := tc.getUncommittedDescriptor(
dbID,
schemaID,
name.Object(),
objectName,
flags.CommonLookupFlags,
); refuseFurtherLookup || err != nil {
return nil, err
Expand All @@ -715,7 +730,7 @@ func (tc *Collection) getObjectVersion(
return readObjectFromStore()
}

desc, shouldReadFromStore, err := tc.getLeasedDescriptorByName(ctx, txn, dbID, schemaID, name.Object())
desc, shouldReadFromStore, err := tc.getLeasedDescriptorByName(ctx, txn, dbID, schemaID, objectName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1171,7 +1186,7 @@ func (tc *Collection) GetUncommittedTables() (tables []*tabledesc.Immutable) {
func (tc *Collection) GetMutableTypeDescriptor(
ctx context.Context, txn *kv.Txn, tn *tree.TypeName, flags tree.ObjectLookupFlags,
) (*typedesc.Mutable, error) {
desc, err := tc.getMutableObjectDescriptor(ctx, txn, tn, flags)
desc, err := tc.getMutableObjectDescriptor(ctx, txn, tn.Catalog(), tn.Schema(), tn.Object(), flags)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1201,7 +1216,7 @@ func (tc *Collection) GetMutableTypeVersionByID(
func (tc *Collection) GetTypeVersion(
ctx context.Context, txn *kv.Txn, tn *tree.TypeName, flags tree.ObjectLookupFlags,
) (*typedesc.Immutable, error) {
desc, err := tc.getObjectVersion(ctx, txn, tn, flags)
desc, err := tc.getObjectVersion(ctx, txn, tn.Catalog(), tn.Schema(), tn.Object(), flags)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1503,9 +1518,9 @@ type DistSQLTypeResolverFactory struct {

// NewTypeResolver creates a new TypeResolver that is bound under the input
// transaction. It returns a nil resolver if the factory itself is nil.
func (df *DistSQLTypeResolverFactory) NewTypeResolver(txn *kv.Txn) *DistSQLTypeResolver {
func (df *DistSQLTypeResolverFactory) NewTypeResolver(txn *kv.Txn) DistSQLTypeResolver {
if df == nil {
return nil
return DistSQLTypeResolver{}
}
return NewDistSQLTypeResolver(df.Descriptors, txn)
}
Expand All @@ -1526,24 +1541,22 @@ type DistSQLTypeResolver struct {
}

// NewDistSQLTypeResolver creates a new DistSQLTypeResolver.
func NewDistSQLTypeResolver(descs *Collection, txn *kv.Txn) *DistSQLTypeResolver {
return &DistSQLTypeResolver{
func NewDistSQLTypeResolver(descs *Collection, txn *kv.Txn) DistSQLTypeResolver {
return DistSQLTypeResolver{
descriptors: descs,
txn: txn,
}
}

// ResolveType implements the tree.TypeReferenceResolver interface.
func (dt *DistSQLTypeResolver) ResolveType(
func (dt DistSQLTypeResolver) ResolveType(
context.Context, *tree.UnresolvedObjectName,
) (*types.T, error) {
return nil, errors.AssertionFailedf("cannot resolve types in DistSQL by name")
}

// ResolveTypeByOID implements the tree.TypeReferenceResolver interface.
func (dt *DistSQLTypeResolver) ResolveTypeByOID(
ctx context.Context, oid oid.Oid,
) (*types.T, error) {
func (dt DistSQLTypeResolver) ResolveTypeByOID(ctx context.Context, oid oid.Oid) (*types.T, error) {
name, desc, err := dt.GetTypeDescriptor(ctx, typedesc.UserDefinedTypeOIDToID(oid))
if err != nil {
return nil, err
Expand All @@ -1552,7 +1565,7 @@ func (dt *DistSQLTypeResolver) ResolveTypeByOID(
}

// GetTypeDescriptor implements the sqlbase.TypeDescriptorResolver interface.
func (dt *DistSQLTypeResolver) GetTypeDescriptor(
func (dt DistSQLTypeResolver) GetTypeDescriptor(
ctx context.Context, id descpb.ID,
) (tree.TypeName, catalog.TypeDescriptor, error) {
desc, err := dt.descriptors.getDescriptorVersionByID(
Expand All @@ -1570,7 +1583,7 @@ func (dt *DistSQLTypeResolver) GetTypeDescriptor(
}

// HydrateTypeSlice installs metadata into a slice of types.T's.
func (dt *DistSQLTypeResolver) HydrateTypeSlice(ctx context.Context, typs []*types.T) error {
func (dt DistSQLTypeResolver) HydrateTypeSlice(ctx context.Context, typs []*types.T) error {
for _, t := range typs {
if t.UserDefined() {
name, desc, err := dt.GetTypeDescriptor(ctx, typedesc.GetTypeDescID(t))
Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,11 @@ func NewColBatchScan(
columnIdxMap.Set(sysColDescs[i].ID, columnIdxMap.Len())
}

semaCtx := tree.MakeSemaContext()
// Before we can safely use types from the table descriptor, we need to
// make sure they are hydrated. In row execution engine it is done during
// the processor initialization, but neither ColBatchScan nor cFetcher are
// processors, so we need to do the hydration ourselves.
resolver := flowCtx.TypeResolverFactory.NewTypeResolver(evalCtx.Txn)
semaCtx.TypeResolver = resolver
if err := resolver.HydrateTypeSlice(ctx, typs); err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colflow/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ go_test(
"//pkg/server/serverpb",
"//pkg/settings/cluster",
"//pkg/sql/catalog/catalogkv",
"//pkg/sql/catalog/descs",
"//pkg/sql/colcontainer",
"//pkg/sql/colexec",
"//pkg/sql/colexec/colbuilder",
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ type vectorizedFlowCreator struct {
nodeDialer *nodedialer.Dialer
flowID execinfrapb.FlowID
exprHelper *colexec.ExprHelper
typeResolver *descs.DistSQLTypeResolver
typeResolver descs.DistSQLTypeResolver

// numOutboxes counts how many exec.Outboxes have been set up on this node.
// It must be accessed atomically.
Expand Down Expand Up @@ -538,7 +538,7 @@ func newVectorizedFlowCreator(
flowID execinfrapb.FlowID,
diskQueueCfg colcontainer.DiskQueueCfg,
fdSemaphore semaphore.Semaphore,
typeResolver *descs.DistSQLTypeResolver,
typeResolver descs.DistSQLTypeResolver,
) *vectorizedFlowCreator {
creator := vectorizedFlowCreatorPool.Get().(*vectorizedFlowCreator)
*creator = vectorizedFlowCreator{
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colflow/vectorized_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase"
"github.com/cockroachdb/cockroach/pkg/sql/colflow/colrpc"
Expand Down Expand Up @@ -228,7 +229,7 @@ func TestDrainOnlyInputDAG(t *testing.T) {
vfc := newVectorizedFlowCreator(
&vectorizedFlowCreatorHelper{f: f}, componentCreator, false, &wg, &execinfra.RowChannel{},
nil /* nodeDialer */, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{},
nil /* fdSemaphore */, nil, /* typeResolver */
nil /* fdSemaphore */, descs.DistSQLTypeResolver{},
)

_, err := vfc.setupFlow(ctx, &f.FlowCtx, procs, flowinfra.FuseNormally)
Expand Down
10 changes: 7 additions & 3 deletions pkg/sql/execinfra/processorsbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,9 @@ type ProcessorBase struct {
// EvalCtx is used for expression evaluation. It overrides the one in flowCtx.
EvalCtx *tree.EvalContext

// SemaCtx is used to avoid allocating a new SemaCtx during processor setup.
SemaCtx tree.SemaContext

// MemMonitor is the processor's memory monitor.
MemMonitor *mon.BytesMonitor

Expand Down Expand Up @@ -857,10 +860,11 @@ func (pb *ProcessorBase) InitWithEvalCtx(
if err := resolver.HydrateTypeSlice(evalCtx.Context, coreOutputTypes); err != nil {
return err
}
semaCtx := tree.MakeSemaContext()
semaCtx.TypeResolver = resolver

return pb.Out.Init(post, coreOutputTypes, &semaCtx, pb.EvalCtx, output)
pb.SemaCtx = tree.MakeSemaContext()
pb.SemaCtx.TypeResolver = resolver

return pb.Out.Init(post, coreOutputTypes, &pb.SemaCtx, pb.EvalCtx, output)
}

// AddInputToDrain adds an input to drain when moving the processor to a
Expand Down