Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

crosscluster/logical: handle user-defined types in SQL mode #133280

Merged
merged 2 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/ccl/crosscluster/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "crosscluster",
srcs = [
"addresses.go",
"crosscluster_type_resolver.go",
"errors.go",
"event.go",
"settings.go",
Expand All @@ -15,5 +16,15 @@ go_library(
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/typedesc",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlerrors",
"//pkg/sql/types",
"@com_github_cockroachdb_errors//:errors",
"@com_github_lib_pq//oid",
],
)
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Copyright 2017 The Cockroach Authors.
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package importer
package crosscluster

import (
"context"
Expand All @@ -20,16 +20,18 @@ import (
"github.com/lib/pq/oid"
)

type ImportTypeResolver struct {
// CrossClusterTypeResolver is meant to be used to resolve types using type
// descriptors that originate from a different cluster.
type CrossClusterTypeResolver struct {
typeIDToDesc map[descpb.ID]*descpb.TypeDescriptor
typeNameToDesc map[string][]*descpb.TypeDescriptor
}

var _ tree.TypeReferenceResolver = ImportTypeResolver{}
var _ catalog.TypeDescriptorResolver = ImportTypeResolver{}
var _ tree.TypeReferenceResolver = CrossClusterTypeResolver{}
var _ catalog.TypeDescriptorResolver = CrossClusterTypeResolver{}

func MakeImportTypeResolver(typeDescs []*descpb.TypeDescriptor) ImportTypeResolver {
itr := ImportTypeResolver{
func MakeCrossClusterTypeResolver(typeDescs []*descpb.TypeDescriptor) CrossClusterTypeResolver {
itr := CrossClusterTypeResolver{
typeIDToDesc: make(map[descpb.ID]*descpb.TypeDescriptor),
typeNameToDesc: make(map[string][]*descpb.TypeDescriptor),
}
Expand All @@ -52,7 +54,7 @@ func MakeImportTypeResolver(typeDescs []*descpb.TypeDescriptor) ImportTypeResolv
// Note that if a table happens to have multiple types with the same name (but
// different schemas), this implementation will return a "feature unsupported"
// error.
func (i ImportTypeResolver) ResolveType(
func (i CrossClusterTypeResolver) ResolveType(
ctx context.Context, name *tree.UnresolvedObjectName,
) (*types.T, error) {
var descs []*descpb.TypeDescriptor
Expand All @@ -75,12 +77,14 @@ func (i ImportTypeResolver) ResolveType(
}

// ResolveTypeByOID implements the tree.TypeReferenceResolver interface.
func (i ImportTypeResolver) ResolveTypeByOID(ctx context.Context, oid oid.Oid) (*types.T, error) {
func (i CrossClusterTypeResolver) ResolveTypeByOID(
ctx context.Context, oid oid.Oid,
) (*types.T, error) {
return typedesc.ResolveHydratedTByOID(ctx, oid, i)
}

// GetTypeDescriptor implements the catalog.TypeDescriptorResolver interface.
func (i ImportTypeResolver) GetTypeDescriptor(
func (i CrossClusterTypeResolver) GetTypeDescriptor(
_ context.Context, id descpb.ID,
) (tree.TypeName, catalog.TypeDescriptor, error) {
var desc *descpb.TypeDescriptor
Expand Down
2 changes: 0 additions & 2 deletions pkg/ccl/crosscluster/logical/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/exprutil",
"//pkg/sql/importer",
"//pkg/sql/isql",
"//pkg/sql/lexbase",
"//pkg/sql/parser",
Expand All @@ -74,7 +73,6 @@ go_library(
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/ctxgroup",
"//pkg/util/errorutil/unimplemented",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/log/logcrash",
Expand Down
13 changes: 2 additions & 11 deletions pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
"github.com/cockroachdb/cockroach/pkg/sql/importer"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand All @@ -36,7 +35,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -212,16 +210,9 @@ func createLogicalReplicationStreamPlanHook(

sourceTypes := make([]*descpb.TypeDescriptor, len(spec.TypeDescriptors))
for i, desc := range spec.TypeDescriptors {
// Until https://github.com/cockroachdb/cockroach/issues/132164 is resolved,
// we cannot allow user-defined types on the SQL ingestion path.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this little block of code should be in the previous commit.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if m, ok := options.GetMode(); ok && m != "immediate" {
return unimplemented.NewWithIssue(132164, "MODE = 'immediate' cannot be used with user-defined types")
}
sourceTypes[i] = &desc
}
// TODO(rafi): do we need a different type resolver?
// See https://github.com/cockroachdb/cockroach/issues/132164.
importResolver := importer.MakeImportTypeResolver(sourceTypes)
crossClusterResolver := crosscluster.MakeCrossClusterTypeResolver(sourceTypes)

// If the user asked to ignore "ttl-deletes", make sure that at least one of
// the source tables actually has a TTL job which sets the omit bit that
Expand All @@ -231,7 +222,7 @@ func createLogicalReplicationStreamPlanHook(
for i, name := range srcTableNames {
td := spec.TableDescriptors[name]
cpy := tabledesc.NewBuilder(&td).BuildCreatedMutableTable()
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil {
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, crossClusterResolver); err != nil {
return err
}
srcTableDescs[i] = cpy.TableDesc()
Expand Down
7 changes: 2 additions & 5 deletions pkg/ccl/crosscluster/logical/logical_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/importer"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
Expand Down Expand Up @@ -359,15 +358,13 @@ func (p *logicalReplicationPlanner) generatePlanImpl(
defaultFnOID = catid.FuncIDToOID(catid.DescID(defaultFnID))
}

// TODO(msbutler): is this import type resolver kosher? Should put in a new package.
// See https://github.com/cockroachdb/cockroach/issues/132164.
importResolver := importer.MakeImportTypeResolver(plan.SourceTypes)
crossClusterResolver := crosscluster.MakeCrossClusterTypeResolver(plan.SourceTypes)
tableMetadataByDestID := make(map[int32]execinfrapb.TableReplicationMetadata)
if err := sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, descriptors *descs.Collection) error {
for _, pair := range payload.ReplicationPairs {
srcTableDesc := plan.DescriptorMap[pair.SrcDescriptorID]
cpy := tabledesc.NewBuilder(&srcTableDesc).BuildCreatedMutableTable()
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil {
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, crossClusterResolver); err != nil {
return err
}
srcTableDesc = *cpy.TableDesc()
Expand Down
39 changes: 26 additions & 13 deletions pkg/ccl/crosscluster/logical/logical_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1928,19 +1928,32 @@ func TestUserDefinedTypes(t *testing.T) {
dbA.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)")
dbB.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)")

dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")
dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")

dbB.Exec(t, "INSERT INTO data VALUES (1, 'one', (3, 'cat'))")
// Force default expression evaluation.
dbB.Exec(t, "INSERT INTO data (pk, val2) VALUES (2, (4, 'dog'))")

var jobAID jobspb.JobID
dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data", dbBURL.String()).Scan(&jobAID)
WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, jobAID)
require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, dbA.DB, "A"))
dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
for _, mode := range []string{"validated", "immediate"} {
t.Run(mode, func(t *testing.T) {
dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")
dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")

dbB.Exec(t, "INSERT INTO data VALUES (1, 'one', (3, 'cat'))")
// Force default expression evaluation.
dbB.Exec(t, "INSERT INTO data (pk, val2) VALUES (2, (4, 'dog'))")

var jobAID jobspb.JobID
dbA.QueryRow(t,
fmt.Sprintf("CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data WITH mode = %s", mode),
dbBURL.String(),
).Scan(&jobAID)
WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, jobAID)
require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, dbA.DB, "A"))
dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})

dbA.Exec(t, "CANCEL JOB $1", jobAID)
jobutils.WaitForJobToCancel(t, dbA, jobAID)

dbA.Exec(t, "DROP TABLE data")
dbB.Exec(t, "DROP TABLE data")
})
}
}

// TestLogicalReplicationCreationChecks verifies that we check that the table
Expand Down
13 changes: 13 additions & 0 deletions pkg/ccl/crosscluster/logical/lww_row_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
Expand Down Expand Up @@ -90,6 +91,12 @@ func (q *queryBuilder) AddRow(row cdcevent.Row) error {
return err
}
if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
if dEnum, ok := d.(*tree.DEnum); ok {
// Override the type to Unknown to avoid a mismatched type OID error
// during execution. Note that Unknown is the type used by default
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the diff in this file should be in the previous commit.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// when a SQL statement is executed without type hints.
dEnum.EnumTyp = types.Unknown
}
q.scratchDatums = append(q.scratchDatums, d)
return nil
}); err != nil {
Expand All @@ -116,6 +123,12 @@ func (q *queryBuilder) AddRowDefaultNull(row *cdcevent.Row) error {
continue
}
if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
if dEnum, ok := d.(*tree.DEnum); ok {
// Override the type to Unknown to avoid a mismatched type OID error
// during execution. Note that Unknown is the type used by default
// when a SQL statement is executed without type hints.
dEnum.EnumTyp = types.Unknown
}
q.scratchDatums = append(q.scratchDatums, d)
return nil
}); err != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ go_library(
"import_processor.go",
"import_processor_planning.go",
"import_table_creation.go",
"import_type_resolver.go",
"read_import_avro.go",
"read_import_base.go",
"read_import_csv.go",
Expand All @@ -32,6 +31,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/ccl/crosscluster",
"//pkg/cloud",
"//pkg/cloud/cloudprivilege",
"//pkg/clusterversion",
Expand Down Expand Up @@ -93,7 +93,6 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlclustersettings",
"//pkg/sql/sqlerrors",
"//pkg/sql/sqltelemetry",
"//pkg/sql/stats",
"//pkg/sql/types",
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/importer/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand Down Expand Up @@ -55,7 +56,7 @@ func runImport(

// Install type metadata in all of the import tables.
spec = protoutil.Clone(spec).(*execinfrapb.ReadImportDataSpec)
importResolver := MakeImportTypeResolver(spec.Types)
importResolver := crosscluster.MakeCrossClusterTypeResolver(spec.Types)
for _, table := range spec.Tables {
cpy := tabledesc.NewBuilder(table.Desc).BuildCreatedMutableTable()
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil {
Expand Down
Loading