Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…achdb#133280

132851: kvserver/rangefeed: remove context from kvpb.RangeFeedEventSink r=stevendanna a=wenyihu6

Previously, `node.MuxRangefeed` created a child context for each rangefeed
request, storing it in the stream interface to allow the node level to be able
to shut down registration goroutines. This patch simplifies the approach by
passing the stream context directly to `p.Register`, eliminating the need to
store context in `streamSink` or return context via the interface. So this patch
also removes context from `kvpb.RangeFeedEventSink`.

Epic: none
Release note: none

133191: roachtest: add validation for version binary override flag r=srosenberg a=DarrylWong

Previously the help message for this flag had an incorrect example usage that listed the version without the leading `v`. Doing this would cause the override to never be used as the override checked for version with the leading `v`.

This change fixes the usage message as well as adds validation that overrided versions are parseable.

Release note: none
Fixes: none
Epic: none

133202: rac2: introduce cluster setting to reset token counters r=kvoli a=sumeerbhola

kvadmission.flow_controller.token_reset_epoch is an escape hatch for cluster operators to reset RACv2 token counters to the full state.

The operator should increment this epoch (or change it to a value different than before). This can be used to counteract a token leakage bug, but note that if there is indeed a bug, the leakage may resume, and tokens may again be exhausted. So it is expected that this will be used together with disabling replication admission control by setting kvadmission.flow_control.enabled=false. Note that disabling replication admission control should be sufficient, since it should unblock work that is waiting-for-eval. But in case there is another bug that is preventing such work from unblocking, this setting may be useful.

Epic: CRDB-37515

Release note (ops change): The cluster setting
kvadmission.flow_controller.token_reset_epoch is an advanced setting that can be used to refill replication admission control v2 tokens. It should only be used after consultation with an expert.

133280: crosscluster/logical: handle user-defined types in SQL mode r=rafiss a=rafiss

This PR also includes a cleanup refactor:

### importer,crosscluster: move type resolver to a different package

Since we're using this type resolver for more than just IMPORT, it makes
more sense to put it in a more fitting package.

----

### crosscluster/logical: handle user-defined types in SQL mode

Previously, using a user-defined type in SQL mode would fail because of
a low level check in the execution engine that verifies that the input
datum has the same type OID as the destination column.

This does not work for LDR, since the input datums come directly from a
source table in a different cluster, so user-defined types have
different OIDs.

This fixes it by setting the datum type to "unknown" before executing
the insert query. The "unknown" type is what is normally used when any
SQL statement is sent to CRDB without explicit type annotations/hints.
When the execution engine sees this type, it will perform an automatic
(and cheap) immutable assignment cast to change the datum to the
appropriate type.

fixes cockroachdb#132164
Release note (ops change): Logical replication streams that reference
tables with user-defined types can now be created with the `mode =
immediate` option.


Co-authored-by: Wenyi Hu <wenyi@cockroachlabs.com>
Co-authored-by: DarrylWong <darryl@cockroachlabs.com>
Co-authored-by: sumeerbhola <sumeer@cockroachlabs.com>
Co-authored-by: Rafi Shamim <rafi@cockroachlabs.com>
  • Loading branch information
5 people committed Oct 23, 2024
5 parents 9e6a644 + a4cbf7d + 2a80007 + d25ceaf + 579ecef commit 23adad6
Show file tree
Hide file tree
Showing 33 changed files with 226 additions and 135 deletions.
11 changes: 11 additions & 0 deletions pkg/ccl/crosscluster/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "crosscluster",
srcs = [
"addresses.go",
"crosscluster_type_resolver.go",
"errors.go",
"event.go",
"settings.go",
Expand All @@ -15,5 +16,15 @@ go_library(
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/typedesc",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlerrors",
"//pkg/sql/types",
"@com_github_cockroachdb_errors//:errors",
"@com_github_lib_pq//oid",
],
)
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Copyright 2017 The Cockroach Authors.
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package importer
package crosscluster

