Skip to content

Commit

Permalink
importer,crosscluster: move type resolver to a different package
Browse files Browse the repository at this point in the history
Since we're using this type resolver for more than just IMPORT, it makes
more sense to put it in a more fitting package.

Release note: None
  • Loading branch information
rafiss committed Oct 23, 2024
1 parent 515d198 commit 9b8ac45
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 30 deletions.
11 changes: 11 additions & 0 deletions pkg/ccl/crosscluster/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "crosscluster",
srcs = [
"addresses.go",
"crosscluster_type_resolver.go",
"errors.go",
"event.go",
"settings.go",
Expand All @@ -15,5 +16,15 @@ go_library(
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/typedesc",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlerrors",
"//pkg/sql/types",
"@com_github_cockroachdb_errors//:errors",
"@com_github_lib_pq//oid",
],
)
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Copyright 2017 The Cockroach Authors.
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package importer
package crosscluster

import (
"context"
Expand All @@ -20,16 +20,18 @@ import (
"github.com/lib/pq/oid"
)

type ImportTypeResolver struct {
// CrossClusterTypeResolver is meant to be used to resolve types using type
// descriptors that originate from a different cluster.
type CrossClusterTypeResolver struct {
typeIDToDesc map[descpb.ID]*descpb.TypeDescriptor
typeNameToDesc map[string][]*descpb.TypeDescriptor
}

var _ tree.TypeReferenceResolver = ImportTypeResolver{}
var _ catalog.TypeDescriptorResolver = ImportTypeResolver{}
var _ tree.TypeReferenceResolver = CrossClusterTypeResolver{}
var _ catalog.TypeDescriptorResolver = CrossClusterTypeResolver{}

func MakeImportTypeResolver(typeDescs []*descpb.TypeDescriptor) ImportTypeResolver {
itr := ImportTypeResolver{
func MakeCrossClusterTypeResolver(typeDescs []*descpb.TypeDescriptor) CrossClusterTypeResolver {
itr := CrossClusterTypeResolver{
typeIDToDesc: make(map[descpb.ID]*descpb.TypeDescriptor),
typeNameToDesc: make(map[string][]*descpb.TypeDescriptor),
}
Expand All @@ -52,7 +54,7 @@ func MakeImportTypeResolver(typeDescs []*descpb.TypeDescriptor) ImportTypeResolv
// Note that if a table happens to have multiple types with the same name (but
// different schemas), this implementation will return a "feature unsupported"
// error.
func (i ImportTypeResolver) ResolveType(
func (i CrossClusterTypeResolver) ResolveType(
ctx context.Context, name *tree.UnresolvedObjectName,
) (*types.T, error) {
var descs []*descpb.TypeDescriptor
Expand All @@ -75,12 +77,14 @@ func (i ImportTypeResolver) ResolveType(
}

// ResolveTypeByOID implements the tree.TypeReferenceResolver interface.
func (i ImportTypeResolver) ResolveTypeByOID(ctx context.Context, oid oid.Oid) (*types.T, error) {
func (i CrossClusterTypeResolver) ResolveTypeByOID(
ctx context.Context, oid oid.Oid,
) (*types.T, error) {
return typedesc.ResolveHydratedTByOID(ctx, oid, i)
}

// GetTypeDescriptor implements the catalog.TypeDescriptorResolver interface.
func (i ImportTypeResolver) GetTypeDescriptor(
func (i CrossClusterTypeResolver) GetTypeDescriptor(
_ context.Context, id descpb.ID,
) (tree.TypeName, catalog.TypeDescriptor, error) {
var desc *descpb.TypeDescriptor
Expand Down
2 changes: 0 additions & 2 deletions pkg/ccl/crosscluster/logical/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/exprutil",
"//pkg/sql/importer",
"//pkg/sql/isql",
"//pkg/sql/lexbase",
"//pkg/sql/parser",
Expand All @@ -74,7 +73,6 @@ go_library(
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/ctxgroup",
"//pkg/util/errorutil/unimplemented",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/log/logcrash",
Expand Down
12 changes: 2 additions & 10 deletions pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
"github.com/cockroachdb/cockroach/pkg/sql/importer"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand Down Expand Up @@ -211,16 +210,9 @@ func createLogicalReplicationStreamPlanHook(

sourceTypes := make([]*descpb.TypeDescriptor, len(spec.TypeDescriptors))
for i, desc := range spec.TypeDescriptors {
// Until https://github.com/cockroachdb/cockroach/issues/132164 is resolved,
// we cannot allow user-defined types on the SQL ingestion path.
if m, ok := options.GetMode(); ok && m != "immediate" {
return unimplemented.NewWithIssue(132164, "MODE = 'immediate' cannot be used with user-defined types")
}
sourceTypes[i] = &desc
}
// TODO(rafi): do we need a different type resolver?
// See https://github.com/cockroachdb/cockroach/issues/132164.
importResolver := importer.MakeImportTypeResolver(sourceTypes)
crossClusterResolver := crosscluster.MakeCrossClusterTypeResolver(sourceTypes)

// If the user asked to ignore "ttl-deletes", make sure that at least one of
// the source tables actually has a TTL job which sets the omit bit that
Expand All @@ -230,7 +222,7 @@ func createLogicalReplicationStreamPlanHook(
for i, name := range srcTableNames {
td := spec.TableDescriptors[name]
cpy := tabledesc.NewBuilder(&td).BuildCreatedMutableTable()
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil {
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, crossClusterResolver); err != nil {
return err
}
srcTableDescs[i] = cpy.TableDesc()
Expand Down
7 changes: 2 additions & 5 deletions pkg/ccl/crosscluster/logical/logical_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/importer"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
Expand Down Expand Up @@ -359,15 +358,13 @@ func (p *logicalReplicationPlanner) generatePlanImpl(
defaultFnOID = catid.FuncIDToOID(catid.DescID(defaultFnID))
}

// TODO(msbutler): is this import type resolver kosher? Should put in a new package.
// See https://github.com/cockroachdb/cockroach/issues/132164.
importResolver := importer.MakeImportTypeResolver(plan.SourceTypes)
crossClusterResolver := crosscluster.MakeCrossClusterTypeResolver(plan.SourceTypes)
tableMetadataByDestID := make(map[int32]execinfrapb.TableReplicationMetadata)
if err := sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, descriptors *descs.Collection) error {
for _, pair := range payload.ReplicationPairs {
srcTableDesc := plan.DescriptorMap[pair.SrcDescriptorID]
cpy := tabledesc.NewBuilder(&srcTableDesc).BuildCreatedMutableTable()
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil {
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, crossClusterResolver); err != nil {
return err
}
srcTableDesc = *cpy.TableDesc()
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/crosscluster/logical/lww_row_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func (q *queryBuilder) AddRow(row cdcevent.Row) error {
}
if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
if dEnum, ok := d.(*tree.DEnum); ok {
// Override the type to Unknown to avoid a mismatched type OID error
// during execution. Note that Unknown is the type used by default
// when a SQL statement is executed without type hints.
dEnum.EnumTyp = types.Unknown
}
q.scratchDatums = append(q.scratchDatums, d)
Expand Down Expand Up @@ -121,6 +124,9 @@ func (q *queryBuilder) AddRowDefaultNull(row *cdcevent.Row) error {
}
if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
if dEnum, ok := d.(*tree.DEnum); ok {
// Override the type to Unknown to avoid a mismatched type OID error
// during execution. Note that Unknown is the type used by default
// when a SQL statement is executed without type hints.
dEnum.EnumTyp = types.Unknown
}
q.scratchDatums = append(q.scratchDatums, d)
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ go_library(
"import_processor.go",
"import_processor_planning.go",
"import_table_creation.go",
"import_type_resolver.go",
"read_import_avro.go",
"read_import_base.go",
"read_import_csv.go",
Expand All @@ -32,6 +31,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/ccl/crosscluster",
"//pkg/cloud",
"//pkg/cloud/cloudprivilege",
"//pkg/clusterversion",
Expand Down Expand Up @@ -93,7 +93,6 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlclustersettings",
"//pkg/sql/sqlerrors",
"//pkg/sql/sqltelemetry",
"//pkg/sql/stats",
"//pkg/sql/types",
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/importer/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand Down Expand Up @@ -55,7 +56,7 @@ func runImport(

// Install type metadata in all of the import tables.
spec = protoutil.Clone(spec).(*execinfrapb.ReadImportDataSpec)
importResolver := MakeImportTypeResolver(spec.Types)
importResolver := crosscluster.MakeCrossClusterTypeResolver(spec.Types)
for _, table := range spec.Tables {
cpy := tabledesc.NewBuilder(table.Desc).BuildCreatedMutableTable()
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil {
Expand Down

0 comments on commit 9b8ac45

Please sign in to comment.