Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…db#61397

61036: streamclient: add client for sinkless CREATE REPLICATION STREAM r=miretskiy,dt a=pbardea

This commit adds support for a stream client that can connect to a
cluster and start replicating changes that happen on that stream.

This commit also reorganizes some testing helpers to be shared between
streamclient and streamproducer.

Release justification: low impact (very experimental feature), high reward (makes cluster replication work end to end)

Release note: None

61106: sql: disallow creation of interleaved partitioned indexes r=mgartner a=mgartner

Previously, creating interleaved partitioned indexes panicked. This
commit disallows their creation to prevent panicking.

Fixes cockroachdb#60699

Release justification: This is a low risk change that prevents panics
when attempting to create interleaved partitioned tables.

Release note (bug fix): Creating interleaved partitioned indexes is now
disallowed. Previously, the database would crash when trying to create
one.

61347: execinfra: clean up ProcessorBase r=yuzefovich a=yuzefovich

This commit removes redundant context argument from
`trailingMetaCallback` in favor of just using `ProcessorBase.Ctx` when
needed. This was already actually the case since the callback was always
called with `ProcessorBase.Ctx`. We also had the wrong sequence of
actions when collecting trailing metadata in inverted joiner, join
reader, and zigzag joiner, where we first called `InternalClose` and
then generated the metadata. That is wrong because `InternalClose`
replaces the context used throughout the operation of a processor with
the "original" context (the one passed into `StartInternal`). This is
now fixed.

Additionally, a few unnecessary method overrides are removed and
`ProcessorBase` now provides the default implementation of
`ConsumerClosed` (which simply calls `InternalClose`).

This commit also makes a slight adjustment to context management of the
change aggregator.

Release justification: low-risk cleanup.

Release note: None

61397: server: remove unnecessary % escape r=stevendanna a=stevendanna

As of 1982047 this log line is now formatted with Go's
templating system, which doesn't require that % is escaped.

Before:

```
I210303 09:56:17.459802 311 2@server/status/runtime.go:553 ⋮ [n1] 76
runtime stats: 106 MiB RSS, 261 goroutines (stacks: 5.0 MiB), 29
MiB/63 MiB Go alloc/total (heap fragmentation: 6.8 MiB, heap reserved:
11 MiB, heap released: 12 MiB), 7.6 MiB/13 MiB CGO alloc/total (0.6
CGO/sec), 1.8/1.2 %%(u/s)time, 0.0 %%gc (0x ), 17 KiB/22 KiB (r/w)net
```

After:

```
I210303 09:59:17.607021 59 2@server/status/runtime.go:553 ⋮ [n1] 71
runtime stats: 92 MiB RSS, 255 goroutines (stacks: 3.7 MiB), 27 MiB/48
MiB Go alloc/total (heap fragmentation: 4.8 MiB, heap reserved: 1.3
MiB, heap released: 27 MiB), 10 MiB/15 MiB CGO alloc/total (0.0
CGO/sec), 0.0/0.0 %(u/s)time, 0.0 %gc (0x), 30 KiB/34 KiB (r/w)net
```

Release justification: Negligible risk.
Release note: None

Co-authored-by: Paul Bardea <pbardea@gmail.com>
Co-authored-by: Marcus Gartner <marcus@cockroachlabs.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
Co-authored-by: Steven Danna <danna@cockroachlabs.com>
  • Loading branch information
