From aa5a86bda3ff249e5150a1d303dccda0c50048f4 Mon Sep 17 00:00:00 2001 From: Jordan Lewis Date: Fri, 27 Nov 2020 16:53:58 -0400 Subject: [PATCH 1/5] colfetcher: remove unused semactx object Release note: None --- pkg/sql/colfetcher/colbatch_scan.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index 7240dd537e66..d938095c9cbf 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -174,13 +174,11 @@ func NewColBatchScan( columnIdxMap.Set(sysColDescs[i].ID, columnIdxMap.Len()) } - semaCtx := tree.MakeSemaContext() // Before we can safely use types from the table descriptor, we need to // make sure they are hydrated. In row execution engine it is done during // the processor initialization, but neither ColBatchScan nor cFetcher are // processors, so we need to do the hydration ourselves. resolver := flowCtx.TypeResolverFactory.NewTypeResolver(evalCtx.Txn) - semaCtx.TypeResolver = resolver if err := resolver.HydrateTypeSlice(ctx, typs); err != nil { return nil, err } From 7d6cd82a3ba3c9829b1f4fcb99abe7937ce89723 Mon Sep 17 00:00:00 2001 From: Jordan Lewis Date: Fri, 27 Nov 2020 17:47:00 -0400 Subject: [PATCH 2/5] colflow: store type resolver as a non-pointer type This commit changes the VectorizedFlow to store its type resolver as a struct directly, rather than a pointer. This prevents a pointless allocation per flow. Release note: None --- pkg/sql/catalog/descs/collection.go | 18 ++++++++---------- pkg/sql/colflow/BUILD.bazel | 1 + pkg/sql/colflow/vectorized_flow.go | 4 ++-- pkg/sql/colflow/vectorized_flow_test.go | 3 ++- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go index ecabfc0c49b9..f9bc4abff53e 100644 --- a/pkg/sql/catalog/descs/collection.go +++ b/pkg/sql/catalog/descs/collection.go @@ -1503,9 +1503,9 @@ type DistSQLTypeResolverFactory struct { // NewTypeResolver creates a new TypeResolver that is bound under the input // transaction. It returns a nil resolver if the factory itself is nil. -func (df *DistSQLTypeResolverFactory) NewTypeResolver(txn *kv.Txn) *DistSQLTypeResolver { +func (df *DistSQLTypeResolverFactory) NewTypeResolver(txn *kv.Txn) DistSQLTypeResolver { if df == nil { - return nil + return DistSQLTypeResolver{} } return NewDistSQLTypeResolver(df.Descriptors, txn) } @@ -1526,24 +1526,22 @@ type DistSQLTypeResolver struct { } // NewDistSQLTypeResolver creates a new DistSQLTypeResolver. -func NewDistSQLTypeResolver(descs *Collection, txn *kv.Txn) *DistSQLTypeResolver { - return &DistSQLTypeResolver{ +func NewDistSQLTypeResolver(descs *Collection, txn *kv.Txn) DistSQLTypeResolver { + return DistSQLTypeResolver{ descriptors: descs, txn: txn, } } // ResolveType implements the tree.TypeReferenceResolver interface. -func (dt *DistSQLTypeResolver) ResolveType( +func (dt DistSQLTypeResolver) ResolveType( context.Context, *tree.UnresolvedObjectName, ) (*types.T, error) { return nil, errors.AssertionFailedf("cannot resolve types in DistSQL by name") } // ResolveTypeByOID implements the tree.TypeReferenceResolver interface. -func (dt *DistSQLTypeResolver) ResolveTypeByOID( - ctx context.Context, oid oid.Oid, -) (*types.T, error) { +func (dt DistSQLTypeResolver) ResolveTypeByOID(ctx context.Context, oid oid.Oid) (*types.T, error) { name, desc, err := dt.GetTypeDescriptor(ctx, typedesc.UserDefinedTypeOIDToID(oid)) if err != nil { return nil, err @@ -1552,7 +1550,7 @@ func (dt *DistSQLTypeResolver) ResolveTypeByOID( } // GetTypeDescriptor implements the sqlbase.TypeDescriptorResolver interface. -func (dt *DistSQLTypeResolver) GetTypeDescriptor( +func (dt DistSQLTypeResolver) GetTypeDescriptor( ctx context.Context, id descpb.ID, ) (tree.TypeName, catalog.TypeDescriptor, error) { desc, err := dt.descriptors.getDescriptorVersionByID( @@ -1570,7 +1568,7 @@ func (dt *DistSQLTypeResolver) GetTypeDescriptor( } // HydrateTypeSlice installs metadata into a slice of types.T's. -func (dt *DistSQLTypeResolver) HydrateTypeSlice(ctx context.Context, typs []*types.T) error { +func (dt DistSQLTypeResolver) HydrateTypeSlice(ctx context.Context, typs []*types.T) error { for _, t := range typs { if t.UserDefined() { name, desc, err := dt.GetTypeDescriptor(ctx, typedesc.GetTypeDescID(t)) diff --git a/pkg/sql/colflow/BUILD.bazel b/pkg/sql/colflow/BUILD.bazel index 2e82a2962b7a..625159acb8ac 100644 --- a/pkg/sql/colflow/BUILD.bazel +++ b/pkg/sql/colflow/BUILD.bazel @@ -65,6 +65,7 @@ go_test( "//pkg/server/serverpb", "//pkg/settings/cluster", "//pkg/sql/catalog/catalogkv", + "//pkg/sql/catalog/descs", "//pkg/sql/colcontainer", "//pkg/sql/colexec", "//pkg/sql/colexec/colbuilder", diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 7e70e3d530be..6c10abdde8ca 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -480,7 +480,7 @@ type vectorizedFlowCreator struct { nodeDialer *nodedialer.Dialer flowID execinfrapb.FlowID exprHelper *colexec.ExprHelper - typeResolver *descs.DistSQLTypeResolver + typeResolver descs.DistSQLTypeResolver // numOutboxes counts how many exec.Outboxes have been set up on this node. // It must be accessed atomically. @@ -538,7 +538,7 @@ func newVectorizedFlowCreator( flowID execinfrapb.FlowID, diskQueueCfg colcontainer.DiskQueueCfg, fdSemaphore semaphore.Semaphore, - typeResolver *descs.DistSQLTypeResolver, + typeResolver descs.DistSQLTypeResolver, ) *vectorizedFlowCreator { creator := vectorizedFlowCreatorPool.Get().(*vectorizedFlowCreator) *creator = vectorizedFlowCreator{ diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go index f46ddf80cb9a..a90be4f9488c 100644 --- a/pkg/sql/colflow/vectorized_flow_test.go +++ b/pkg/sql/colflow/vectorized_flow_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/colcontainer" "github.com/cockroachdb/cockroach/pkg/sql/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colflow/colrpc" @@ -228,7 +229,7 @@ func TestDrainOnlyInputDAG(t *testing.T) { vfc := newVectorizedFlowCreator( &vectorizedFlowCreatorHelper{f: f}, componentCreator, false, &wg, &execinfra.RowChannel{}, nil /* nodeDialer */, execinfrapb.FlowID{}, colcontainer.DiskQueueCfg{}, - nil /* fdSemaphore */, nil, /* typeResolver */ + nil /* fdSemaphore */, descs.DistSQLTypeResolver{}, ) _, err := vfc.setupFlow(ctx, &f.FlowCtx, procs, flowinfra.FuseNormally) From beb56d4f34f4ad3211fe2e50decaa164d173b30b Mon Sep 17 00:00:00 2001 From: Jordan Lewis Date: Fri, 27 Nov 2020 18:04:41 -0400 Subject: [PATCH 3/5] execinfra: keep SemaCtx memory in ProcessorBase Previously, when initializing a processor, we'd need to allocate a fresh SemaCtx for every processor. Instead of doing this, we add a SemaCtx value inside of ProcessorBase to share the ProcessorBase-allocated memory. Release note: None --- pkg/sql/execinfra/processorsbase.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go index 28d349c4801e..9e174dc601a2 100644 --- a/pkg/sql/execinfra/processorsbase.go +++ b/pkg/sql/execinfra/processorsbase.go @@ -473,6 +473,9 @@ type ProcessorBase struct { // EvalCtx is used for expression evaluation. It overrides the one in flowCtx. EvalCtx *tree.EvalContext + // SemaCtx is used to avoid allocating a new SemaCtx during processor setup. + SemaCtx tree.SemaContext + // MemMonitor is the processor's memory monitor. MemMonitor *mon.BytesMonitor @@ -857,10 +860,11 @@ func (pb *ProcessorBase) InitWithEvalCtx( if err := resolver.HydrateTypeSlice(evalCtx.Context, coreOutputTypes); err != nil { return err } - semaCtx := tree.MakeSemaContext() - semaCtx.TypeResolver = resolver - return pb.Out.Init(post, coreOutputTypes, &semaCtx, pb.EvalCtx, output) + pb.SemaCtx = tree.MakeSemaContext() + pb.SemaCtx.TypeResolver = resolver + + return pb.Out.Init(post, coreOutputTypes, &pb.SemaCtx, pb.EvalCtx, output) } // AddInputToDrain adds an input to drain when moving the processor to a From 46bc27433eb1c98e74984d480d3095bf1e4f099a Mon Sep 17 00:00:00 2001 From: Jordan Lewis Date: Fri, 27 Nov 2020 20:20:51 -0400 Subject: [PATCH 4/5] descs: remove pointless log allocs on the hot path Release note: None --- pkg/sql/catalog/descs/collection.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go index f9bc4abff53e..9392e19b1005 100644 --- a/pkg/sql/catalog/descs/collection.go +++ b/pkg/sql/catalog/descs/collection.go @@ -212,7 +212,9 @@ func (tc *Collection) getLeasedDescriptorByName( // continue to use N to refer to X even if N is renamed during the // transaction. if desc = tc.leasedDescriptors.getByName(parentID, parentSchemaID, name); desc != nil { - log.VEventf(ctx, 2, "found descriptor in collection for '%s'", name) + if log.V(2) { + log.Eventf(ctx, "found descriptor in collection for '%s'", name) + } return desc, false, nil } @@ -236,7 +238,9 @@ func (tc *Collection) getLeasedDescriptorByName( } tc.leasedDescriptors.add(desc) - log.VEventf(ctx, 2, "added descriptor '%s' to collection: %+v", name, desc) + if log.V(2) { + log.Eventf(ctx, "added descriptor '%s' to collection: %+v", name, desc) + } // If the descriptor we just acquired expires before the txn's deadline, // reduce the deadline. We use ReadTimestamp() that doesn't return the commit From 0bd8a5d65a67962e882855c9b77f16a79d7e268b Mon Sep 17 00:00:00 2001 From: Jordan Lewis Date: Fri, 27 Nov 2020 21:20:43 -0400 Subject: [PATCH 5/5] descs: avoid heap allocation for TableName Previously, resolving a table would always need to heap allocate the TableName that is used to look up the names. This was unnecessary. Release note: None --- pkg/sql/catalog/descs/collection.go | 55 +++++++++++++++++------------ 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go index 9392e19b1005..cf3479123d7a 100644 --- a/pkg/sql/catalog/descs/collection.go +++ b/pkg/sql/catalog/descs/collection.go @@ -294,7 +294,7 @@ func (tc *Collection) GetMutableDatabaseDescriptor( func (tc *Collection) GetMutableTableDescriptor( ctx context.Context, txn *kv.Txn, tn *tree.TableName, flags tree.ObjectLookupFlags, ) (*tabledesc.Mutable, error) { - desc, err := tc.getMutableObjectDescriptor(ctx, txn, tn, flags) + desc, err := tc.getMutableObjectDescriptor(ctx, txn, tn.Catalog(), tn.Schema(), tn.Object(), flags) if err != nil { return nil, err } @@ -310,14 +310,17 @@ func (tc *Collection) GetMutableTableDescriptor( } func (tc *Collection) getMutableObjectDescriptor( - ctx context.Context, txn *kv.Txn, name tree.ObjectName, flags tree.ObjectLookupFlags, + ctx context.Context, + txn *kv.Txn, + catalogName, schemaName, objectName string, + flags tree.ObjectLookupFlags, ) (catalog.MutableDescriptor, error) { if log.V(2) { - log.Infof(ctx, "reading mutable descriptor on '%s'", name) + log.Infof(ctx, "reading mutable descriptor on '%s.%s.%s'", catalogName, schemaName, objectName) } // Resolve the database. - db, err := tc.GetDatabaseVersion(ctx, txn, name.Catalog(), + db, err := tc.GetDatabaseVersion(ctx, txn, catalogName, tree.DatabaseLookupFlags{ Required: flags.Required, AvoidCached: flags.AvoidCached, @@ -330,7 +333,7 @@ func (tc *Collection) getMutableObjectDescriptor( dbID := db.GetID() // Resolve the schema to the ID of the schema. - foundSchema, resolvedSchema, err := tc.ResolveSchema(ctx, txn, dbID, name.Schema(), + foundSchema, resolvedSchema, err := tc.ResolveSchema(ctx, txn, dbID, schemaName, tree.SchemaLookupFlags{ Required: flags.Required, AvoidCached: flags.AvoidCached, @@ -344,7 +347,7 @@ func (tc *Collection) getMutableObjectDescriptor( if refuseFurtherLookup, desc, err := tc.getUncommittedDescriptor( dbID, resolvedSchema.ID, - name.Object(), + objectName, flags.CommonLookupFlags, ); refuseFurtherLookup || err != nil { return nil, err @@ -358,9 +361,9 @@ func (tc *Collection) getMutableObjectDescriptor( txn, tc.settings, tc.codec(), - name.Catalog(), - name.Schema(), - name.Object(), + catalogName, + schemaName, + objectName, flags, ) if err != nil || obj == nil { @@ -625,14 +628,19 @@ func (tc *Collection) GetDatabaseVersion( func (tc *Collection) GetTableVersion( ctx context.Context, txn *kv.Txn, tn *tree.TableName, flags tree.ObjectLookupFlags, ) (*tabledesc.Immutable, error) { - desc, err := tc.getObjectVersion(ctx, txn, tn, flags) + desc, err := tc.getObjectVersion(ctx, txn, tn.Catalog(), tn.Schema(), tn.Object(), flags) if err != nil { return nil, err } table, ok := desc.(*tabledesc.Immutable) if !ok { if flags.Required { - return nil, sqlerrors.NewUndefinedRelationError(tn) + // Copy the input TableName to avoid allocations: + // NewUndefinedRelationError requires that we promote TableName to a + // NodeFormatter, which causes the input TableName to get heap allocated + // even in cases where it wouldn't otherwise. + errorTn := *tn + return nil, sqlerrors.NewUndefinedRelationError(&errorTn) } return nil, nil } @@ -644,7 +652,10 @@ func (tc *Collection) GetTableVersion( } func (tc *Collection) getObjectVersion( - ctx context.Context, txn *kv.Txn, name tree.ObjectName, flags tree.ObjectLookupFlags, + ctx context.Context, + txn *kv.Txn, + catalogName, schemaName, objectName string, + flags tree.ObjectLookupFlags, ) (catalog.Descriptor, error) { readObjectFromStore := func() (catalog.Descriptor, error) { return getObjectDesc( @@ -652,15 +663,15 @@ func (tc *Collection) getObjectVersion( txn, tc.settings, tc.codec(), - name.Catalog(), - name.Schema(), - name.Object(), + catalogName, + schemaName, + objectName, flags, ) } // Resolve the database. - db, err := tc.GetDatabaseVersion(ctx, txn, name.Catalog(), + db, err := tc.GetDatabaseVersion(ctx, txn, catalogName, tree.DatabaseLookupFlags{ Required: flags.Required, AvoidCached: flags.AvoidCached, @@ -673,7 +684,7 @@ func (tc *Collection) getObjectVersion( dbID := db.GetID() // Resolve the schema to the ID of the schema. - foundSchema, resolvedSchema, err := tc.ResolveSchema(ctx, txn, dbID, name.Schema(), + foundSchema, resolvedSchema, err := tc.ResolveSchema(ctx, txn, dbID, schemaName, tree.SchemaLookupFlags{ Required: flags.Required, AvoidCached: flags.AvoidCached, @@ -693,12 +704,12 @@ func (tc *Collection) getObjectVersion( // system.users. For now we're sticking to disabling caching of // all system descriptors except the role-members-desc. avoidCache := flags.AvoidCached || lease.TestingTableLeasesAreDisabled() || - (name.Catalog() == systemschema.SystemDatabaseName && name.Object() != systemschema.RoleMembersTable.Name) + (catalogName == systemschema.SystemDatabaseName && objectName != systemschema.RoleMembersTable.Name) if refuseFurtherLookup, desc, err := tc.getUncommittedDescriptor( dbID, schemaID, - name.Object(), + objectName, flags.CommonLookupFlags, ); refuseFurtherLookup || err != nil { return nil, err @@ -719,7 +730,7 @@ func (tc *Collection) getObjectVersion( return readObjectFromStore() } - desc, shouldReadFromStore, err := tc.getLeasedDescriptorByName(ctx, txn, dbID, schemaID, name.Object()) + desc, shouldReadFromStore, err := tc.getLeasedDescriptorByName(ctx, txn, dbID, schemaID, objectName) if err != nil { return nil, err } @@ -1175,7 +1186,7 @@ func (tc *Collection) GetUncommittedTables() (tables []*tabledesc.Immutable) { func (tc *Collection) GetMutableTypeDescriptor( ctx context.Context, txn *kv.Txn, tn *tree.TypeName, flags tree.ObjectLookupFlags, ) (*typedesc.Mutable, error) { - desc, err := tc.getMutableObjectDescriptor(ctx, txn, tn, flags) + desc, err := tc.getMutableObjectDescriptor(ctx, txn, tn.Catalog(), tn.Schema(), tn.Object(), flags) if err != nil { return nil, err } @@ -1205,7 +1216,7 @@ func (tc *Collection) GetMutableTypeVersionByID( func (tc *Collection) GetTypeVersion( ctx context.Context, txn *kv.Txn, tn *tree.TypeName, flags tree.ObjectLookupFlags, ) (*typedesc.Immutable, error) { - desc, err := tc.getObjectVersion(ctx, txn, tn, flags) + desc, err := tc.getObjectVersion(ctx, txn, tn.Catalog(), tn.Schema(), tn.Object(), flags) if err != nil { return nil, err }