Skip to content

Commit

Permalink
Merge pull request #133295 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.3-133280

release-24.3: crosscluster/logical: handle user-defined types in SQL mode
  • Loading branch information
rafiss authored Oct 23, 2024
2 parents 1258c8d + ee1a856 commit 183bed7
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 44 deletions.
11 changes: 11 additions & 0 deletions pkg/ccl/crosscluster/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "crosscluster",
srcs = [
"addresses.go",
"crosscluster_type_resolver.go",
"errors.go",
"event.go",
"settings.go",
Expand All @@ -15,5 +16,15 @@ go_library(
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/typedesc",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlerrors",
"//pkg/sql/types",
"@com_github_cockroachdb_errors//:errors",
"@com_github_lib_pq//oid",
],
)
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Copyright 2017 The Cockroach Authors.
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package importer
package crosscluster

import (
"context"
Expand All @@ -20,16 +20,18 @@ import (
"github.com/lib/pq/oid"
)

type ImportTypeResolver struct {
// CrossClusterTypeResolver is meant to be used to resolve types using type
// descriptors that originate from a different cluster.
type CrossClusterTypeResolver struct {
typeIDToDesc map[descpb.ID]*descpb.TypeDescriptor
typeNameToDesc map[string][]*descpb.TypeDescriptor
}

var _ tree.TypeReferenceResolver = ImportTypeResolver{}
var _ catalog.TypeDescriptorResolver = ImportTypeResolver{}
var _ tree.TypeReferenceResolver = CrossClusterTypeResolver{}
var _ catalog.TypeDescriptorResolver = CrossClusterTypeResolver{}

func MakeImportTypeResolver(typeDescs []*descpb.TypeDescriptor) ImportTypeResolver {
itr := ImportTypeResolver{
func MakeCrossClusterTypeResolver(typeDescs []*descpb.TypeDescriptor) CrossClusterTypeResolver {
itr := CrossClusterTypeResolver{
typeIDToDesc: make(map[descpb.ID]*descpb.TypeDescriptor),
typeNameToDesc: make(map[string][]*descpb.TypeDescriptor),
}
Expand All @@ -52,7 +54,7 @@ func MakeImportTypeResolver(typeDescs []*descpb.TypeDescriptor) ImportTypeResolv
// Note that if a table happens to have multiple types with the same name (but
// different schemas), this implementation will return a "feature unsupported"
// error.
func (i ImportTypeResolver) ResolveType(
func (i CrossClusterTypeResolver) ResolveType(
ctx context.Context, name *tree.UnresolvedObjectName,
) (*types.T, error) {
var descs []*descpb.TypeDescriptor
Expand All @@ -75,12 +77,14 @@ func (i ImportTypeResolver) ResolveType(
}

// ResolveTypeByOID implements the tree.TypeReferenceResolver interface.
func (i ImportTypeResolver) ResolveTypeByOID(ctx context.Context, oid oid.Oid) (*types.T, error) {
func (i CrossClusterTypeResolver) ResolveTypeByOID(
ctx context.Context, oid oid.Oid,
) (*types.T, error) {
return typedesc.ResolveHydratedTByOID(ctx, oid, i)
}

// GetTypeDescriptor implements the catalog.TypeDescriptorResolver interface.
func (i ImportTypeResolver) GetTypeDescriptor(
func (i CrossClusterTypeResolver) GetTypeDescriptor(
_ context.Context, id descpb.ID,
) (tree.TypeName, catalog.TypeDescriptor, error) {
var desc *descpb.TypeDescriptor
Expand Down
2 changes: 0 additions & 2 deletions pkg/ccl/crosscluster/logical/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/exprutil",
"//pkg/sql/importer",
"//pkg/sql/isql",
"//pkg/sql/lexbase",
"//pkg/sql/parser",
Expand All @@ -74,7 +73,6 @@ go_library(
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/ctxgroup",
"//pkg/util/errorutil/unimplemented",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/log/logcrash",
Expand Down
13 changes: 2 additions & 11 deletions pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
"github.com/cockroachdb/cockroach/pkg/sql/importer"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand All @@ -36,7 +35,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -212,16 +210,9 @@ func createLogicalReplicationStreamPlanHook(

sourceTypes := make([]*descpb.TypeDescriptor, len(spec.TypeDescriptors))
for i, desc := range spec.TypeDescriptors {
// Until https://github.com/cockroachdb/cockroach/issues/132164 is resolved,
// we cannot allow user-defined types on the SQL ingestion path.
if m, ok := options.GetMode(); ok && m != "immediate" {
return unimplemented.NewWithIssue(132164, "MODE = 'immediate' cannot be used with user-defined types")
}
sourceTypes[i] = &desc
}
// TODO(rafi): do we need a different type resolver?
// See https://github.com/cockroachdb/cockroach/issues/132164.
importResolver := importer.MakeImportTypeResolver(sourceTypes)
crossClusterResolver := crosscluster.MakeCrossClusterTypeResolver(sourceTypes)

// If the user asked to ignore "ttl-deletes", make sure that at least one of
// the source tables actually has a TTL job which sets the omit bit that
Expand All @@ -231,7 +222,7 @@ func createLogicalReplicationStreamPlanHook(
for i, name := range srcTableNames {
td := spec.TableDescriptors[name]
cpy := tabledesc.NewBuilder(&td).BuildCreatedMutableTable()
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil {
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, crossClusterResolver); err != nil {
return err
}
srcTableDescs[i] = cpy.TableDesc()
Expand Down
7 changes: 2 additions & 5 deletions pkg/ccl/crosscluster/logical/logical_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/importer"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
Expand Down Expand Up @@ -359,15 +358,13 @@ func (p *logicalReplicationPlanner) generatePlanImpl(
defaultFnOID = catid.FuncIDToOID(catid.DescID(defaultFnID))
}

// TODO(msbutler): is this import type resolver kosher? Should put in a new package.
// See https://github.com/cockroachdb/cockroach/issues/132164.
importResolver := importer.MakeImportTypeResolver(plan.SourceTypes)
crossClusterResolver := crosscluster.MakeCrossClusterTypeResolver(plan.SourceTypes)
tableMetadataByDestID := make(map[int32]execinfrapb.TableReplicationMetadata)
if err := sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, descriptors *descs.Collection) error {
for _, pair := range payload.ReplicationPairs {
srcTableDesc := plan.DescriptorMap[pair.SrcDescriptorID]
cpy := tabledesc.NewBuilder(&srcTableDesc).BuildCreatedMutableTable()
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil {
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, crossClusterResolver); err != nil {
return err
}
srcTableDesc = *cpy.TableDesc()
Expand Down
39 changes: 26 additions & 13 deletions pkg/ccl/crosscluster/logical/logical_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1956,19 +1956,32 @@ func TestUserDefinedTypes(t *testing.T) {
dbA.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)")
dbB.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)")

dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")
dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")

dbB.Exec(t, "INSERT INTO data VALUES (1, 'one', (3, 'cat'))")
// Force default expression evaluation.
dbB.Exec(t, "INSERT INTO data (pk, val2) VALUES (2, (4, 'dog'))")

var jobAID jobspb.JobID
dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data", dbBURL.String()).Scan(&jobAID)
WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, jobAID)
require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, dbA.DB, "A"))
dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
for _, mode := range []string{"validated", "immediate"} {
t.Run(mode, func(t *testing.T) {
dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")
dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")

dbB.Exec(t, "INSERT INTO data VALUES (1, 'one', (3, 'cat'))")
// Force default expression evaluation.
dbB.Exec(t, "INSERT INTO data (pk, val2) VALUES (2, (4, 'dog'))")

var jobAID jobspb.JobID
dbA.QueryRow(t,
fmt.Sprintf("CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data WITH mode = %s", mode),
dbBURL.String(),
).Scan(&jobAID)
WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, jobAID)
require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, dbA.DB, "A"))
dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})

dbA.Exec(t, "CANCEL JOB $1", jobAID)
jobutils.WaitForJobToCancel(t, dbA, jobAID)

dbA.Exec(t, "DROP TABLE data")
dbB.Exec(t, "DROP TABLE data")
})
}
}

// TestLogicalReplicationCreationChecks verifies that we check that the table
Expand Down
13 changes: 13 additions & 0 deletions pkg/ccl/crosscluster/logical/lww_row_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
Expand Down Expand Up @@ -90,6 +91,12 @@ func (q *queryBuilder) AddRow(row cdcevent.Row) error {
return err
}
if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
if dEnum, ok := d.(*tree.DEnum); ok {
// Override the type to Unknown to avoid a mismatched type OID error
// during execution. Note that Unknown is the type used by default
// when a SQL statement is executed without type hints.
dEnum.EnumTyp = types.Unknown
}
q.scratchDatums = append(q.scratchDatums, d)
return nil
}); err != nil {
Expand All @@ -116,6 +123,12 @@ func (q *queryBuilder) AddRowDefaultNull(row *cdcevent.Row) error {
continue
}
if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
if dEnum, ok := d.(*tree.DEnum); ok {
// Override the type to Unknown to avoid a mismatched type OID error
// during execution. Note that Unknown is the type used by default
// when a SQL statement is executed without type hints.
dEnum.EnumTyp = types.Unknown
}
q.scratchDatums = append(q.scratchDatums, d)
return nil
}); err != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ go_library(
"import_processor.go",
"import_processor_planning.go",
"import_table_creation.go",
"import_type_resolver.go",
"read_import_avro.go",
"read_import_base.go",
"read_import_csv.go",
Expand All @@ -32,6 +31,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/ccl/crosscluster",
"//pkg/cloud",
"//pkg/cloud/cloudprivilege",
"//pkg/clusterversion",
Expand Down Expand Up @@ -93,7 +93,6 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlclustersettings",
"//pkg/sql/sqlerrors",
"//pkg/sql/sqltelemetry",
"//pkg/sql/stats",
"//pkg/sql/types",
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/importer/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand Down Expand Up @@ -55,7 +56,7 @@ func runImport(

// Install type metadata in all of the import tables.
spec = protoutil.Clone(spec).(*execinfrapb.ReadImportDataSpec)
importResolver := MakeImportTypeResolver(spec.Types)
importResolver := crosscluster.MakeCrossClusterTypeResolver(spec.Types)
for _, table := range spec.Tables {
cpy := tabledesc.NewBuilder(table.Desc).BuildCreatedMutableTable()
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil {
Expand Down

0 comments on commit 183bed7

Please sign in to comment.