5 people committed Mar 3, 2021
5 parents 3e3521c + ca845ce + 5306d4b + 93e39fc + 85450d6 commit 320dab6
Show file tree
Hide file tree
Showing 53 changed files with 951 additions and 413 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/split_and_scatter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func newSplitAndScatterProcessor(
if err := ssp.Init(ssp, post, splitAndScatterOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */
execinfra.ProcStateOpts{
InputsToDrain: nil, // there are no inputs to drain
TrailingMetaCallback: func(context.Context) []execinfrapb.ProducerMetadata {
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
ssp.close()
return nil
},
Expand Down
22 changes: 13 additions & 9 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func newChangeAggregatorProcessor(
output,
memMonitor,
execinfra.ProcStateOpts{
TrailingMetaCallback: func(context.Context) []execinfrapb.ProducerMetadata {
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
ca.close()
return nil
},
Expand Down Expand Up @@ -191,11 +191,14 @@ func newChangeAggregatorProcessor(

// Start is part of the RowSource interface.
func (ca *changeAggregator) Start(ctx context.Context) {
ctx, ca.cancel = context.WithCancel(ctx)
// StartInternal called at the beginning of the function because there are
// early returns if errors are detected.
ctx = ca.StartInternal(ctx, changeAggregatorProcName)

// Derive a separate context so that we can shutdown the poller. Note that
// we need to update both ctx (used throughout this function) and
// ProcessorBase.Ctx (used in all other methods) to the new context.
ctx, ca.cancel = context.WithCancel(ctx)
ca.Ctx = ctx

spans := ca.setupSpansAndFrontier()
timestampOracle := &changeAggregatorLowerBoundOracle{sf: ca.spanFrontier, initialInclusiveLowerBound: ca.spec.Feed.StatementTime}

Expand Down Expand Up @@ -369,10 +372,11 @@ func (ca *changeAggregator) setupSpansAndFrontier() []roachpb.Span {
// checking.
func (ca *changeAggregator) close() {
if ca.InternalClose() {
// Shut down the poller if it wasn't already.
if ca.cancel != nil {
ca.cancel()
}
// Shut down the poller if it wasn't already. Note that it will cancel
// the context used by all components, but ca.Ctx has been updated by
// InternalClose() to the "original" context (the one passed into
// StartInternal()).
ca.cancel()
// Wait for the poller to finish shutting down.
if ca.kvFeedDoneCh != nil {
<-ca.kvFeedDoneCh
Expand Down Expand Up @@ -874,7 +878,7 @@ func newChangeFrontierProcessor(
output,
memMonitor,
execinfra.ProcStateOpts{
TrailingMetaCallback: func(context.Context) []execinfrapb.ProducerMetadata {
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
cf.close()
return nil
},
Expand Down
5 changes: 0 additions & 5 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,6 @@ func (idp *readImportDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Pro
}, nil
}

// ConsumerDone is part of the RowSource interface.
func (idp *readImportDataProcessor) ConsumerDone() {
idp.MoveToDraining(nil /* err */)
}

// ConsumerClosed is part of the RowSource interface.
func (idp *readImportDataProcessor) ConsumerClosed() {
// The consumer is done, Next() will not be called again.
Expand Down
12 changes: 12 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/partitioning_index
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,15 @@ CREATE TABLE public.t60019 (
FAMILY fam_0_pk_a_b (pk, a, b)
)
-- Warning: Partitioned table with no zone configurations.

# Regression test for #60699. Do not allow creation of interleaved partitioned
# indexes.
statement ok
SET CLUSTER SETTING sql.defaults.interleaved_tables.enabled = true;
CREATE TABLE t60699_a (a INT PRIMARY KEY);
CREATE TABLE t60699_b (b INT PRIMARY KEY, a INT REFERENCES t60699_a (a));

statement error interleaved indexes cannot be partitioned
CREATE INDEX i ON t60699_b (a) INTERLEAVE IN PARENT t60699_a (a) PARTITION BY LIST (a) (
partition part1 VALUES IN (1)
)
26 changes: 24 additions & 2 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ go_library(
name = "streamclient",
srcs = [
"client.go",
"cockroach_sinkless_replication_client.go",
"random_stream_client.go",
"stream_client.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient",
visibility = ["//visibility:public"],
Expand All @@ -20,21 +20,43 @@ go_library(
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/util/hlc",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "streamclient_test",
size = "small",
srcs = ["client_test.go"],
srcs = [
"client_test.go",
"cockroach_sinkless_replication_client_test.go",
"main_test.go",
],
embed = [":streamclient"],
deps = [
"//pkg/ccl/changefeedccl",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/storageccl",
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streamingtest",
"//pkg/ccl/streamingccl/streamproducer",
"//pkg/ccl/utilccl",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql/catalog/catalogkv",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"@com_github_stretchr_testify//require",
],
)
13 changes: 9 additions & 4 deletions pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ package streamclient

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)

// Client provides a way for the stream ingestion job to consume a
Expand All @@ -28,7 +29,7 @@ type Client interface {
//
// Canceling the context will stop reading the partition and close the event
// channel.
ConsumePartition(ctx context.Context, address streamingccl.PartitionAddress, startTime time.Time) (chan streamingccl.Event, chan error, error)
ConsumePartition(ctx context.Context, address streamingccl.PartitionAddress, startTime hlc.Timestamp) (chan streamingccl.Event, chan error, error)
}

// NewStreamClient creates a new stream client based on the stream
Expand All @@ -41,13 +42,17 @@ func NewStreamClient(streamAddress streamingccl.StreamAddress) (Client, error) {
}

switch streamURL.Scheme {
case TestScheme:
case "postgres", "postgresql":
// The canonical PostgreSQL URL scheme is "postgresql", however our
// own client commands also accept "postgres".
return &sinklessReplicationClient{}, nil
case RandomGenScheme:
streamClient, err = newRandomStreamClient(streamURL)
if err != nil {
return streamClient, err
}
default:
streamClient = &mockClient{}
return nil, errors.Newf("stream replication from scheme %q is unsupported", streamURL.Scheme)
}

return streamClient, nil
Expand Down
5 changes: 2 additions & 3 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ package streamclient
import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -35,7 +34,7 @@ func (sc testStreamClient) GetTopology(

// ConsumePartition implements the Client interface.
func (sc testStreamClient) ConsumePartition(
_ context.Context, _ streamingccl.PartitionAddress, _ time.Time,
_ context.Context, _ streamingccl.PartitionAddress, _ hlc.Timestamp,
) (chan streamingccl.Event, chan error, error) {
sampleKV := roachpb.KeyValue{
Key: []byte("key_1"),
Expand All @@ -62,7 +61,7 @@ func ExampleClient() {
panic(err)
}

startTimestamp := timeutil.Now()
startTimestamp := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}

for _, partition := range topology.Partitions {
eventCh, _ /* errCh */, err := client.ConsumePartition(context.Background(), partition, startTimestamp)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamclient

import (
"context"
gosql "database/sql"
"fmt"
"strconv"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
)

// sinklessReplicationClient is a Client that creates a replication stream on
// the source cluster and reads from it. The type carries no fields.
type sinklessReplicationClient struct{}

// Compile-time assertion that *sinklessReplicationClient satisfies Client.
var _ Client = (*sinklessReplicationClient)(nil)

// GetTopology implements the Client interface.
//
// The returned topology always contains exactly one partition: the stream
// address itself, converted to a PartitionAddress. The error result is always
// nil.
func (m *sinklessReplicationClient) GetTopology(
	sa streamingccl.StreamAddress,
) (streamingccl.Topology, error) {
	// The core changefeed clients only have 1 partition, and it's located at
	// the stream address.
	partitions := []streamingccl.PartitionAddress{streamingccl.PartitionAddress(sa)}
	return streamingccl.Topology{Partitions: partitions}, nil
}

// ConsumePartition implements the Client interface.
//
// It opens a SQL connection to the partition address, enables experimental
// stream replication on that session, and runs a sinkless
// CREATE REPLICATION STREAM statement for the tenant named by the TenantID
// query parameter of the address. When startTime has a non-zero WallTime it is
// passed to the statement as its cursor.
//
// The statement's rows are decoded on a background goroutine and delivered on
// the returned event channel. A row with an empty key is decoded as a resolved
// timestamp and emitted as a checkpoint event; any other row is decoded as a
// roachpb.KeyValue and emitted as a KV event.
//
// The goroutine stops when the rows are exhausted, when decoding or iteration
// fails, or when ctx is done while an event is waiting to be delivered. At most
// one error is sent on the (buffered) error channel. Both channels are closed,
// and the SQL resources released, when the goroutine exits.
//
// A non-nil error return means the stream could not be set up; in that case
// both channels are nil and every SQL resource opened so far has been closed.
func (m *sinklessReplicationClient) ConsumePartition(
	ctx context.Context, pa streamingccl.PartitionAddress, startTime hlc.Timestamp,
) (chan streamingccl.Event, chan error, error) {
	pgURL, err := pa.URL()
	if err != nil {
		return nil, nil, err
	}

	tenantToReplicate := pgURL.Query().Get(TenantID)
	if len(tenantToReplicate) == 0 {
		return nil, nil, errors.New("no tenant specified")
	}
	tenantID, err := strconv.Atoi(tenantToReplicate)
	if err != nil {
		return nil, nil, errors.Wrap(err, "parsing tenant")
	}

	streamTenantQuery := fmt.Sprintf(
		`CREATE REPLICATION STREAM FOR TENANT %d`, tenantID)
	if startTime.WallTime != 0 {
		streamTenantQuery = fmt.Sprintf(
			`CREATE REPLICATION STREAM FOR TENANT %d WITH cursor='%s'`, tenantID, startTime.AsOfSystemTime())
	}

	db, err := gosql.Open("postgres", pgURL.String())
	if err != nil {
		return nil, nil, err
	}

	// A dedicated connection is required so that the session variable set
	// below applies to the session that runs the stream statement.
	conn, err := db.Conn(ctx)
	if err != nil {
		// Best effort: the connection error is what the caller needs to see.
		_ = db.Close()
		return nil, nil, err
	}

	// closeAll releases the dedicated connection and then the pool. The
	// connection must be closed explicitly: closing the pool alone only closes
	// idle connections, not one that is checked out. Close errors are dropped
	// on purpose, since they would only mask the error that caused the
	// shutdown.
	closeAll := func() {
		_ = conn.Close()
		_ = db.Close()
	}

	// ExecContext rather than QueryContext: SET returns no rows, and a Rows
	// handle that is never closed would be leaked.
	if _, err := conn.ExecContext(ctx, `SET enable_experimental_stream_replication = true`); err != nil {
		closeAll()
		return nil, nil, err
	}
	rows, err := conn.QueryContext(ctx, streamTenantQuery)
	if err != nil {
		closeAll()
		return nil, nil, errors.Wrap(err, "creating source replication stream")
	}

	eventCh := make(chan streamingccl.Event)
	// Buffered so that the single error this goroutine may report never blocks
	// its shutdown, even if nobody is reading the channel.
	errCh := make(chan error, 1)

	go func() {
		// Deferred calls run in reverse order: rows, then the connection and
		// pool, then the channels.
		defer close(eventCh)
		defer close(errCh)
		defer closeAll()
		defer rows.Close()

		for rows.Next() {
			var ignoreTopic gosql.NullString
			var k, v []byte
			if err := rows.Scan(&ignoreTopic, &k, &v); err != nil {
				errCh <- err
				return
			}

			var event streamingccl.Event
			if len(k) == 0 {
				// A row without a key carries a resolved timestamp.
				var resolved hlc.Timestamp
				if err := protoutil.Unmarshal(v, &resolved); err != nil {
					errCh <- err
					return
				}
				event = streamingccl.MakeCheckpointEvent(resolved)
			} else {
				var kv roachpb.KeyValue
				kv.Key = k
				if err := protoutil.Unmarshal(v, &kv.Value); err != nil {
					errCh <- err
					return
				}
				event = streamingccl.MakeKVEvent(kv)
			}

			select {
			case eventCh <- event:
			case <-ctx.Done():
				errCh <- ctx.Err()
				return
			}
		}

		// rows.Next returns false both at the end of the result set and when
		// iteration fails; only rows.Err tells the two apart. Without this
		// check a broken stream would look like a clean end of stream.
		if err := rows.Err(); err != nil {
			errCh <- err
		}
	}()

	return eventCh, errCh, nil
}
Loading

0 comments on commit 320dab6

Please sign in to comment.