import (
"context"
Expand All @@ -20,16 +20,18 @@ import (
"github.com/lib/pq/oid"
)

type ImportTypeResolver struct {
// CrossClusterTypeResolver is meant to be used to resolve types using type
// descriptors that originate from a different cluster.
type CrossClusterTypeResolver struct {
typeIDToDesc map[descpb.ID]*descpb.TypeDescriptor
typeNameToDesc map[string][]*descpb.TypeDescriptor
}

var _ tree.TypeReferenceResolver = ImportTypeResolver{}
var _ catalog.TypeDescriptorResolver = ImportTypeResolver{}
var _ tree.TypeReferenceResolver = CrossClusterTypeResolver{}
var _ catalog.TypeDescriptorResolver = CrossClusterTypeResolver{}

func MakeImportTypeResolver(typeDescs []*descpb.TypeDescriptor) ImportTypeResolver {
itr := ImportTypeResolver{
func MakeCrossClusterTypeResolver(typeDescs []*descpb.TypeDescriptor) CrossClusterTypeResolver {
itr := CrossClusterTypeResolver{
typeIDToDesc: make(map[descpb.ID]*descpb.TypeDescriptor),
typeNameToDesc: make(map[string][]*descpb.TypeDescriptor),
}
Expand All @@ -52,7 +54,7 @@ func MakeImportTypeResolver(typeDescs []*descpb.TypeDescriptor) ImportTypeResolv
// Note that if a table happens to have multiple types with the same name (but
// different schemas), this implementation will return a "feature unsupported"
// error.
func (i ImportTypeResolver) ResolveType(
func (i CrossClusterTypeResolver) ResolveType(
ctx context.Context, name *tree.UnresolvedObjectName,
) (*types.T, error) {
var descs []*descpb.TypeDescriptor
Expand All @@ -75,12 +77,14 @@ func (i ImportTypeResolver) ResolveType(
}

// ResolveTypeByOID implements the tree.TypeReferenceResolver interface.
func (i ImportTypeResolver) ResolveTypeByOID(ctx context.Context, oid oid.Oid) (*types.T, error) {
func (i CrossClusterTypeResolver) ResolveTypeByOID(
ctx context.Context, oid oid.Oid,
) (*types.T, error) {
return typedesc.ResolveHydratedTByOID(ctx, oid, i)
}

// GetTypeDescriptor implements the catalog.TypeDescriptorResolver interface.
func (i ImportTypeResolver) GetTypeDescriptor(
func (i CrossClusterTypeResolver) GetTypeDescriptor(
_ context.Context, id descpb.ID,
) (tree.TypeName, catalog.TypeDescriptor, error) {
var desc *descpb.TypeDescriptor
Expand Down
2 changes: 0 additions & 2 deletions pkg/ccl/crosscluster/logical/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/exprutil",
"//pkg/sql/importer",
"//pkg/sql/isql",
"//pkg/sql/lexbase",
"//pkg/sql/parser",
Expand All @@ -74,7 +73,6 @@ go_library(
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/ctxgroup",
"//pkg/util/errorutil/unimplemented",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/log/logcrash",
Expand Down
13 changes: 2 additions & 11 deletions pkg/ccl/crosscluster/logical/create_logical_replication_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/exprutil"
"github.com/cockroachdb/cockroach/pkg/sql/importer"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand All @@ -36,7 +35,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -212,16 +210,9 @@ func createLogicalReplicationStreamPlanHook(

sourceTypes := make([]*descpb.TypeDescriptor, len(spec.TypeDescriptors))
for i, desc := range spec.TypeDescriptors {
// Until https://github.com/cockroachdb/cockroach/issues/132164 is resolved,
// we cannot allow user-defined types on the SQL ingestion path.
if m, ok := options.GetMode(); ok && m != "immediate" {
return unimplemented.NewWithIssue(132164, "MODE = 'immediate' cannot be used with user-defined types")
}
sourceTypes[i] = &desc
}
// TODO(rafi): do we need a different type resolver?
// See https://github.com/cockroachdb/cockroach/issues/132164.
importResolver := importer.MakeImportTypeResolver(sourceTypes)
crossClusterResolver := crosscluster.MakeCrossClusterTypeResolver(sourceTypes)

// If the user asked to ignore "ttl-deletes", make sure that at least one of
// the source tables actually has a TTL job which sets the omit bit that
Expand All @@ -231,7 +222,7 @@ func createLogicalReplicationStreamPlanHook(
for i, name := range srcTableNames {
td := spec.TableDescriptors[name]
cpy := tabledesc.NewBuilder(&td).BuildCreatedMutableTable()
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil {
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, crossClusterResolver); err != nil {
return err
}
srcTableDescs[i] = cpy.TableDesc()
Expand Down
7 changes: 2 additions & 5 deletions pkg/ccl/crosscluster/logical/logical_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/importer"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
Expand Down Expand Up @@ -359,15 +358,13 @@ func (p *logicalReplicationPlanner) generatePlanImpl(
defaultFnOID = catid.FuncIDToOID(catid.DescID(defaultFnID))
}

// TODO(msbutler): is this import type resolver kosher? Should put in a new package.
// See https://github.com/cockroachdb/cockroach/issues/132164.
importResolver := importer.MakeImportTypeResolver(plan.SourceTypes)
crossClusterResolver := crosscluster.MakeCrossClusterTypeResolver(plan.SourceTypes)
tableMetadataByDestID := make(map[int32]execinfrapb.TableReplicationMetadata)
if err := sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, descriptors *descs.Collection) error {
for _, pair := range payload.ReplicationPairs {
srcTableDesc := plan.DescriptorMap[pair.SrcDescriptorID]
cpy := tabledesc.NewBuilder(&srcTableDesc).BuildCreatedMutableTable()
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, importResolver); err != nil {
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, crossClusterResolver); err != nil {
return err
}
srcTableDesc = *cpy.TableDesc()
Expand Down
39 changes: 26 additions & 13 deletions pkg/ccl/crosscluster/logical/logical_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1928,19 +1928,32 @@ func TestUserDefinedTypes(t *testing.T) {
dbA.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)")
dbB.Exec(t, "CREATE TYPE my_composite AS (a INT, b TEXT)")

dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")
dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")

dbB.Exec(t, "INSERT INTO data VALUES (1, 'one', (3, 'cat'))")
// Force default expression evaluation.
dbB.Exec(t, "INSERT INTO data (pk, val2) VALUES (2, (4, 'dog'))")

var jobAID jobspb.JobID
dbA.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data", dbBURL.String()).Scan(&jobAID)
WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, jobAID)
require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, dbA.DB, "A"))
dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
for _, mode := range []string{"validated", "immediate"} {
t.Run(mode, func(t *testing.T) {
dbA.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")
dbB.Exec(t, "CREATE TABLE data (pk INT PRIMARY KEY, val1 my_enum DEFAULT 'two', val2 my_composite)")

dbB.Exec(t, "INSERT INTO data VALUES (1, 'one', (3, 'cat'))")
// Force default expression evaluation.
dbB.Exec(t, "INSERT INTO data (pk, val2) VALUES (2, (4, 'dog'))")

var jobAID jobspb.JobID
dbA.QueryRow(t,
fmt.Sprintf("CREATE LOGICAL REPLICATION STREAM FROM TABLE data ON $1 INTO TABLE data WITH mode = %s", mode),
dbBURL.String(),
).Scan(&jobAID)
WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, jobAID)
require.NoError(t, replicationtestutils.CheckEmptyDLQs(ctx, dbA.DB, "A"))
dbB.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})
dbA.CheckQueryResults(t, "SELECT * FROM data", [][]string{{"1", "one", "(3,cat)"}, {"2", "two", "(4,dog)"}})

dbA.Exec(t, "CANCEL JOB $1", jobAID)
jobutils.WaitForJobToCancel(t, dbA, jobAID)

dbA.Exec(t, "DROP TABLE data")
dbB.Exec(t, "DROP TABLE data")
})
}
}

// TestLogicalReplicationCreationChecks verifies that we check that the table
Expand Down
13 changes: 13 additions & 0 deletions pkg/ccl/crosscluster/logical/lww_row_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
Expand Down Expand Up @@ -90,6 +91,12 @@ func (q *queryBuilder) AddRow(row cdcevent.Row) error {
return err
}
if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
if dEnum, ok := d.(*tree.DEnum); ok {
// Override the type to Unknown to avoid a mismatched type OID error
// during execution. Note that Unknown is the type used by default
// when a SQL statement is executed without type hints.
dEnum.EnumTyp = types.Unknown
}
q.scratchDatums = append(q.scratchDatums, d)
return nil
}); err != nil {
Expand All @@ -116,6 +123,12 @@ func (q *queryBuilder) AddRowDefaultNull(row *cdcevent.Row) error {
continue
}
if err := it.Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
if dEnum, ok := d.(*tree.DEnum); ok {
// Override the type to Unknown to avoid a mismatched type OID error
// during execution. Note that Unknown is the type used by default
// when a SQL statement is executed without type hints.
dEnum.EnumTyp = types.Unknown
}
q.scratchDatums = append(q.scratchDatums, d)
return nil
}); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/roachtestflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ var (
List of <version>=<path to cockroach binary>. If a certain version <ver>
is present in the list, the respective binary will be used when a
mixed-version test asks for the respective binary, instead of roachprod
stage <ver>. Example: 20.1.4=cockroach-20.1,20.2.0=cockroach-20.2.`,
stage <ver>. Example: v20.1.4=cockroach-20.1,v20.2.0=cockroach-20.2.`,
})

SlackToken string
Expand Down
7 changes: 7 additions & 0 deletions pkg/cmd/roachtest/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/version"
"github.com/cockroachdb/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -304,6 +305,12 @@ func initRunFlagsBinariesAndLibraries(cmd *cobra.Command) error {
if roachtestflags.SelectProbability > 0 && roachtestflags.SelectProbability < 1 {
fmt.Printf("Matching tests will be selected with probability %.2f\n", roachtestflags.SelectProbability)
}

for override := range roachtestflags.VersionsBinaryOverride {
if _, err := version.Parse(override); err != nil {
return errors.Wrapf(err, "binary version override %s is not a valid version", override)
}
}
return nil
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1456,7 +1456,7 @@ func TestRangeFeedIntentResolutionRace(t *testing.T) {
}
eventC := make(chan *kvpb.RangeFeedEvent)
sink := newChannelSink(ctx, eventC)
require.NoError(t, s3.RangeFeed(&req, sink)) // check if we've errored yet
require.NoError(t, s3.RangeFeed(sink.ctx, &req, sink)) // check if we've errored yet
require.NoError(t, sink.Error())
t.Logf("started rangefeed on %s", repl3)

Expand Down Expand Up @@ -1628,10 +1628,6 @@ func newChannelSink(ctx context.Context, ch chan<- *kvpb.RangeFeedEvent) *channe
return &channelSink{ctx: ctx, ch: ch, done: make(chan *kvpb.Error, 1)}
}

func (c *channelSink) Context() context.Context {
return c.ctx
}

func (c *channelSink) SendUnbufferedIsThreadSafe() {}

func (c *channelSink) SendUnbuffered(e *kvpb.RangeFeedEvent) error {
Expand Down
2 changes: 0 additions & 2 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2520,8 +2520,6 @@ func (s *ScanStats) String() string {

// RangeFeedEventSink is an interface for sending a single rangefeed event.
type RangeFeedEventSink interface {
// Context returns the context for this stream.
Context() context.Context
// SendUnbuffered blocks until it sends the RangeFeedEvent, the stream is
// done, or the stream breaks. Send must be safe to call on the same stream in
// different goroutines.
Expand Down
6 changes: 1 addition & 5 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,6 @@ func newDummyStream(ctx context.Context, name string) *dummyStream {
}
}

func (s *dummyStream) Context() context.Context {
return s.ctx
}

func (s *dummyStream) SendUnbufferedIsThreadSafe() {}

func (s *dummyStream) SendUnbuffered(ev *kvpb.RangeFeedEvent) error {
Expand Down Expand Up @@ -493,7 +489,7 @@ func waitReplicaRangeFeed(
return stream.SendUnbuffered(&event)
}

err := r.RangeFeed(req, stream, nil /* pacer */)
err := r.RangeFeed(stream.ctx, req, stream, nil /* pacer */)
if err != nil {
return sendErrToStream(kvpb.NewError(err))
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,23 @@ var validateTokenRange = settings.WithValidateInt(func(b int64) error {
return nil
})

// TokenCounterResetEpoch is an escape hatch for administrators that should
// never be needed. By incrementing this epoch (or changing it to a value
// different than before), an administrator can restore all RACv2 token
// counters to their default (full) state. This can be used to counteract a
// token leakage bug, but note that if there is indeed a bug, the leakage may
// resume, and tokens may again be exhausted. So it is expected that this will
// be used together with disabling replication admission control by setting
// kvadmission.flow_control.enabled=false. Note that disabling replication
// admission control should be sufficient, since it should unblock work that
// is waiting-for-eval. But in case there is another bug that is preventing
// such work from unblocking, this setting may be useful.
var TokenCounterResetEpoch = settings.RegisterIntSetting(
settings.SystemOnly,
"kvadmission.flow_controller.token_reset_epoch",
"escape hatch for administrators to reset all token counters to their default (full) state",
0)

// V2EnabledWhenLeaderLevel captures the level at which RACv2 is enabled when
// this replica is the leader.
//
Expand Down
Loading

0 comments on commit 23adad6

Please sign in to comment.