From 7a7577b9f1f3e04b6739ad84df43e8fae235efa5 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 29 Dec 2020 00:29:40 -0500 Subject: [PATCH 1/2] kvclient/rangefeed: introduce package for using rangefeeds This commit introduces a new package underneath kvclient to simplify using rangefeeds. Namely, it provides sane retry and shutdown logic as well as support for performing an initial data scan. It also hides the nasty unwrapping of the DistSender as performed elsewhere. A longer-term vision here would be to support multiple spans and utilize this layer in the `kvfeed` of `changefeedccl`. Release note: None --- pkg/kv/kvclient/rangefeed/BUILD.bazel | 62 +++ pkg/kv/kvclient/rangefeed/config.go | 72 ++++ pkg/kv/kvclient/rangefeed/db_adapter.go | 97 +++++ .../rangefeed/db_adapter_external_test.go | 87 +++++ pkg/kv/kvclient/rangefeed/doc.go | 21 + pkg/kv/kvclient/rangefeed/helpers_test.go | 25 ++ pkg/kv/kvclient/rangefeed/main_test.go | 34 ++ pkg/kv/kvclient/rangefeed/rangefeed.go | 271 +++++++++++++ .../rangefeed/rangefeed_external_test.go | 98 +++++ .../kvclient/rangefeed/rangefeed_mock_test.go | 369 ++++++++++++++++++ 10 files changed, 1136 insertions(+) create mode 100644 pkg/kv/kvclient/rangefeed/BUILD.bazel create mode 100644 pkg/kv/kvclient/rangefeed/config.go create mode 100644 pkg/kv/kvclient/rangefeed/db_adapter.go create mode 100644 pkg/kv/kvclient/rangefeed/db_adapter_external_test.go create mode 100644 pkg/kv/kvclient/rangefeed/doc.go create mode 100644 pkg/kv/kvclient/rangefeed/helpers_test.go create mode 100644 pkg/kv/kvclient/rangefeed/main_test.go create mode 100644 pkg/kv/kvclient/rangefeed/rangefeed.go create mode 100644 pkg/kv/kvclient/rangefeed/rangefeed_external_test.go create mode 100644 pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel new file mode 100644 index 00000000000..3155f0db97d --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel @@ -0,0 
+1,62 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "rangefeed", + srcs = [ + "config.go", + "db_adapter.go", + "doc.go", + "rangefeed.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv", + "//pkg/kv/kvclient/kvcoord", + "//pkg/roachpb", + "//pkg/util/hlc", + "//pkg/util/log", + "//pkg/util/retry", + "//pkg/util/span", + "//pkg/util/stop", + "//pkg/util/timeutil", + "//vendor/github.com/cockroachdb/errors", + "//vendor/github.com/cockroachdb/logtags", + ], +) + +go_test( + name = "rangefeed_test", + srcs = [ + "db_adapter_external_test.go", + "helpers_test.go", + "main_test.go", + "rangefeed_external_test.go", + "rangefeed_mock_test.go", + ], + embed = [":rangefeed"], + deps = [ + "//pkg/base", + "//pkg/roachpb", + "//pkg/rpc", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", + "//pkg/util/encoding", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/log/logpb", + "//pkg/util/retry", + "//pkg/util/stop", + "//pkg/util/syncutil", + "//pkg/util/timeutil", + "//vendor/github.com/cockroachdb/errors", + "//vendor/github.com/stretchr/testify/assert", + "//vendor/github.com/stretchr/testify/require", + "//vendor/google.golang.org/grpc", + ], +) diff --git a/pkg/kv/kvclient/rangefeed/config.go b/pkg/kv/kvclient/rangefeed/config.go new file mode 100644 index 00000000000..68d07b9d82c --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/config.go @@ -0,0 +1,72 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/util/retry" +) + +// Option configures a RangeFeed. +type Option interface { + set(*config) +} + +type config struct { + retryOptions retry.Options + onInitialScanDone OnInitialScanDone + withInitialScan bool + withDiff bool +} + +type optionFunc func(*config) + +func (o optionFunc) set(c *config) { o(c) } + +// OnInitialScanDone is called when an initial scan is finished before any rows +// from the rangefeed are supplied. +type OnInitialScanDone func(ctx context.Context) + +// WithInitialScan enables an initial scan of the data in the span. The rows of +// an initial scan will be passed to the value function used to construct the +// RangeFeed. Upon completion of the initial scan, the passed function (if +// non-nil) will be called. The initial scan may be restarted and thus rows +// may be observed multiple times. The caller cannot rely on rows being returned +// in order. +func WithInitialScan(f OnInitialScanDone) Option { + return optionFunc(func(c *config) { + c.withInitialScan = true + c.onInitialScanDone = f + }) +} + +// WithDiff makes an option that requests the previous value be supplied with +// each rangefeed value. Diffs are disabled by default. +func WithDiff() Option { return withDiff } + +var withDiff = optionFunc(func(c *config) { c.withDiff = true }) + +// WithRetry configures the retry options for the rangefeed.
+func WithRetry(options retry.Options) Option { + return optionFunc(func(c *config) { + c.retryOptions = options + }) +} + +var defaultConfig = config{} + +func initConfig(c *config, options []Option) { + *c = defaultConfig + for _, o := range options { + o.set(c) + } +} diff --git a/pkg/kv/kvclient/rangefeed/db_adapter.go b/pkg/kv/kvclient/rangefeed/db_adapter.go new file mode 100644 index 00000000000..68dbdde533b --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/db_adapter.go @@ -0,0 +1,97 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" +) + +// dbAdapter is an implementation of the kvDB interface using a *kv.DB. +type dbAdapter struct { + db *kv.DB + distSender *kvcoord.DistSender + targetScanBytes int64 +} + +// TODO(ajwerner): Hook up a memory monitor. Fortunately most users of the +// initial scan are reading scant amounts of data. + +// defaultTargetScanBytes was pulled out of thin air. The main reason is that +// this thing is not hooked up to a memory monitor. +const defaultTargetScanBytes = 1 << 19 // 512 KiB + +// newDBAdapter constructs a kvDB using a *kv.DB.
+func newDBAdapter(db *kv.DB) (*dbAdapter, error) { + dbClient := dbAdapter{ + db: db, + targetScanBytes: defaultTargetScanBytes, + } + { + txnWrapperSender, ok := db.NonTransactionalSender().(*kv.CrossRangeTxnWrapperSender) + if !ok { + return nil, errors.Errorf("failed to extract a %T from %T", + (*kv.CrossRangeTxnWrapperSender)(nil), db.NonTransactionalSender()) + } + distSender, ok := txnWrapperSender.Wrapped().(*kvcoord.DistSender) + if !ok { + return nil, errors.Errorf("failed to extract a %T from %T", + (*kvcoord.DistSender)(nil), txnWrapperSender.Wrapped()) + } + dbClient.distSender = distSender + } + return &dbClient, nil +} + +// RangeFeed is part of the kvDB interface. +func (dbc *dbAdapter) RangeFeed( + ctx context.Context, + span roachpb.Span, + startFrom hlc.Timestamp, + withDiff bool, + eventC chan<- *roachpb.RangeFeedEvent, +) error { + return dbc.distSender.RangeFeed(ctx, span, startFrom, withDiff, eventC) +} + +// Scan is part of the kvDB interface. +func (dbc *dbAdapter) Scan( + ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue), +) error { + return dbc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetFixedTimestamp(ctx, asOf) + sp := span + var b kv.Batch + for { + b.Header.TargetBytes = dbc.targetScanBytes + b.Scan(sp.Key, sp.EndKey) + if err := txn.Run(ctx, &b); err != nil { + return err + } + res := b.Results[0] + for _, row := range res.Rows { + rowFn(roachpb.KeyValue{Key: row.Key, Value: *row.Value}) + } + if res.ResumeSpan == nil { + return nil + } + sp = res.ResumeSpanAsValue() + b = kv.Batch{} + } + }) +} + +var _ kvDB = (*dbAdapter)(nil) diff --git a/pkg/kv/kvclient/rangefeed/db_adapter_external_test.go b/pkg/kv/kvclient/rangefeed/db_adapter_external_test.go new file mode 100644 index 00000000000..26c978c24f8 --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/db_adapter_external_test.go @@ -0,0 +1,87 @@ +// Copyright 2020 The Cockroach Authors. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// TestDBClientScan tests that the logic in Scan on the dbAdapter is sane. +// The rangefeed logic is a literal passthrough so it's not getting a lot of +// testing directly. +func TestDBClientScan(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + db := tc.Server(0).DB() + beforeAny := db.Clock().Now() + scratchKey := tc.ScratchRange(t) + mkKey := func(k string) roachpb.Key { + return encoding.EncodeStringAscending(scratchKey, k) + } + require.NoError(t, db.Put(ctx, mkKey("a"), 1)) + require.NoError(t, db.Put(ctx, mkKey("b"), 2)) + afterB := db.Clock().Now() + require.NoError(t, db.Put(ctx, mkKey("c"), 3)) + + cli, err := rangefeed.NewDBAdapter(db) + require.NoError(t, err) + sp := roachpb.Span{ + Key: scratchKey, + EndKey: scratchKey.PrefixEnd(), + } + + // Ensure that the timestamps are properly respected by not observing any + // values at the timestamp preceding writes. 
+ { + var responses []roachpb.KeyValue + require.NoError(t, cli.Scan(ctx, sp, beforeAny, func(value roachpb.KeyValue) { + responses = append(responses, value) + })) + require.Len(t, responses, 0) + } + + // Ensure that expected values are seen at the intermediate timestamp. + { + var responses []roachpb.KeyValue + require.NoError(t, cli.Scan(ctx, sp, afterB, func(value roachpb.KeyValue) { + responses = append(responses, value) + })) + require.Len(t, responses, 2) + require.Equal(t, mkKey("a"), responses[0].Key) + va, err := responses[0].Value.GetInt() + require.NoError(t, err) + require.Equal(t, int64(1), va) + } + + // Ensure that pagination doesn't break anything. + cli.SetTargetScanBytes(1) + { + var responses []roachpb.KeyValue + require.NoError(t, cli.Scan(ctx, sp, db.Clock().Now(), func(value roachpb.KeyValue) { + responses = append(responses, value) + })) + require.Len(t, responses, 3) + } + +} diff --git a/pkg/kv/kvclient/rangefeed/doc.go b/pkg/kv/kvclient/rangefeed/doc.go new file mode 100644 index 00000000000..256b4c61091 --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/doc.go @@ -0,0 +1,21 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package rangefeed provides a useful client abstraction atop of the rangefeed +// functionality exported by the DistSender. +// +// In particular, the abstraction exported by this package hooks up a stopper, +// and deals with retries upon errors, tracking resolved timestamps along the +// way. +package rangefeed + +// TODO(ajwerner): Rework this logic to encapsulate the multi-span logic in +// changefeedccl/kvfeed. 
That code also deals with some schema interactions but +// it should be split into two layers. diff --git a/pkg/kv/kvclient/rangefeed/helpers_test.go b/pkg/kv/kvclient/rangefeed/helpers_test.go new file mode 100644 index 00000000000..51ec80ded20 --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/helpers_test.go @@ -0,0 +1,25 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +// NewDBAdapter allows tests to construct a dbAdapter. +var NewDBAdapter = newDBAdapter + +// NewFactoryWithDB allows tests to construct a factory with an injected db. +var NewFactoryWithDB = newFactory + +// KVDB forwards the definition of kvDB to tests. +type KVDB = kvDB + +// SetTargetScanBytes is exposed for testing. +func (dbc *dbAdapter) SetTargetScanBytes(limit int64) { + dbc.targetScanBytes = limit +} diff --git a/pkg/kv/kvclient/rangefeed/main_test.go b/pkg/kv/kvclient/rangefeed/main_test.go new file mode 100644 index 00000000000..1e7e15018cf --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/main_test.go @@ -0,0 +1,34 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package rangefeed_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +func init() { + security.SetAssetLoader(securitytest.EmbeddedAssets) +} + +func TestMain(m *testing.M) { + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go new file mode 100644 index 00000000000..03ee63d20a5 --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -0,0 +1,271 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + "sync" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/span" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/logtags" +) + +// TODO(ajwerner): Expose hooks for metrics. +// TODO(ajwerner): Expose access to checkpoints and the frontier. +// TODO(ajwerner): Expose better control over how the retrier gets reset. 
+ +// kvDB is an adapter to the underlying KV store. +type kvDB interface { + + // RangeFeed runs a rangefeed on a given span with the given arguments. + // It encapsulates the RangeFeed method on roachpb.Internal. + RangeFeed( + ctx context.Context, + span roachpb.Span, + startFrom hlc.Timestamp, + withDiff bool, + eventC chan<- *roachpb.RangeFeedEvent, + ) error + + // Scan encapsulates scanning a keyspan at a given point in time. The method + // deals with pagination, calling the caller back for each row. Note that + // the API does not require that the rows be ordered to allow for future + // parallelism. + Scan( + ctx context.Context, + span roachpb.Span, + asOf hlc.Timestamp, + rowFn func(value roachpb.KeyValue), + ) error +} + +// Factory is used to construct RangeFeeds. +type Factory struct { + stopper *stop.Stopper + client kvDB +} + +// NewFactory constructs a new Factory. +func NewFactory(stopper *stop.Stopper, db *kv.DB) (*Factory, error) { + kvDB, err := newDBAdapter(db) + if err != nil { + return nil, err + } + return newFactory(stopper, kvDB), nil +} + +func newFactory(stopper *stop.Stopper, client kvDB) *Factory { + return &Factory{ + stopper: stopper, + client: client, + } +} + +// RangeFeed constructs a new RangeFeed. +func (f *Factory) RangeFeed( + ctx context.Context, + name string, + span roachpb.Span, + initialTimestamp hlc.Timestamp, + onValue func(ctx context.Context, value *roachpb.RangeFeedValue), + options ...Option, +) (_ *RangeFeed, err error) { + r := RangeFeed{ + client: f.client, + stopper: f.stopper, + + initialTimestamp: initialTimestamp, + span: span, + onValue: onValue, + + stopped: make(chan struct{}), + } + initConfig(&r.config, options) + ctx = logtags.AddTag(ctx, "rangefeed", name) + ctx, r.cancel = f.stopper.WithCancelOnQuiesce(ctx) + if err := f.stopper.RunAsyncTask(ctx, "rangefeed", r.run); err != nil { + r.cancel() + return nil, err + } + return &r, nil +} + +// RangeFeed represents a running RangeFeed. 
+type RangeFeed struct { + config + client kvDB + stopper *stop.Stopper + + initialTimestamp hlc.Timestamp + + span roachpb.Span + onValue func(ctx context.Context, value *roachpb.RangeFeedValue) + + closeOnce sync.Once + cancel context.CancelFunc + stopped chan struct{} +} + +// Close closes the RangeFeed and waits for it to shut down. +func (f *RangeFeed) Close() { + f.closeOnce.Do(func() { + f.cancel() + <-f.stopped + }) +} + +// Run the rangefeed in a loop in the case of failure, likely due to node +// failures or general unavailability. We'll reset the retrier if the +// rangefeed runs for longer than the resetThreshold. +const resetThreshold = 30 * time.Second + +// run will run the RangeFeed until the context is canceled. The context is +// hooked up to the stopper's quiescence. +func (f *RangeFeed) run(ctx context.Context) { + defer close(f.stopped) + r := retry.StartWithCtx(ctx, f.retryOptions) + restartLogEvery := log.Every(10 * time.Second) + if done := f.maybeRunInitialScan(ctx, &restartLogEvery, &r); done { + return + } + + // TODO(ajwerner): Consider adding event buffering. Doing so would require + // draining when the rangefeed fails. + eventCh := make(chan *roachpb.RangeFeedEvent) + errCh := make(chan error) + + // Maintain a frontier in order to resume at a reasonable timestamp. + // TODO(ajwerner): Consider exposing the frontier through a RangeFeed method. + // Doing so would require some synchronization. + frontier := span.MakeFrontier(f.span) + frontier.Forward(f.span, f.initialTimestamp) + for i := 0; r.Next(); i++ { + + // TODO(ajwerner): Figure out what to do if the rangefeed falls behind to + // a point where the frontier timestamp precedes the GC threshold and thus + // will never work. Perhaps an initial scan could be performed again for + // some users. The API currently doesn't make that easy. Perhaps a callback + // should be called in order to allow the client to kill the process or + // something like that. 
+ ts := frontier.Frontier() + log.VEventf(ctx, 1, "starting rangefeed from %v on %v", ts, f.span) + start := timeutil.Now() + + // Note that the below channel send is safe because processEvents will + // wait for the worker to send. RunWorker is safe here because this + // goroutine is run as an AsyncTask. + f.stopper.RunWorker(ctx, func(ctx context.Context) { + errCh <- f.client.RangeFeed(ctx, f.span, ts, f.withDiff, eventCh) + }) + + err := f.processEvents(ctx, frontier, eventCh, errCh) + if err != nil && ctx.Err() == nil && restartLogEvery.ShouldLog() { + log.Warningf(ctx, "rangefeed failed %d times, restarting: %v", + log.Safe(i), err) + } + if ctx.Err() != nil { + log.VEventf(ctx, 1, "exiting rangefeed") + return + } + ranFor := timeutil.Since(start) + log.VEventf(ctx, 1, "restarting rangefeed for %v after %v", + log.Safe(f.span), ranFor) + if ranFor > resetThreshold { + i = 1 + r.Reset() + } + } +} + +// maybeRunInitialScan will attempt to perform an initial data scan if one was +// requested. It will retry in the face of errors and will only return upon +// context cancellation. +func (f *RangeFeed) maybeRunInitialScan( + ctx context.Context, n *log.EveryN, r *retry.Retry, +) (done bool) { + if !f.withInitialScan { + return ctx.Err() != nil // done + } + scan := func(kv roachpb.KeyValue) { + v := roachpb.RangeFeedValue{ + Key: kv.Key, + Value: kv.Value, + } + + // Mark the data as occurring at the initial timestamp, which is the + // timestamp at which it was read. + v.Value.Timestamp = f.initialTimestamp + + // Supply the value from the scan as also the previous value to avoid + // indicating that the value was previously deleted. + if f.withDiff { + v.PrevValue = v.Value + } + + // It's something of a bummer that we must allocate a new value for each + // of these but the contract doesn't indicate that the value cannot be + // retained. 
+ f.onValue(ctx, &v) + } + for { + if !r.Next() { + return true + } + if err := f.client.Scan(ctx, f.span, f.initialTimestamp, scan); err != nil { + if n.ShouldLog() { + log.Warningf(ctx, "failed to perform initial scan: %v", err) + } + } else /* err == nil */ { + if f.onInitialScanDone != nil { + f.onInitialScanDone(ctx) + } + return ctx.Err() != nil // done + } + } +} + +// processEvents processes events sent by the rangefeed on the eventCh. It waits +// for the rangefeed to signal that it has exited by sending on errCh. +func (f *RangeFeed) processEvents( + ctx context.Context, + frontier *span.Frontier, + eventCh <-chan *roachpb.RangeFeedEvent, + errCh <-chan error, +) error { + for { + select { + case ev := <-eventCh: + switch { + case ev.Val != nil: + f.onValue(ctx, ev.Val) + case ev.Checkpoint != nil: + frontier.Forward(ev.Checkpoint.Span, ev.Checkpoint.ResolvedTS) + case ev.Error != nil: + // Intentionally do nothing, we'll get an error returned from the + // call to RangeFeed. + } + case <-ctx.Done(): + // Ensure that the RangeFeed goroutine stops. + <-errCh + return nil + case err := <-errCh: + return err + } + } +} diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go new file mode 100644 index 00000000000..8e10363a1bf --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -0,0 +1,98 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package rangefeed_test + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// TestRangeFeedIntegration is a basic integration test demonstrating all of +// the pieces working together. +func TestRangeFeedIntegration(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + db := tc.Server(0).DB() + scratchKey := tc.ScratchRange(t) + scratchKey = scratchKey[:len(scratchKey):len(scratchKey)] + mkKey := func(k string) roachpb.Key { + return encoding.EncodeStringAscending(scratchKey, k) + } + require.NoError(t, db.Put(ctx, mkKey("a"), 1)) + require.NoError(t, db.Put(ctx, mkKey("b"), 2)) + afterB := db.Clock().Now() + require.NoError(t, db.Put(ctx, mkKey("c"), 3)) + + sp := roachpb.Span{ + Key: scratchKey, + EndKey: scratchKey.PrefixEnd(), + } + { + // Enable rangefeeds, otherwise the thing will retry until they are enabled. 
+ _, err := tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true") + require.NoError(t, err) + } + + f, err := rangefeed.NewFactory(tc.Stopper(), db) + require.NoError(t, err) + rows := make(chan *roachpb.RangeFeedValue) + initialScanDone := make(chan struct{}) + r, err := f.RangeFeed(ctx, "test", sp, afterB, func(ctx context.Context, value *roachpb.RangeFeedValue) { + fmt.Println(value) + rows <- value + }, rangefeed.WithDiff(), rangefeed.WithInitialScan(func(ctx context.Context) { + close(initialScanDone) + })) + require.NoError(t, err) + defer r.Close() + { + v1 := <-rows + require.Equal(t, mkKey("a"), v1.Key) + // Ensure the initial scan contract is fulfilled when WithDiff is specified. + require.Equal(t, v1.Value, v1.PrevValue) + require.Equal(t, v1.Value.Timestamp, afterB) + } + { + v2 := <-rows + require.Equal(t, mkKey("b"), v2.Key) + } + <-initialScanDone + { + v3 := <-rows + require.Equal(t, mkKey("c"), v3.Key) + } + + // Write a new value for "a" and make sure it is seen. + require.NoError(t, db.Put(ctx, mkKey("a"), 4)) + { + v4 := <-rows + require.Equal(t, mkKey("a"), v4.Key) + prev, err := v4.PrevValue.GetInt() + require.NoError(t, err) + require.Equal(t, int64(1), prev) + updated, err := v4.Value.GetInt() + require.NoError(t, err) + require.Equal(t, int64(4), updated) + } +} diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go new file mode 100644 index 00000000000..85597a77b0b --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go @@ -0,0 +1,369 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package rangefeed_test + +import ( + "context" + "regexp" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/logpb" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" +) + +type mockClient struct { + rangefeed func( + ctx context.Context, + span roachpb.Span, + startFrom hlc.Timestamp, + withDiff bool, + eventC chan<- *roachpb.RangeFeedEvent, + ) error + + scan func( + ctx context.Context, + span roachpb.Span, + asOf hlc.Timestamp, + rowFn func(value roachpb.KeyValue), + ) error +} + +func (m *mockClient) RangeFeed( + ctx context.Context, + span roachpb.Span, + startFrom hlc.Timestamp, + withDiff bool, + eventC chan<- *roachpb.RangeFeedEvent, +) error { + return m.rangefeed(ctx, span, startFrom, withDiff, eventC) +} + +func (m *mockClient) Scan( + ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue), +) error { + return m.scan(ctx, span, asOf, rowFn) +} + +var _ (rangefeed.KVDB) = (*mockClient)(nil) + +// TestRangeFeedMock utilizes the kvDB interface to test the behavior of the +// RangeFeed.
+func TestRangeFeedMock(t *testing.T) { + defer leaktest.AfterTest(t)() + shortRetryOptions := retry.Options{ + InitialBackoff: time.Millisecond, + MaxBackoff: 2 * time.Millisecond, + } + t.Run("scan retries", func(t *testing.T) { + stopper := stop.NewStopper() + ctx := context.Background() + defer stopper.Stop(ctx) + ctx, cancel := context.WithCancel(ctx) + var i int + sp := roachpb.Span{ + Key: roachpb.Key("a"), + EndKey: roachpb.Key("b"), + } + ts := hlc.Timestamp{WallTime: 1} + row := roachpb.KeyValue{ + Key: sp.Key, + Value: roachpb.Value{}, + } + const numFailures = 2 + mc := mockClient{ + scan: func(ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue)) error { + assert.Equal(t, ts, asOf) + assert.Equal(t, sp, span) + rowFn(row) + if i++; i <= numFailures { + return errors.New("boom") + } + // Ensure the rangefeed doesn't start up by canceling the context prior + // to concluding the scan. + cancel() + return nil + }, + } + f := rangefeed.NewFactoryWithDB(stopper, &mc) + require.NotNil(t, f) + rows := make(chan *roachpb.RangeFeedValue) + + r, err := f.RangeFeed(ctx, "foo", sp, ts, func(ctx context.Context, value *roachpb.RangeFeedValue) { + rows <- value + }, rangefeed.WithInitialScan(func(ctx context.Context) { + close(rows) + }), rangefeed.WithRetry(shortRetryOptions)) + require.NoError(t, err) + require.NotNil(t, r) + for i := 0; i < numFailures+1; i++ { + r, ok := <-rows + require.Equal(t, row.Key, r.Key) + require.True(t, ok) + } + _, ok := <-rows + require.False(t, ok) + r.Close() + }) + t.Run("changefeed retries", func(t *testing.T) { + stopper := stop.NewStopper() + ctx := context.Background() + defer stopper.Stop(ctx) + sp := roachpb.Span{ + Key: roachpb.Key("a"), + EndKey: roachpb.Key("c"), + } + initialTS := hlc.Timestamp{WallTime: 1} + nextTS := initialTS.Next() + lastTS := nextTS.Next() + row := roachpb.KeyValue{ + Key: sp.Key, + Value: roachpb.Value{}, + } + const ( + numRestartsBeforeCheckpoint = 3 + 
firstPartialCheckpoint = numRestartsBeforeCheckpoint + 1 + secondPartialCheckpoint = firstPartialCheckpoint + 1 + fullCheckpoint = secondPartialCheckpoint + 1 + lastEvent = fullCheckpoint + 1 + totalRestarts = lastEvent - 1 + ) + var iteration int + var gotToTheEnd bool + mc := mockClient{ + scan: func( + ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue), + ) error { + t.Error("this should not be called") + return nil + }, + rangefeed: func( + ctx context.Context, span roachpb.Span, startFrom hlc.Timestamp, withDiff bool, eventC chan<- *roachpb.RangeFeedEvent, + ) error { + assert.False(t, withDiff) // it was not set + sendEvent := func(ts hlc.Timestamp) { + eventC <- &roachpb.RangeFeedEvent{ + Val: &roachpb.RangeFeedValue{ + Key: sp.Key, + }, + } + } + iteration++ + switch { + case iteration <= numRestartsBeforeCheckpoint: + sendEvent(initialTS) + assert.Equal(t, startFrom, initialTS) + return errors.New("boom") + case iteration == firstPartialCheckpoint: + assert.Equal(t, startFrom, initialTS) + eventC <- &roachpb.RangeFeedEvent{ + Checkpoint: &roachpb.RangeFeedCheckpoint{ + Span: roachpb.Span{ + Key: sp.Key, + EndKey: sp.Key.PrefixEnd(), + }, + ResolvedTS: nextTS, + }, + } + sendEvent(initialTS) + return errors.New("boom") + case iteration == secondPartialCheckpoint: + assert.Equal(t, startFrom, initialTS) + eventC <- &roachpb.RangeFeedEvent{ + Checkpoint: &roachpb.RangeFeedCheckpoint{ + Span: roachpb.Span{ + Key: sp.Key.PrefixEnd(), + EndKey: sp.EndKey, + }, + ResolvedTS: nextTS, + }, + } + sendEvent(nextTS) + return errors.New("boom") + case iteration == fullCheckpoint: + // At this point the frontier should have a complete checkpoint at + // nextTS. + assert.Equal(t, startFrom, nextTS) + eventC <- &roachpb.RangeFeedEvent{ + Checkpoint: &roachpb.RangeFeedCheckpoint{ + Span: sp, + ResolvedTS: lastTS, + }, + } + sendEvent(nextTS) + return errors.New("boom") + case iteration == lastEvent: + // Send a last event. 
+ sendEvent(lastTS) + gotToTheEnd = true + <-ctx.Done() + return ctx.Err() + default: + panic(iteration) + } + }, + } + f := rangefeed.NewFactoryWithDB(stopper, &mc) + rows := make(chan *roachpb.RangeFeedValue) + r, err := f.RangeFeed(ctx, "foo", sp, initialTS, func( + ctx context.Context, value *roachpb.RangeFeedValue, + ) { + rows <- value + }, rangefeed.WithRetry(shortRetryOptions)) + require.NoError(t, err) + require.NotNil(t, r) + start := timeutil.Now() + for i := 0; i < lastEvent; i++ { + r := <-rows + assert.Equal(t, row.Key, r.Key) + } + minimumBackoff := 850 * time.Microsecond // initialBackoff less jitter + totalBackoff := timeutil.Since(start) + require.Greater(t, totalBackoff.Nanoseconds(), (totalRestarts * minimumBackoff).Nanoseconds()) + r.Close() + require.True(t, gotToTheEnd) + }) + t.Run("withDiff", func(t *testing.T) { + stopper := stop.NewStopper() + ctx := context.Background() + defer stopper.Stop(ctx) + sp := roachpb.Span{ + Key: roachpb.Key("a"), + EndKey: roachpb.Key("c"), + } + mc := mockClient{ + scan: func( + ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue), + ) error { + t.Error("this should not be called") + return nil + }, + rangefeed: func( + ctx context.Context, span roachpb.Span, startFrom hlc.Timestamp, withDiff bool, eventC chan<- *roachpb.RangeFeedEvent, + ) error { + assert.True(t, withDiff) + eventC <- &roachpb.RangeFeedEvent{ + Val: &roachpb.RangeFeedValue{ + Key: sp.Key, + }, + } + <-ctx.Done() + return ctx.Err() + }, + } + f := rangefeed.NewFactoryWithDB(stopper, &mc) + rows := make(chan *roachpb.RangeFeedValue) + r, err := f.RangeFeed(ctx, "foo", sp, hlc.Timestamp{}, func( + ctx context.Context, value *roachpb.RangeFeedValue, + ) { + rows <- value + }, rangefeed.WithDiff()) + require.NoError(t, err) + <-rows + r.Close() + }) + t.Run("stopper already stopped", func(t *testing.T) { + stopper := stop.NewStopper() + ctx := context.Background() + sp := roachpb.Span{ + Key: 
roachpb.Key("a"), + EndKey: roachpb.Key("c"), + } + stopper.Stop(ctx) + f := rangefeed.NewFactoryWithDB(stopper, &mockClient{}) + r, err := f.RangeFeed(ctx, "foo", sp, hlc.Timestamp{}, func( + ctx context.Context, value *roachpb.RangeFeedValue, + ) { + }) + require.Nil(t, r) + require.True(t, errors.Is(err, stop.ErrUnavailable), "%v", err) + }) +} + +// TestBackoffOnRangefeedFailure ensures that the backoff occurs when a +// rangefeed fails. It observes this indirectly by looking at logs. +func TestBackoffOnRangefeedFailure(t *testing.T) { + defer leaktest.AfterTest(t)() + + var called int64 + const timesToFail = 3 + rpcKnobs := rpc.ContextTestingKnobs{ + StreamClientInterceptor: func( + target string, class rpc.ConnectionClass, + ) grpc.StreamClientInterceptor { + return func( + ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, + method string, streamer grpc.Streamer, opts ...grpc.CallOption, + ) (stream grpc.ClientStream, err error) { + if strings.Contains(method, "RangeFeed") && + atomic.AddInt64(&called, 1) <= timesToFail { + return nil, errors.Errorf("boom") + } + return streamer(ctx, desc, cc, method, opts...) 
+ } + }, + } + ctx := context.Background() + var seen struct { + syncutil.Mutex + entries []logpb.Entry + } + restartingRE := regexp.MustCompile("restarting rangefeed.*after.*") + log.Intercept(ctx, func(entry logpb.Entry) { + if !restartingRE.MatchString(entry.Message) { + return + } + seen.Lock() + defer seen.Unlock() + seen.entries = append(seen.entries, entry) + }) + defer log.Intercept(ctx, nil) + tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ContextTestingKnobs: rpcKnobs, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + testutils.SucceedsSoon(t, func() error { + seen.Lock() + defer seen.Unlock() + if len(seen.entries) < timesToFail { + return errors.Errorf("seen %d, waiting for %d", len(seen.entries), timesToFail) + } + return nil + }) + seen.Lock() + defer seen.Unlock() + +} From 197e527070519196c9896fe6040d2a3b25d08dde Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 29 Dec 2020 11:07:29 -0500 Subject: [PATCH 2/2] server,lease: adopt new rangefeed package Release note: None --- pkg/kv/kvclient/rangefeed/BUILD.bazel | 12 ++-- pkg/server/BUILD.bazel | 1 + pkg/server/server.go | 7 +++ pkg/server/server_sql.go | 6 ++ pkg/server/testserver.go | 7 +++ pkg/sql/BUILD.bazel | 2 + pkg/sql/catalog/lease/BUILD.bazel | 5 +- pkg/sql/catalog/lease/lease.go | 90 ++++++--------------------- pkg/sql/catalog/lease/lease_test.go | 72 +-------------------- pkg/sql/exec_util.go | 3 + pkg/sql/schema_changer_test.go | 4 ++ 11 files changed, 56 insertions(+), 153 deletions(-) diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel index 3155f0db97d..7aaad1aa934 100644 --- a/pkg/kv/kvclient/rangefeed/BUILD.bazel +++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel @@ -20,8 +20,8 @@ go_library( "//pkg/util/span", "//pkg/util/stop", "//pkg/util/timeutil", - "//vendor/github.com/cockroachdb/errors", - 
"//vendor/github.com/cockroachdb/logtags", + "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_logtags//:logtags", ], ) @@ -54,9 +54,9 @@ go_test( "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", - "//vendor/github.com/cockroachdb/errors", - "//vendor/github.com/stretchr/testify/assert", - "//vendor/github.com/stretchr/testify/require", - "//vendor/google.golang.org/grpc", + "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@org_golang_google_grpc//:grpc", ], ) diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 858aa14bfd6..53dc6182aa7 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -60,6 +60,7 @@ go_library( "//pkg/kv/bulk", "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvclient/kvtenant", + "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvserver", "//pkg/kv/kvserver/closedts/container", "//pkg/kv/kvserver/kvserverbase", diff --git a/pkg/server/server.go b/pkg/server/server.go index 5db8d98cd3d..84f9fec21bf 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/container" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" @@ -419,6 +420,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { } } + rangeFeedFactory, err := rangefeed.NewFactory(stopper, db) + if err != nil { + return nil, err + } + nodeLiveness := liveness.NewNodeLiveness(liveness.NodeLivenessOptions{ AmbientCtx: cfg.AmbientCtx, Clock: clock, @@ -639,6 +645,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { circularJobRegistry: jobRegistry, jobAdoptionStopFile: 
jobAdoptionStopFile, protectedtsProvider: protectedtsProvider, + rangeFeedFactory: rangeFeedFactory, sqlStatusServer: sStatus, }) if err != nil { diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index c9361821f2f..843e9ba7b18 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/bulk" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/migration" @@ -204,6 +205,9 @@ type sqlServerArgs struct { // Used to list sessions and cancel sessions/queries. sqlStatusServer serverpb.SQLStatusServer + + // Used to watch settings and descriptor changes. + rangeFeedFactory *rangefeed.Factory } func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) { @@ -280,6 +284,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) { codec, lmKnobs, cfg.stopper, + cfg.rangeFeedFactory, cfg.LeaseManagerConfig, ) @@ -505,6 +510,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) { ExternalIODirConfig: cfg.ExternalIODirConfig, HydratedTables: hydratedTablesCache, GCJobNotifier: gcJobNotifier, + RangeFeedFactory: cfg.rangeFeedFactory, } if sqlSchemaChangerTestingKnobs := cfg.TestingKnobs.SQLSchemaChanger; sqlSchemaChangerTestingKnobs != nil { diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 1247884b2f1..7d2596ce18b 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" + 
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" @@ -502,6 +503,11 @@ func makeSQLServerArgs( ) db := kv.NewDB(baseCfg.AmbientCtx, tcsFactory, clock, stopper) + rangeFeedFactory, err := rangefeed.NewFactory(stopper, db) + if err != nil { + return sqlServerArgs{}, err + } + circularInternalExecutor := &sql.InternalExecutor{} // Protected timestamps won't be available (at first) in multi-tenant // clusters. @@ -576,6 +582,7 @@ func makeSQLServerArgs( circularInternalExecutor: circularInternalExecutor, circularJobRegistry: &jobs.Registry{}, protectedtsProvider: protectedTSProvider, + rangeFeedFactory: rangeFeedFactory, sqlStatusServer: newTenantStatusServer( baseCfg.AmbientCtx, &adminPrivilegeChecker{ie: circularInternalExecutor}, sessionRegistry, baseCfg.Settings, ), diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 014de6ca281..d75869a2df5 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -238,6 +238,7 @@ go_library( "//pkg/kv", "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvclient/rangecache", + "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvserver", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/protectedts", @@ -476,6 +477,7 @@ go_test( "//pkg/kv", "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvclient/rangecache", + "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvserver", "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb", diff --git a/pkg/sql/catalog/lease/BUILD.bazel b/pkg/sql/catalog/lease/BUILD.bazel index d3c6a0326b4..b0e6dc03659 100644 --- a/pkg/sql/catalog/lease/BUILD.bazel +++ b/pkg/sql/catalog/lease/BUILD.bazel @@ -14,7 +14,7 @@ go_library( "//pkg/gossip", "//pkg/keys", "//pkg/kv", - "//pkg/kv/kvclient/kvcoord", + "//pkg/kv/kvclient/rangefeed", "//pkg/roachpb", "//pkg/security", "//pkg/settings", @@ -55,7 +55,6 @@ go_test( "//pkg/keys", "//pkg/kv", 
"//pkg/roachpb", - "//pkg/rpc", "//pkg/security", "//pkg/security/securitytest", "//pkg/server", @@ -77,7 +76,6 @@ go_test( "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", - "//pkg/util/log/logpb", "//pkg/util/randutil", "//pkg/util/retry", "//pkg/util/syncutil", @@ -88,6 +86,5 @@ go_test( "@com_github_cockroachdb_logtags//:logtags", "@com_github_lib_pq//:pq", "@com_github_stretchr_testify//require", - "@org_golang_google_grpc//:grpc", ], ) diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index c0d931f4691..d4479712f74 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -28,7 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings" @@ -1261,8 +1261,9 @@ func makeNameCacheKey(parentID descpb.ID, parentSchemaID descpb.ID, name string) // The locking order is: // Manager.mu > descriptorState.mu > nameCache.mu > descriptorVersionState.mu type Manager struct { - storage storage - mu struct { + rangeFeedFactory *rangefeed.Factory + storage storage + mu struct { syncutil.Mutex descriptors map[descpb.ID]*descriptorState @@ -1302,6 +1303,7 @@ func NewLeaseManager( codec keys.SQLCodec, testingKnobs ManagerTestingKnobs, stopper *stop.Stopper, + rangeFeedFactory *rangefeed.Factory, cfg *base.LeaseManagerConfig, ) *Manager { lm := &Manager{ @@ -1318,7 +1320,8 @@ func NewLeaseManager( leaseRenewalTimeout: cfg.DescriptorLeaseRenewalTimeout, testingKnobs: testingKnobs.LeaseStoreTestingKnobs, }, - testingKnobs: testingKnobs, + rangeFeedFactory: rangeFeedFactory, + testingKnobs: testingKnobs, names: nameCache{ descriptors: 
make(map[nameCacheKey]*descriptorVersionState), }, @@ -1796,54 +1799,14 @@ func (m *Manager) watchForRangefeedUpdates( if log.V(1) { log.Infof(ctx, "using rangefeeds for lease manager updates") } - distSender := db.NonTransactionalSender().(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender) - eventCh := make(chan *roachpb.RangeFeedEvent) - ctx, _ = s.WithCancelOnQuiesce(ctx) - if err := s.RunAsyncTask(ctx, "lease rangefeed", func(ctx context.Context) { - - // Run the rangefeed in a loop in the case of failure, likely due to node - // failures or general unavailability. We'll reset the retrier if the - // rangefeed runs for longer than the resetThreshold. - const resetThreshold = 30 * time.Second - restartLogEvery := log.Every(10 * time.Second) - for i, r := 1, retry.StartWithCtx(ctx, retry.Options{ - InitialBackoff: 100 * time.Millisecond, - MaxBackoff: 2 * time.Second, - Closer: s.ShouldQuiesce(), - }); r.Next(); i++ { - ts := m.getResolvedTimestamp() - descKeyPrefix := m.storage.codec.TablePrefix(uint32(systemschema.DescriptorTable.ID)) - span := roachpb.Span{ - Key: descKeyPrefix, - EndKey: descKeyPrefix.PrefixEnd(), - } - // Note: We don't need to use withDiff to detect version changes because - // the Manager already stores the relevant version information. - const withDiff = false - log.VEventf(ctx, 1, "starting rangefeed from %v on %v", ts, span) - start := timeutil.Now() - err := distSender.RangeFeed(ctx, span, ts, withDiff, eventCh) - if err != nil && ctx.Err() == nil && restartLogEvery.ShouldLog() { - log.Warningf(ctx, "lease rangefeed failed %d times, restarting: %v", - log.Safe(i), log.Safe(err)) - } - if ctx.Err() != nil { - log.VEventf(ctx, 1, "exiting rangefeed") - return - } - ranFor := timeutil.Since(start) - log.VEventf(ctx, 1, "restarting rangefeed for %v after %v", - log.Safe(span), ranFor) - if ranFor > resetThreshold { - i = 1 - r.Reset() - } - } - }); err != nil { - // This will only fail if the stopper has been stopped. 
- return + descriptorTableStart := m.Codec().TablePrefix(keys.DescriptorTableID) + descriptorTableSpan := roachpb.Span{ + Key: descriptorTableStart, + EndKey: descriptorTableStart.PrefixEnd(), } - handleEvent := func(ev *roachpb.RangeFeedValue) { + handleEvent := func( + ctx context.Context, ev *roachpb.RangeFeedValue, + ) { if len(ev.Value.RawBytes) == 0 { return } @@ -1867,27 +1830,10 @@ func (m *Manager) watchForRangefeedUpdates( case descUpdateCh <- &descriptor: } } - s.RunWorker(ctx, func(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case e := <-eventCh: - if e.Checkpoint != nil { - log.VEventf(ctx, 2, "got rangefeed checkpoint %v", e.Checkpoint) - m.setResolvedTimestamp(e.Checkpoint.ResolvedTS) - continue - } - if e.Error != nil { - log.Warningf(ctx, "got an error from a rangefeed: %v", e.Error.Error) - continue - } - if e.Val != nil { - handleEvent(e.Val) - } - } - } - }) + // Ignore errors here because they indicate that the server is shutting down. + _, _ = m.rangeFeedFactory.RangeFeed( + ctx, "lease", descriptorTableSpan, m.getResolvedTimestamp(), handleEvent, + ) } func (m *Manager) handleUpdatedSystemCfg( diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index d9394998da9..95e3622fe73 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -17,8 +17,6 @@ import ( "context" gosql "database/sql" "fmt" - "regexp" - "strings" "sync" "sync/atomic" "testing" @@ -29,7 +27,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -50,7 +47,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" 
"github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/log/logpb" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -58,7 +54,6 @@ import ( "github.com/cockroachdb/logtags" "github.com/lib/pq" "github.com/stretchr/testify/require" - "google.golang.org/grpc" ) type leaseTest struct { @@ -226,6 +221,7 @@ func (t *leaseTest) node(nodeID uint32) *lease.Manager { cfgCpy.Codec, t.leaseManagerTestingKnobs, t.server.Stopper(), + cfgCpy.RangeFeedFactory, t.cfg, ) ctx := logtags.AddTag(context.Background(), "leasemgr", nodeID) @@ -2306,69 +2302,3 @@ func TestRangefeedUpdatesHandledProperlyInTheFaceOfRaces(t *testing.T) { var i, j int require.Equal(t, gosql.ErrNoRows, db2.QueryRow("SELECT i, j FROM foo").Scan(&i, &j)) } - -// TestBackoffOnRangefeedFailure ensures that the backoff occurs when a -// rangefeed fails. It observes this indirectly by looking at logs. -func TestBackoffOnRangefeedFailure(t *testing.T) { - defer leaktest.AfterTest(t)() - - var called int64 - const timesToFail = 3 - rpcKnobs := rpc.ContextTestingKnobs{ - StreamClientInterceptor: func( - target string, class rpc.ConnectionClass, - ) grpc.StreamClientInterceptor { - return func( - ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, - method string, streamer grpc.Streamer, opts ...grpc.CallOption, - ) (stream grpc.ClientStream, err error) { - if strings.Contains(method, "RangeFeed") && - atomic.AddInt64(&called, 1) <= timesToFail { - return nil, errors.Errorf("boom") - } - return streamer(ctx, desc, cc, method, opts...) 
- } - }, - } - ctx := context.Background() - var seen struct { - syncutil.Mutex - entries []logpb.Entry - } - restartingRE := regexp.MustCompile("restarting rangefeed.*after.*") - log.Intercept(ctx, func(entry logpb.Entry) { - if !restartingRE.MatchString(entry.Message) { - return - } - seen.Lock() - defer seen.Unlock() - seen.entries = append(seen.entries, entry) - }) - defer log.Intercept(ctx, nil) - tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - Knobs: base.TestingKnobs{ - Server: &server.TestingKnobs{ - ContextTestingKnobs: rpcKnobs, - }, - }, - }, - }) - defer tc.Stopper().Stop(ctx) - testutils.SucceedsSoon(t, func() error { - seen.Lock() - defer seen.Unlock() - if len(seen.entries) < timesToFail { - return errors.Errorf("seen %d, waiting for %d", len(seen.entries), timesToFail) - } - return nil - }) - seen.Lock() - defer seen.Unlock() - minimumBackoff := 85 * time.Millisecond // initialBackoff less jitter - var totalBackoff time.Duration - for i := 1; i < len(seen.entries); i++ { - totalBackoff += time.Duration(seen.entries[i].Time - seen.entries[i-1].Time) - } - require.Greater(t, totalBackoff.Nanoseconds(), (3 * minimumBackoff).Nanoseconds()) -} diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 612d516c615..28d7868bd9c 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" @@ -762,6 +763,8 @@ type ExecutorConfig struct { GCJobNotifier *gcjobnotifier.Notifier + RangeFeedFactory *rangefeed.Factory + // VersionUpgradeHook is called after validating a `SET CLUSTER SETTING // version` but 
before executing it. It can carry out arbitrary migrations // that allow us to eventually remove legacy code. diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index affeb9f7960..f4f43d514f0 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -98,6 +99,8 @@ func TestSchemaChangeProcess(t *testing.T) { stopper := stop.NewStopper() cfg := base.NewLeaseManagerConfig() execCfg := s.ExecutorConfig().(sql.ExecutorConfig) + rf, err := rangefeed.NewFactory(stopper, kvDB) + require.NoError(t, err) leaseMgr := lease.NewLeaseManager( log.AmbientContext{Tracer: tracing.NewTracer()}, execCfg.NodeID, @@ -108,6 +111,7 @@ func TestSchemaChangeProcess(t *testing.T) { execCfg.Codec, lease.ManagerTestingKnobs{}, stopper, + rf, cfg, ) jobRegistry := s.JobRegistry().(*jobs.Registry)