diff --git a/pkg/ccl/crosscluster/BUILD.bazel b/pkg/ccl/crosscluster/BUILD.bazel index b669e5b4bada..1da9bee6c2c3 100644 --- a/pkg/ccl/crosscluster/BUILD.bazel +++ b/pkg/ccl/crosscluster/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "crosscluster", srcs = [ "addresses.go", + "crosscluster_type_resolver.go", "errors.go", "event.go", "settings.go", @@ -15,5 +16,15 @@ go_library( "//pkg/repstream/streampb", "//pkg/roachpb", "//pkg/settings", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/typedesc", + "//pkg/sql/pgwire/pgcode", + "//pkg/sql/pgwire/pgerror", + "//pkg/sql/sem/tree", + "//pkg/sql/sqlerrors", + "//pkg/sql/types", + "@com_github_cockroachdb_errors//:errors", + "@com_github_lib_pq//oid", ], ) diff --git a/pkg/sql/importer/import_type_resolver.go b/pkg/ccl/crosscluster/crosscluster_type_resolver.go similarity index 80% rename from pkg/sql/importer/import_type_resolver.go rename to pkg/ccl/crosscluster/crosscluster_type_resolver.go index 061bf464c009..840f8618389c 100644 --- a/pkg/sql/importer/import_type_resolver.go +++ b/pkg/ccl/crosscluster/crosscluster_type_resolver.go @@ -1,9 +1,9 @@ -// Copyright 2017 The Cockroach Authors. +// Copyright 2024 The Cockroach Authors. // // Use of this software is governed by the CockroachDB Software License // included in the /LICENSE file. -package importer +package crosscluster import ( "context" @@ -20,16 +20,18 @@ import ( "github.com/lib/pq/oid" ) -type ImportTypeResolver struct { +// CrossClusterTypeResolver is meant to be used to resolve types using type +// descriptors that originate from a different cluster. +type CrossClusterTypeResolver struct { typeIDToDesc map[descpb.ID]*descpb.TypeDescriptor typeNameToDesc map[string][]*descpb.TypeDescriptor } -var _ tree.TypeReferenceResolver = ImportTypeResolver{} -var _ catalog.TypeDescriptorResolver = ImportTypeResolver{} +var _ tree.TypeReferenceResolver = CrossClusterTypeResolver{} +var _ catalog.TypeDescriptorResolver = CrossClusterTypeResolver{} -func MakeImportTypeResolver(typeDescs []*descpb.TypeDescriptor) ImportTypeResolver { - itr := ImportTypeResolver{ +func MakeCrossClusterTypeResolver(typeDescs []*descpb.TypeDescriptor) CrossClusterTypeResolver { + itr := CrossClusterTypeResolver{ typeIDToDesc: make(map[descpb.ID]*descpb.TypeDescriptor), typeNameToDesc: make(map[string][]*descpb.TypeDescriptor), } @@ -52,7 +54,7 @@ func MakeImportTypeResolver(typeDescs []*descpb.TypeDescriptor) ImportTypeResolv // Note that if a table happens to have multiple types with the same name (but // different schemas), this implementation will return a "feature unsupported" // error. -func (i ImportTypeResolver) ResolveType( +func (i CrossClusterTypeResolver) ResolveType( ctx context.Context, name *tree.UnresolvedObjectName, ) (*types.T, error) { var descs []*descpb.TypeDescriptor @@ -75,12 +77,14 @@ func (i ImportTypeResolver) ResolveType( } // ResolveTypeByOID implements the tree.TypeReferenceResolver interface. -func (i ImportTypeResolver) ResolveTypeByOID(ctx context.Context, oid oid.Oid) (*types.T, error) { +func (i CrossClusterTypeResolver) ResolveTypeByOID( + ctx context.Context, oid oid.Oid, +) (*types.T, error) { return typedesc.ResolveHydratedTByOID(ctx, oid, i) } // GetTypeDescriptor implements the catalog.TypeDescriptorResolver interface. -func (i ImportTypeResolver) GetTypeDescriptor( +func (i CrossClusterTypeResolver) GetTypeDescriptor( _ context.Context, id descpb.ID, ) (tree.TypeName, catalog.TypeDescriptor, error) { var desc *descpb.TypeDescriptor diff --git a/pkg/ccl/crosscluster/logical/BUILD.bazel b/pkg/ccl/crosscluster/logical/BUILD.bazel index a38ef38dd28a..e9b64205f66a 100644 --- a/pkg/ccl/crosscluster/logical/BUILD.bazel +++ b/pkg/ccl/crosscluster/logical/BUILD.bazel @@ -50,7 +50,6 @@ go_library( "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", "//pkg/sql/exprutil", - "//pkg/sql/importer", "//pkg/sql/isql", "//pkg/sql/lexbase", "//pkg/sql/parser", @@ -74,7 +73,6 @@ go_library( "//pkg/util/admission/admissionpb", "//pkg/util/buildutil", "//pkg/util/ctxgroup", - "//pkg/util/errorutil/unimplemented", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/log/logcrash", diff --git a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go index 6bec89dbddce..a98570a46b29 100644 --- a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go +++ b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/sql/exprutil" - "github.com/cockroachdb/cockroach/pkg/sql/importer" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -36,7 +35,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/buildutil" - "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -212,16 +210,9 @@ func createLogicalReplicationStreamPlanHook( sourceTypes := make([]*descpb.TypeDescriptor, len(spec.TypeDescriptors)) for i, desc := range spec.TypeDescriptors { - // Until https://github.com/cockroachdb/cockroach/issues/132164 is resolved, - // we cannot allow user-defined types on the SQL ingestion path. - if m, ok := options.GetMode(); ok && m != "immediate" { - return unimplemented.NewWithIssue(132164, "MODE = 'immediate' cannot be used with user-defined types") - } sourceTypes[i] = &desc } - // TODO(rafi): do we need a different type resolver? - // See https://github.com/cockroachdb/cockroach/issues/132164. - importResolver := importer.MakeImportTypeResolver(sourceTypes) + crossClusterResolver := crosscluster.MakeCrossClusterTypeResolver(sourceTypes) // If the user asked to ignore "ttl-deletes", make sure that at least one of // the source tables actually has a TTL job which sets the omit bit that @@ -231,7 +222,7 @@ func createLogicalReplicationStreamPlanHook( for i, name := range srcTableNames { td := spec.TableDescriptors[name] cpy := tabledesc.NewBuilder(&td).BuildCreatedMutableTable() - if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil { + if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, crossClusterResolver); err != nil { return err } srcTableDescs[i] = cpy.TableDesc() diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job.go b/pkg/ccl/crosscluster/logical/logical_replication_job.go index 1a007cae705e..08ccfede3ef8 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job.go @@ -29,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - "github.com/cockroachdb/cockroach/pkg/sql/importer" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/catid" @@ -359,15 +358,13 @@ func (p *logicalReplicationPlanner) generatePlanImpl( defaultFnOID = catid.FuncIDToOID(catid.DescID(defaultFnID)) } - // TODO(msbutler): is this import type resolver kosher? Should put in a new package. - // See https://github.com/cockroachdb/cockroach/issues/132164. - importResolver := importer.MakeImportTypeResolver(plan.SourceTypes) + crossClusterResolver := crosscluster.MakeCrossClusterTypeResolver(plan.SourceTypes) tableMetadataByDestID := make(map[int32]execinfrapb.TableReplicationMetadata) if err := sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, descriptors *descs.Collection) error { for _, pair := range payload.ReplicationPairs { srcTableDesc := plan.DescriptorMap[pair.SrcDescriptorID] cpy := tabledesc.NewBuilder(&srcTableDesc).BuildCreatedMutableTable() - if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil { + if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, crossClusterResolver); err != nil { return err } srcTableDesc = *cpy.TableDesc() diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go index f70720ebe307..b7d9d403336b 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go @@ -1928,19 +1928,32 @@ func TestUserDefinedTypes(t *testing.T) { dbA.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)") dbB.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)") - dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)") - dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)") - - dbB.Exec(t, "INSERT INTO data VALUES (1, 'one', (3, 'cat'))") - // Force default expression evaluation. - dbB.Exec(t, "INSERT INTO data (pk, val2) VALUES (2, (4, 'dog'))") - - var jobAID jobspb.JobID - dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data", dbBURL.String()).Scan(&jobAID) - WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, jobAID) - require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, dbA.DB, "A")) - dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}}) - dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}}) + for _, mode := range []string{"validated", "immediate"} { + t.Run(mode, func(t *testing.T) { + dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)") + dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)") + + dbB.Exec(t, "INSERT INTO data VALUES (1, 'one', (3, 'cat'))") + // Force default expression evaluation. + dbB.Exec(t, "INSERT INTO data (pk, val2) VALUES (2, (4, 'dog'))") + + var jobAID jobspb.JobID + dbA.QueryRow(t, + fmt.Sprintf("CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data WITH mode = %s", mode), + dbBURL.String(), + ).Scan(&jobAID) + WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, jobAID) + require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, dbA.DB, "A")) + dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}}) + dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}}) + + dbA.Exec(t, "CANCEL JOB $1", jobAID) + jobutils.WaitForJobToCancel(t, dbA, jobAID) + + dbA.Exec(t, "DROP TABLE data") + dbB.Exec(t, "DROP TABLE data") + }) + } } // TestLogicalReplicationCreationChecks verifies that we check that the table diff --git a/pkg/ccl/crosscluster/logical/lww_row_processor.go b/pkg/ccl/crosscluster/logical/lww_row_processor.go index 645ec83d83ec..63122a4b469a 100644 --- a/pkg/ccl/crosscluster/logical/lww_row_processor.go +++ b/pkg/ccl/crosscluster/logical/lww_row_processor.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metamorphic" "github.com/cockroachdb/cockroach/pkg/util/randutil" @@ -90,6 +91,12 @@ func (q *queryBuilder) AddRow(row cdcevent.Row) error { return err } if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error { + if dEnum, ok := d.(*tree.DEnum); ok { + // Override the type to Unknown to avoid a mismatched type OID error + // during execution. Note that Unknown is the type used by default + // when a SQL statement is executed without type hints. + dEnum.EnumTyp = types.Unknown + } q.scratchDatums = append(q.scratchDatums, d) return nil }); err != nil { @@ -116,6 +123,12 @@ func (q *queryBuilder) AddRowDefaultNull(row *cdcevent.Row) error { continue } if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error { + if dEnum, ok := d.(*tree.DEnum); ok { + // Override the type to Unknown to avoid a mismatched type OID error + // during execution. Note that Unknown is the type used by default + // when a SQL statement is executed without type hints. + dEnum.EnumTyp = types.Unknown + } q.scratchDatums = append(q.scratchDatums, d) return nil }); err != nil { diff --git a/pkg/sql/importer/BUILD.bazel b/pkg/sql/importer/BUILD.bazel index 0653253b0609..f3c798eaeb59 100644 --- a/pkg/sql/importer/BUILD.bazel +++ b/pkg/sql/importer/BUILD.bazel @@ -17,7 +17,6 @@ go_library( "import_processor.go", "import_processor_planning.go", "import_table_creation.go", - "import_type_resolver.go", "read_import_avro.go", "read_import_base.go", "read_import_csv.go", @@ -32,6 +31,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/ccl/crosscluster", "//pkg/cloud", "//pkg/cloud/cloudprivilege", "//pkg/clusterversion", @@ -93,7 +93,6 @@ go_library( "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", "//pkg/sql/sqlclustersettings", - "//pkg/sql/sqlerrors", "//pkg/sql/sqltelemetry", "//pkg/sql/stats", "//pkg/sql/types", diff --git a/pkg/sql/importer/read_import_base.go b/pkg/sql/importer/read_import_base.go index 2065079cb29f..963323a86d91 100644 --- a/pkg/sql/importer/read_import_base.go +++ b/pkg/sql/importer/read_import_base.go @@ -18,6 +18,7 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -55,7 +56,7 @@ func runImport( // Install type metadata in all of the import tables. spec = protoutil.Clone(spec).(*execinfrapb.ReadImportDataSpec) - importResolver := MakeImportTypeResolver(spec.Types) + importResolver := crosscluster.MakeCrossClusterTypeResolver(spec.Types) for _, table := range spec.Tables { cpy := tabledesc.NewBuilder(table.Desc).BuildCreatedMutableTable() if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil {