From e6d6abf5d57bacc92d5daf0ffa84b950a2272a45 Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Tue, 22 Oct 2024 17:39:38 -0400 Subject: [PATCH 1/2] crosscluster/logical: handle user-defined types in SQL mode Previously, using a user-defined type in SQL mode would fail because of a low level check in the execution engine that verifies that the input datum has the same type OID as the destination column. This does not work for LDR, since the input datums come directly from a source table in a different cluster, so user-defined types have different OIDs. This fixes it by setting the datum type to "unknown" before executing the insert query. The "unknown" type is what is normally used when any SQL statement is sent to CRDB without explicit type annotations/hints. When the execution engine sees this type, it will perform an automatic (and cheap) immutable assignment cast to change the datum to the appropriate type. Release note (ops change): Logical replication streams that reference tables with user-defined types can now be created with the `mode = immediate` option. --- .../create_logical_replication_stmt.go | 6 --- .../logical/logical_replication_job_test.go | 39 ++++++++++++------- .../crosscluster/logical/lww_row_processor.go | 13 +++++++ 3 files changed, 39 insertions(+), 19 deletions(-) diff --git a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go index 6bec89dbddce..3920e019fcca 100644 --- a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go +++ b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go @@ -36,7 +36,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/buildutil" - "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -212,11 +211,6 @@ func createLogicalReplicationStreamPlanHook( sourceTypes := make([]*descpb.TypeDescriptor, len(spec.TypeDescriptors)) for i, desc := range spec.TypeDescriptors { - // Until https://github.com/cockroachdb/cockroach/issues/132164 is resolved, - // we cannot allow user-defined types on the SQL ingestion path. - if m, ok := options.GetMode(); ok && m != "immediate" { - return unimplemented.NewWithIssue(132164, "MODE = 'immediate' cannot be used with user-defined types") - } sourceTypes[i] = &desc } // TODO(rafi): do we need a different type resolver? diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go index f70720ebe307..b7d9d403336b 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job_test.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go @@ -1928,19 +1928,32 @@ func TestUserDefinedTypes(t *testing.T) { dbA.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)") dbB.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)") - dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)") - dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)") - - dbB.Exec(t, "INSERT INTO data VALUES (1, 'one', (3, 'cat'))") - // Force default expression evaluation. - dbB.Exec(t, "INSERT INTO data (pk, val2) VALUES (2, (4, 'dog'))") - - var jobAID jobspb.JobID - dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data", dbBURL.String()).Scan(&jobAID) - WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, jobAID) - require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, dbA.DB, "A")) - dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}}) - dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}}) + for _, mode := range []string{"validated", "immediate"} { + t.Run(mode, func(t *testing.T) { + dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)") + dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)") + + dbB.Exec(t, "INSERT INTO data VALUES (1, 'one', (3, 'cat'))") + // Force default expression evaluation. + dbB.Exec(t, "INSERT INTO data (pk, val2) VALUES (2, (4, 'dog'))") + + var jobAID jobspb.JobID + dbA.QueryRow(t, + fmt.Sprintf("CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data WITH mode = %s", mode), + dbBURL.String(), + ).Scan(&jobAID) + WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, jobAID) + require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, dbA.DB, "A")) + dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}}) + dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}}) + + dbA.Exec(t, "CANCEL JOB $1", jobAID) + jobutils.WaitForJobToCancel(t, dbA, jobAID) + + dbA.Exec(t, "DROP TABLE data") + dbB.Exec(t, "DROP TABLE data") + }) + } } // TestLogicalReplicationCreationChecks verifies that we check that the table diff --git a/pkg/ccl/crosscluster/logical/lww_row_processor.go b/pkg/ccl/crosscluster/logical/lww_row_processor.go index 645ec83d83ec..63122a4b469a 100644 --- a/pkg/ccl/crosscluster/logical/lww_row_processor.go +++ b/pkg/ccl/crosscluster/logical/lww_row_processor.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metamorphic" "github.com/cockroachdb/cockroach/pkg/util/randutil" @@ -90,6 +91,12 @@ func (q *queryBuilder) AddRow(row cdcevent.Row) error { return err } if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error { + if dEnum, ok := d.(*tree.DEnum); ok { + // Override the type to Unknown to avoid a mismatched type OID error + // during execution. Note that Unknown is the type used by default + // when a SQL statement is executed without type hints. + dEnum.EnumTyp = types.Unknown + } q.scratchDatums = append(q.scratchDatums, d) return nil }); err != nil { @@ -116,6 +123,12 @@ func (q *queryBuilder) AddRowDefaultNull(row *cdcevent.Row) error { continue } if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error { + if dEnum, ok := d.(*tree.DEnum); ok { + // Override the type to Unknown to avoid a mismatched type OID error + // during execution. Note that Unknown is the type used by default + // when a SQL statement is executed without type hints. + dEnum.EnumTyp = types.Unknown + } q.scratchDatums = append(q.scratchDatums, d) return nil }); err != nil { From 579ecef131c0c9ba81cf4574964d6582021cf164 Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Wed, 23 Oct 2024 12:47:30 -0400 Subject: [PATCH 2/2] importer,crosscluster: move type resolver to a different package Since we're using this type resolver for more than just IMPORT, it makes more sense to put it in a more fitting package. Release note: None --- pkg/ccl/crosscluster/BUILD.bazel | 11 +++++++++ .../crosscluster_type_resolver.go} | 24 +++++++++++-------- pkg/ccl/crosscluster/logical/BUILD.bazel | 2 -- .../create_logical_replication_stmt.go | 7 ++---- .../logical/logical_replication_job.go | 7 ++---- pkg/sql/importer/BUILD.bazel | 3 +-- pkg/sql/importer/read_import_base.go | 3 ++- 7 files changed, 32 insertions(+), 25 deletions(-) rename pkg/{sql/importer/import_type_resolver.go => ccl/crosscluster/crosscluster_type_resolver.go} (80%) diff --git a/pkg/ccl/crosscluster/BUILD.bazel b/pkg/ccl/crosscluster/BUILD.bazel index b669e5b4bada..1da9bee6c2c3 100644 --- a/pkg/ccl/crosscluster/BUILD.bazel +++ b/pkg/ccl/crosscluster/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "crosscluster", srcs = [ "addresses.go", + "crosscluster_type_resolver.go", "errors.go", "event.go", "settings.go", @@ -15,5 +16,15 @@ go_library( "//pkg/repstream/streampb", "//pkg/roachpb", "//pkg/settings", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/typedesc", + "//pkg/sql/pgwire/pgcode", + "//pkg/sql/pgwire/pgerror", + "//pkg/sql/sem/tree", + "//pkg/sql/sqlerrors", + "//pkg/sql/types", + "@com_github_cockroachdb_errors//:errors", + "@com_github_lib_pq//oid", ], ) diff --git a/pkg/sql/importer/import_type_resolver.go b/pkg/ccl/crosscluster/crosscluster_type_resolver.go similarity index 80% rename from pkg/sql/importer/import_type_resolver.go rename to pkg/ccl/crosscluster/crosscluster_type_resolver.go index 061bf464c009..840f8618389c 100644 --- a/pkg/sql/importer/import_type_resolver.go +++ b/pkg/ccl/crosscluster/crosscluster_type_resolver.go @@ -1,9 +1,9 @@ -// Copyright 2017 The Cockroach Authors. +// Copyright 2024 The Cockroach Authors. // // Use of this software is governed by the CockroachDB Software License // included in the /LICENSE file. -package importer +package crosscluster import ( "context" @@ -20,16 +20,18 @@ import ( "github.com/lib/pq/oid" ) -type ImportTypeResolver struct { +// CrossClusterTypeResolver is meant to be used to resolve types using type +// descriptors that originate from a different cluster. +type CrossClusterTypeResolver struct { typeIDToDesc map[descpb.ID]*descpb.TypeDescriptor typeNameToDesc map[string][]*descpb.TypeDescriptor } -var _ tree.TypeReferenceResolver = ImportTypeResolver{} -var _ catalog.TypeDescriptorResolver = ImportTypeResolver{} +var _ tree.TypeReferenceResolver = CrossClusterTypeResolver{} +var _ catalog.TypeDescriptorResolver = CrossClusterTypeResolver{} -func MakeImportTypeResolver(typeDescs []*descpb.TypeDescriptor) ImportTypeResolver { - itr := ImportTypeResolver{ +func MakeCrossClusterTypeResolver(typeDescs []*descpb.TypeDescriptor) CrossClusterTypeResolver { + itr := CrossClusterTypeResolver{ typeIDToDesc: make(map[descpb.ID]*descpb.TypeDescriptor), typeNameToDesc: make(map[string][]*descpb.TypeDescriptor), } @@ -52,7 +54,7 @@ func MakeImportTypeResolver(typeDescs []*descpb.TypeDescriptor) ImportTypeResolv // Note that if a table happens to have multiple types with the same name (but // different schemas), this implementation will return a "feature unsupported" // error. -func (i ImportTypeResolver) ResolveType( +func (i CrossClusterTypeResolver) ResolveType( ctx context.Context, name *tree.UnresolvedObjectName, ) (*types.T, error) { var descs []*descpb.TypeDescriptor @@ -75,12 +77,14 @@ func (i ImportTypeResolver) ResolveType( } // ResolveTypeByOID implements the tree.TypeReferenceResolver interface. -func (i ImportTypeResolver) ResolveTypeByOID(ctx context.Context, oid oid.Oid) (*types.T, error) { +func (i CrossClusterTypeResolver) ResolveTypeByOID( + ctx context.Context, oid oid.Oid, +) (*types.T, error) { return typedesc.ResolveHydratedTByOID(ctx, oid, i) } // GetTypeDescriptor implements the catalog.TypeDescriptorResolver interface. -func (i ImportTypeResolver) GetTypeDescriptor( +func (i CrossClusterTypeResolver) GetTypeDescriptor( _ context.Context, id descpb.ID, ) (tree.TypeName, catalog.TypeDescriptor, error) { var desc *descpb.TypeDescriptor diff --git a/pkg/ccl/crosscluster/logical/BUILD.bazel b/pkg/ccl/crosscluster/logical/BUILD.bazel index a38ef38dd28a..e9b64205f66a 100644 --- a/pkg/ccl/crosscluster/logical/BUILD.bazel +++ b/pkg/ccl/crosscluster/logical/BUILD.bazel @@ -50,7 +50,6 @@ go_library( "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", "//pkg/sql/exprutil", - "//pkg/sql/importer", "//pkg/sql/isql", "//pkg/sql/lexbase", "//pkg/sql/parser", @@ -74,7 +73,6 @@ go_library( "//pkg/util/admission/admissionpb", "//pkg/util/buildutil", "//pkg/util/ctxgroup", - "//pkg/util/errorutil/unimplemented", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/log/logcrash", diff --git a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go index 3920e019fcca..a98570a46b29 100644 --- a/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go +++ b/pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/sql/exprutil" - "github.com/cockroachdb/cockroach/pkg/sql/importer" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -213,9 +212,7 @@ func createLogicalReplicationStreamPlanHook( for i, desc := range spec.TypeDescriptors { sourceTypes[i] = &desc } - // TODO(rafi): do we need a different type resolver? - // See https://github.com/cockroachdb/cockroach/issues/132164. - importResolver := importer.MakeImportTypeResolver(sourceTypes) + crossClusterResolver := crosscluster.MakeCrossClusterTypeResolver(sourceTypes) // If the user asked to ignore "ttl-deletes", make sure that at least one of // the source tables actually has a TTL job which sets the omit bit that @@ -225,7 +222,7 @@ func createLogicalReplicationStreamPlanHook( for i, name := range srcTableNames { td := spec.TableDescriptors[name] cpy := tabledesc.NewBuilder(&td).BuildCreatedMutableTable() - if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil { + if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, crossClusterResolver); err != nil { return err } srcTableDescs[i] = cpy.TableDesc() diff --git a/pkg/ccl/crosscluster/logical/logical_replication_job.go b/pkg/ccl/crosscluster/logical/logical_replication_job.go index 1a007cae705e..08ccfede3ef8 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_job.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job.go @@ -29,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - "github.com/cockroachdb/cockroach/pkg/sql/importer" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/catid" @@ -359,15 +358,13 @@ func (p *logicalReplicationPlanner) generatePlanImpl( defaultFnOID = catid.FuncIDToOID(catid.DescID(defaultFnID)) } - // TODO(msbutler): is this import type resolver kosher? Should put in a new package. - // See https://github.com/cockroachdb/cockroach/issues/132164. - importResolver := importer.MakeImportTypeResolver(plan.SourceTypes) + crossClusterResolver := crosscluster.MakeCrossClusterTypeResolver(plan.SourceTypes) tableMetadataByDestID := make(map[int32]execinfrapb.TableReplicationMetadata) if err := sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, descriptors *descs.Collection) error { for _, pair := range payload.ReplicationPairs { srcTableDesc := plan.DescriptorMap[pair.SrcDescriptorID] cpy := tabledesc.NewBuilder(&srcTableDesc).BuildCreatedMutableTable() - if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil { + if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, crossClusterResolver); err != nil { return err } srcTableDesc = *cpy.TableDesc() diff --git a/pkg/sql/importer/BUILD.bazel b/pkg/sql/importer/BUILD.bazel index 0653253b0609..f3c798eaeb59 100644 --- a/pkg/sql/importer/BUILD.bazel +++ b/pkg/sql/importer/BUILD.bazel @@ -17,7 +17,6 @@ go_library( "import_processor.go", "import_processor_planning.go", "import_table_creation.go", - "import_type_resolver.go", "read_import_avro.go", "read_import_base.go", "read_import_csv.go", @@ -32,6 +31,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/ccl/crosscluster", "//pkg/cloud", "//pkg/cloud/cloudprivilege", "//pkg/clusterversion", @@ -93,7 +93,6 @@ go_library( "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", "//pkg/sql/sqlclustersettings", - "//pkg/sql/sqlerrors", "//pkg/sql/sqltelemetry", "//pkg/sql/stats", "//pkg/sql/types", diff --git a/pkg/sql/importer/read_import_base.go b/pkg/sql/importer/read_import_base.go index 2065079cb29f..963323a86d91 100644 --- a/pkg/sql/importer/read_import_base.go +++ b/pkg/sql/importer/read_import_base.go @@ -18,6 +18,7 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -55,7 +56,7 @@ func runImport( // Install type metadata in all of the import tables. spec = protoutil.Clone(spec).(*execinfrapb.ReadImportDataSpec) - importResolver := MakeImportTypeResolver(spec.Types) + importResolver := crosscluster.MakeCrossClusterTypeResolver(spec.Types) for _, table := range spec.Tables { cpy := tabledesc.NewBuilder(table.Desc).BuildCreatedMutableTable() if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil {