diff --git a/pkg/ccl/crosscluster/BUILD.bazel b/pkg/ccl/crosscluster/BUILD.bazel index b669e5b4bada..1da9bee6c2c3 100644 --- a/pkg/ccl/crosscluster/BUILD.bazel +++ b/pkg/ccl/crosscluster/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "crosscluster", srcs = [ "addresses.go", + "crosscluster_type_resolver.go", "errors.go", "event.go", "settings.go", @@ -15,5 +16,15 @@ go_library( "//pkg/repstream/streampb", "//pkg/roachpb", "//pkg/settings", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/typedesc", + "//pkg/sql/pgwire/pgcode", + "//pkg/sql/pgwire/pgerror", + "//pkg/sql/sem/tree", + "//pkg/sql/sqlerrors", + "//pkg/sql/types", + "@com_github_cockroachdb_errors//:errors", + "@com_github_lib_pq//oid", ], ) diff --git a/pkg/sql/importer/import_type_resolver.go b/pkg/ccl/crosscluster/crosscluster_type_resolver.go similarity index 80% rename from pkg/sql/importer/import_type_resolver.go rename to pkg/ccl/crosscluster/crosscluster_type_resolver.go index 061bf464c009..840f8618389c 100644 --- a/pkg/sql/importer/import_type_resolver.go +++ b/pkg/ccl/crosscluster/crosscluster_type_resolver.go @@ -1,9 +1,9 @@ -// Copyright 2017 The Cockroach Authors. +// Copyright 2024 The Cockroach Authors. // // Use of this software is governed by the CockroachDB Software License // included in the /LICENSE file. -package importer +package crosscluster import ( "context" @@ -20,16 +20,18 @@ import ( "github.com/lib/pq/oid" ) -type ImportTypeResolver struct { +// CrossClusterTypeResolver is meant to be used to resolve types using type +// descriptors that originate from a different cluster. +type CrossClusterTypeResolver struct { typeIDToDesc map[descpb.ID]*descpb.TypeDescriptor typeNameToDesc map[string][]*descpb.TypeDescriptor } -var _ tree.TypeReferenceResolver = ImportTypeResolver{} -var _ catalog.TypeDescriptorResolver = ImportTypeResolver{} +var _ tree.TypeReferenceResolver = CrossClusterTypeResolver{} +var _ catalog.TypeDescriptorResolver = CrossClusterTypeResolver{} -func MakeImportTypeResolver(typeDescs []*descpb.TypeDescriptor) ImportTypeResolver { - itr := ImportTypeResolver{ +func MakeCrossClusterTypeResolver(typeDescs []*descpb.TypeDescriptor) CrossClusterTypeResolver { + itr := CrossClusterTypeResolver{ typeIDToDesc: make(map[descpb.ID]*descpb.TypeDescriptor), typeNameToDesc: make(map[string][]*descpb.TypeDescriptor), } @@ -52,7 +54,7 @@ func MakeImportTypeResolver(typeDescs []*descpb.TypeDescriptor) ImportTypeResolv // Note that if a table happens to have multiple types with the same name (but // different schemas), this implementation will return a "feature unsupported" // error. -func (i ImportTypeResolver) ResolveType( +func (i CrossClusterTypeResolver) ResolveType( ctx context.Context, name *tree.UnresolvedObjectName, ) (*types.T, error) { var descs []*descpb.TypeDescriptor @@ -75,12 +77,14 @@ func (i ImportTypeResolver) ResolveType( } // ResolveTypeByOID implements the tree.TypeReferenceResolver interface. -func (i ImportTypeResolver) ResolveTypeByOID(ctx context.Context, oid oid.Oid) (*types.T, error) { +func (i CrossClusterTypeResolver) ResolveTypeByOID( + ctx context.Context, oid oid.Oid, +) (*types.T, error) { return typedesc.ResolveHydratedTByOID(ctx, oid, i) } // GetTypeDescriptor implements the catalog.TypeDescriptorResolver interface. -func (i ImportTypeResolver) GetTypeDescriptor( +func (i CrossClusterTypeResolver) GetTypeDescriptor( _ context.Context, id descpb.ID, ) (tree.TypeName, catalog.TypeDescriptor, error) { var desc *descpb.TypeDescriptor diff --git a/pkg/ccl/crosscluster/logical/BUILD.bazel b/pkg/ccl/crosscluster/logical/BUILD.bazel index a38ef38dd28a..e9b64205f66a 100644 --- a/pkg/ccl/crosscluster/logical/BUILD.bazel +++ b/pkg/ccl/crosscluster/logical/BUILD.bazel @@ -50,7 +50,6 @@ go_library( "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", "//pkg/sql/exprutil", - "//pkg/sql/importer", "//pkg/sql/isql", "//pkg/sql/lexbase", "//pkg/sql/parser", @@ -74,7 +73,6 @@ go_library( "//pkg/util/admission/admissionpb", "//pkg/util/buildutil", "//pkg/util/ctxgroup", - "//pkg/util/errorutil/unimplemented", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/log/logcrash", diff --git a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go index 4427ddcb034f..a98570a46b29 100644 --- a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go +++ b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/sql/exprutil" - "github.com/cockroachdb/cockroach/pkg/sql/importer" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -211,16 +210,9 @@ func createLogicalReplicationStreamPlanHook( sourceTypes := make([]*descpb.TypeDescriptor, len(spec.TypeDescriptors)) for i, desc := range spec.TypeDescriptors { - // Until https://github.com/cockroachdb/cockroach/issues/132164 is resolved, - // we cannot allow user-defined types on the SQL ingestion path. - if m, ok := options.GetMode(); ok && m != "immediate" { - return unimplemented.NewWithIssue(132164, "MODE = 'immediate' cannot be used with user-defined types") - } sourceTypes[i] = &desc } - // TODO(rafi): do we need a different type resolver? - // See https://github.com/cockroachdb/cockroach/issues/132164. - importResolver := importer.MakeImportTypeResolver(sourceTypes) + crossClusterResolver := crosscluster.MakeCrossClusterTypeResolver(sourceTypes) // If the user asked to ignore "ttl-deletes", make sure that at least one of // the source tables actually has a TTL job which sets the omit bit that @@ -230,7 +222,7 @@ func createLogicalReplicationStreamPlanHook( for i, name := range srcTableNames { td := spec.TableDescriptors[name] cpy := tabledesc.NewBuilder(&td).BuildCreatedMutableTable() - if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil { + if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, crossClusterResolver); err != nil { return err } srcTableDescs[i] = cpy.TableDesc() diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job.go b/pkg/ccl/crosscluster/logical/logical_replication_job.go index 1a007cae705e..08ccfede3ef8 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job.go @@ -29,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - "github.com/cockroachdb/cockroach/pkg/sql/importer" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/catid" @@ -359,15 +358,13 @@ func (p *logicalReplicationPlanner) generatePlanImpl( defaultFnOID = catid.FuncIDToOID(catid.DescID(defaultFnID)) } - // TODO(msbutler): is this import type resolver kosher? Should put in a new package. - // See https://github.com/cockroachdb/cockroach/issues/132164. - importResolver := importer.MakeImportTypeResolver(plan.SourceTypes) + crossClusterResolver := crosscluster.MakeCrossClusterTypeResolver(plan.SourceTypes) tableMetadataByDestID := make(map[int32]execinfrapb.TableReplicationMetadata) if err := sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, descriptors *descs.Collection) error { for _, pair := range payload.ReplicationPairs { srcTableDesc := plan.DescriptorMap[pair.SrcDescriptorID] cpy := tabledesc.NewBuilder(&srcTableDesc).BuildCreatedMutableTable() - if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil { + if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, crossClusterResolver); err != nil { return err } srcTableDesc = *cpy.TableDesc() diff --git a/pkg/ccl/crosscluster/logical/lww_row_processor.go b/pkg/ccl/crosscluster/logical/lww_row_processor.go index 16d33010f564..63122a4b469a 100644 --- a/pkg/ccl/crosscluster/logical/lww_row_processor.go +++ b/pkg/ccl/crosscluster/logical/lww_row_processor.go @@ -92,6 +92,9 @@ func (q *queryBuilder) AddRow(row cdcevent.Row) error { } if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error { if dEnum, ok := d.(*tree.DEnum); ok { + // Override the type to Unknown to avoid a mismatched type OID error + // during execution. Note that Unknown is the type used by default + // when a SQL statement is executed without type hints. dEnum.EnumTyp = types.Unknown } q.scratchDatums = append(q.scratchDatums, d) @@ -121,6 +124,9 @@ func (q *queryBuilder) AddRowDefaultNull(row *cdcevent.Row) error { } if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error { if dEnum, ok := d.(*tree.DEnum); ok { + // Override the type to Unknown to avoid a mismatched type OID error + // during execution. Note that Unknown is the type used by default + // when a SQL statement is executed without type hints. dEnum.EnumTyp = types.Unknown } q.scratchDatums = append(q.scratchDatums, d) diff --git a/pkg/sql/importer/BUILD.bazel b/pkg/sql/importer/BUILD.bazel index 0653253b0609..f3c798eaeb59 100644 --- a/pkg/sql/importer/BUILD.bazel +++ b/pkg/sql/importer/BUILD.bazel @@ -17,7 +17,6 @@ go_library( "import_processor.go", "import_processor_planning.go", "import_table_creation.go", - "import_type_resolver.go", "read_import_avro.go", "read_import_base.go", "read_import_csv.go", @@ -32,6 +31,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/ccl/crosscluster", "//pkg/cloud", "//pkg/cloud/cloudprivilege", "//pkg/clusterversion", @@ -93,7 +93,6 @@ go_library( "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", "//pkg/sql/sqlclustersettings", - "//pkg/sql/sqlerrors", "//pkg/sql/sqltelemetry", "//pkg/sql/stats", "//pkg/sql/types", diff --git a/pkg/sql/importer/read_import_base.go b/pkg/sql/importer/read_import_base.go index 2065079cb29f..963323a86d91 100644 --- a/pkg/sql/importer/read_import_base.go +++ b/pkg/sql/importer/read_import_base.go @@ -18,6 +18,7 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -55,7 +56,7 @@ func runImport( // Install type metadata in all of the import tables. spec = protoutil.Clone(spec).(*execinfrapb.ReadImportDataSpec) - importResolver := MakeImportTypeResolver(spec.Types) + importResolver := crosscluster.MakeCrossClusterTypeResolver(spec.Types) for _, table := range spec.Tables { cpy := tabledesc.NewBuilder(table.Desc).BuildCreatedMutableTable() if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil {