Skip to content

Commit

Permalink
crosscluster/logical: handle user-defined types in SQL mode
Browse files Browse the repository at this point in the history
Previously, using a user-defined type in SQL mode would fail because of
a low level check in the execution engine that verifies that the input
datum has the same type OID as the destination column.

This does not work for LDR, since the input datums come directly from a
source table in a different cluster, so user-defined types have
different OIDs.

This fixes it by setting the datum type to "unknown" before executing
the insert query. The "unknown" type is what is normally used when any
SQL statement is sent to CRDB without explicit type annotations/hints.
When the execution engine sees this type, it will perform an automatic
(and cheap) immutable assignment cast to change the datum to the
appropriate type.

Release note (ops change): Logical replication streams that reference
tables with user-defined types can now be created with the `mode =
immediate` option.
  • Loading branch information
rafiss committed Oct 23, 2024
1 parent bc87198 commit e6d6abf
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -212,11 +211,6 @@ func createLogicalReplicationStreamPlanHook(

sourceTypes := make([]*descpb.TypeDescriptor, len(spec.TypeDescriptors))
for i, desc := range spec.TypeDescriptors {
// Until https://github.com/cockroachdb/cockroach/issues/132164 is resolved,
// we cannot allow user-defined types on the SQL ingestion path.
if m, ok := options.GetMode(); ok && m != "immediate" {
return unimplemented.NewWithIssue(132164, "MODE = 'immediate' cannot be used with user-defined types")
}
sourceTypes[i] = &desc
}
// TODO(rafi): do we need a different type resolver?
Expand Down
39 changes: 26 additions & 13 deletions pkg/ccl/crosscluster/logical/logical_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1928,19 +1928,32 @@ func TestUserDefinedTypes(t *testing.T) {
dbA.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)")
dbB.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)")

dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")
dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")

dbB.Exec(t, "INSERT INTO data VALUES (1, 'one', (3, 'cat'))")
// Force default expression evaluation.
dbB.Exec(t, "INSERT INTO data (pk, val2) VALUES (2, (4, 'dog'))")

var jobAID jobspb.JobID
dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data", dbBURL.String()).Scan(&jobAID)
WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, jobAID)
require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, dbA.DB, "A"))
dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
for _, mode := range []string{"validated", "immediate"} {
t.Run(mode, func(t *testing.T) {
dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")
dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")

dbB.Exec(t, "INSERT INTO data VALUES (1, 'one', (3, 'cat'))")
// Force default expression evaluation.
dbB.Exec(t, "INSERT INTO data (pk, val2) VALUES (2, (4, 'dog'))")

var jobAID jobspb.JobID
dbA.QueryRow(t,
fmt.Sprintf("CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data WITH mode = %s", mode),
dbBURL.String(),
).Scan(&jobAID)
WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, jobAID)
require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, dbA.DB, "A"))
dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})

dbA.Exec(t, "CANCEL JOB $1", jobAID)
jobutils.WaitForJobToCancel(t, dbA, jobAID)

dbA.Exec(t, "DROP TABLE data")
dbB.Exec(t, "DROP TABLE data")
})
}
}

// TestLogicalReplicationCreationChecks verifies that we check that the table
Expand Down
13 changes: 13 additions & 0 deletions pkg/ccl/crosscluster/logical/lww_row_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
Expand Down Expand Up @@ -90,6 +91,12 @@ func (q *queryBuilder) AddRow(row cdcevent.Row) error {
return err
}
if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
if dEnum, ok := d.(*tree.DEnum); ok {
// Override the type to Unknown to avoid a mismatched type OID error
// during execution. Note that Unknown is the type used by default
// when a SQL statement is executed without type hints.
dEnum.EnumTyp = types.Unknown
}
q.scratchDatums = append(q.scratchDatums, d)
return nil
}); err != nil {
Expand All @@ -116,6 +123,12 @@ func (q *queryBuilder) AddRowDefaultNull(row *cdcevent.Row) error {
continue
}
if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
if dEnum, ok := d.(*tree.DEnum); ok {
// Override the type to Unknown to avoid a mismatched type OID error
// during execution. Note that Unknown is the type used by default
// when a SQL statement is executed without type hints.
dEnum.EnumTyp = types.Unknown
}
q.scratchDatums = append(q.scratchDatums, d)
return nil
}); err != nil {
Expand Down

0 comments on commit e6d6abf

Please sign in to comment.