diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel new file mode 100644 index 00000000000..7aaad1aa934 --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel @@ -0,0 +1,62 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "rangefeed", + srcs = [ + "config.go", + "db_adapter.go", + "doc.go", + "rangefeed.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv", + "//pkg/kv/kvclient/kvcoord", + "//pkg/roachpb", + "//pkg/util/hlc", + "//pkg/util/log", + "//pkg/util/retry", + "//pkg/util/span", + "//pkg/util/stop", + "//pkg/util/timeutil", + "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_logtags//:logtags", + ], +) + +go_test( + name = "rangefeed_test", + srcs = [ + "db_adapter_external_test.go", + "helpers_test.go", + "main_test.go", + "rangefeed_external_test.go", + "rangefeed_mock_test.go", + ], + embed = [":rangefeed"], + deps = [ + "//pkg/base", + "//pkg/roachpb", + "//pkg/rpc", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", + "//pkg/util/encoding", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/log/logpb", + "//pkg/util/retry", + "//pkg/util/stop", + "//pkg/util/syncutil", + "//pkg/util/timeutil", + "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@org_golang_google_grpc//:grpc", + ], +) diff --git a/pkg/kv/kvclient/rangefeed/config.go b/pkg/kv/kvclient/rangefeed/config.go new file mode 100644 index 00000000000..68d07b9d82c --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/config.go @@ -0,0 +1,72 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/util/retry" +) + +// Option configures a RangeFeed. +type Option interface { + set(*config) +} + +type config struct { + retryOptions retry.Options + onInitialScanDone OnInitialScanDone + withInitialScan bool + withDiff bool +} + +type optionFunc func(*config) + +func (o optionFunc) set(c *config) { o(c) } + +// OnInitialScanDone is called when an initial scan is finished before any rows +// from the rangefeed are supplied. +type OnInitialScanDone func(ctx context.Context) + +// WithInitialScan enables an initial scan of the data in the span. The rows of +// an initial scan will be passed to the value function used to construct the +// RangeFeed. Upon completion of the initial scan, the passed function (if +// non-nil) will be called. The initial scan may be restarted and thus rows +// may be observed multiple times. The caller cannot rely on rows being returned +// in order. +func WithInitialScan(f OnInitialScanDone) Option { + return optionFunc(func(c *config) { + c.withInitialScan = true + c.onInitialScanDone = f + }) +} + +// WithDiff makes an option to enable an initial scan which defaults to +// false. +func WithDiff() Option { return withDiff } + +var withDiff = optionFunc(func(c *config) { c.withDiff = true }) + +// WithRetry configures the retry options for the rangefeed. +func WithRetry(options retry.Options) Option { + return optionFunc(func(c *config) { + c.retryOptions = options + }) +} + +var defaultConfig = config{} + +func initConfig(c *config, options []Option) { + *c = defaultConfig + for _, o := range options { + o.set(c) + } +} diff --git a/pkg/kv/kvclient/rangefeed/db_adapter.go b/pkg/kv/kvclient/rangefeed/db_adapter.go new file mode 100644 index 00000000000..68dbdde533b --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/db_adapter.go @@ -0,0 +1,97 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" +) + +// dbAdapter is an implementation of db a *kv.DB. +type dbAdapter struct { + db *kv.DB + distSender *kvcoord.DistSender + targetScanBytes int64 +} + +// TODO(ajwerner): Hook up a memory monitor. Fortunately most users of the +// initial scan are reading scant amounts of data. + +// defaultTargetScanBytes was pulled out of thin air. The main reason is that +// this thing is not hooked up to a memory monitor. +const defaultTargetScanBytes = 1 << 19 // 512 KiB + +// newDBAdapter construct a kvDB using a *kv.DB. +func newDBAdapter(db *kv.DB) (*dbAdapter, error) { + dbClient := dbAdapter{ + db: db, + targetScanBytes: defaultTargetScanBytes, + } + { + txnWrapperSender, ok := db.NonTransactionalSender().(*kv.CrossRangeTxnWrapperSender) + if !ok { + return nil, errors.Errorf("failed to extract a %T from %T", + (*kv.CrossRangeTxnWrapperSender)(nil), db.NonTransactionalSender()) + } + distSender, ok := txnWrapperSender.Wrapped().(*kvcoord.DistSender) + if !ok { + return nil, errors.Errorf("failed to extract a %T from %T", + (*kvcoord.DistSender)(nil), txnWrapperSender.Wrapped()) + } + dbClient.distSender = distSender + } + return &dbClient, nil +} + +// RangeFeed is part of the kvDB interface. +func (dbc *dbAdapter) RangeFeed( + ctx context.Context, + span roachpb.Span, + startFrom hlc.Timestamp, + withDiff bool, + eventC chan<- *roachpb.RangeFeedEvent, +) error { + return dbc.distSender.RangeFeed(ctx, span, startFrom, withDiff, eventC) +} + +// Scan is part of the kvDB interface. +func (dbc *dbAdapter) Scan( + ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue), +) error { + return dbc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetFixedTimestamp(ctx, asOf) + sp := span + var b kv.Batch + for { + b.Header.TargetBytes = dbc.targetScanBytes + b.Scan(sp.Key, sp.EndKey) + if err := txn.Run(ctx, &b); err != nil { + return err + } + res := b.Results[0] + for _, row := range res.Rows { + rowFn(roachpb.KeyValue{Key: row.Key, Value: *row.Value}) + } + if res.ResumeSpan == nil { + return nil + } + sp = res.ResumeSpanAsValue() + b = kv.Batch{} + } + }) +} + +var _ kvDB = (*dbAdapter)(nil) diff --git a/pkg/kv/kvclient/rangefeed/db_adapter_external_test.go b/pkg/kv/kvclient/rangefeed/db_adapter_external_test.go new file mode 100644 index 00000000000..26c978c24f8 --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/db_adapter_external_test.go @@ -0,0 +1,87 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// TestDBClientScan tests that the logic in Scan on the dbAdapter is sane. +// The rangefeed logic is a literal passthrough so it's not getting a lot of +// testing directly. +func TestDBClientScan(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + db := tc.Server(0).DB() + beforeAny := db.Clock().Now() + scratchKey := tc.ScratchRange(t) + mkKey := func(k string) roachpb.Key { + return encoding.EncodeStringAscending(scratchKey, k) + } + require.NoError(t, db.Put(ctx, mkKey("a"), 1)) + require.NoError(t, db.Put(ctx, mkKey("b"), 2)) + afterB := db.Clock().Now() + require.NoError(t, db.Put(ctx, mkKey("c"), 3)) + + cli, err := rangefeed.NewDBAdapter(db) + require.NoError(t, err) + sp := roachpb.Span{ + Key: scratchKey, + EndKey: scratchKey.PrefixEnd(), + } + + // Ensure that the timestamps are properly respected by not observing any + // values at the timestamp preceding writes. + { + var responses []roachpb.KeyValue + require.NoError(t, cli.Scan(ctx, sp, beforeAny, func(value roachpb.KeyValue) { + responses = append(responses, value) + })) + require.Len(t, responses, 0) + } + + // Ensure that expected values are seen at the intermediate timestamp. + { + var responses []roachpb.KeyValue + require.NoError(t, cli.Scan(ctx, sp, afterB, func(value roachpb.KeyValue) { + responses = append(responses, value) + })) + require.Len(t, responses, 2) + require.Equal(t, mkKey("a"), responses[0].Key) + va, err := responses[0].Value.GetInt() + require.NoError(t, err) + require.Equal(t, int64(1), va) + } + + // Ensure that pagination doesn't break anything. + cli.SetTargetScanBytes(1) + { + var responses []roachpb.KeyValue + require.NoError(t, cli.Scan(ctx, sp, db.Clock().Now(), func(value roachpb.KeyValue) { + responses = append(responses, value) + })) + require.Len(t, responses, 3) + } + +} diff --git a/pkg/kv/kvclient/rangefeed/doc.go b/pkg/kv/kvclient/rangefeed/doc.go new file mode 100644 index 00000000000..256b4c61091 --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/doc.go @@ -0,0 +1,21 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package rangefeed provides a useful client abstraction atop of the rangefeed +// functionality exported by the DistSender. +// +// In particular, the abstraction exported by this package hooks up a stopper, +// and deals with retries upon errors, tracking resolved timestamps along the +// way. +package rangefeed + +// TODO(ajwerner): Rework this logic to encapsulate the multi-span logic in +// changefeedccl/kvfeed. That code also deals with some schema interactions but +// it should be split into two layers. diff --git a/pkg/kv/kvclient/rangefeed/helpers_test.go b/pkg/kv/kvclient/rangefeed/helpers_test.go new file mode 100644 index 00000000000..51ec80ded20 --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/helpers_test.go @@ -0,0 +1,25 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +// NewDBAdapter allows tests to construct a dbAdapter. +var NewDBAdapter = newDBAdapter + +// NewFactoryWithDB allows tests to construct a factory with an injected db. +var NewFactoryWithDB = newFactory + +// KVDB forwards the definition of kvDB to tests. +type KVDB = kvDB + +// SetTargetScanBytes is exposed for testing. +func (dbc *dbAdapter) SetTargetScanBytes(limit int64) { + dbc.targetScanBytes = limit +} diff --git a/pkg/kv/kvclient/rangefeed/main_test.go b/pkg/kv/kvclient/rangefeed/main_test.go new file mode 100644 index 00000000000..1e7e15018cf --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/main_test.go @@ -0,0 +1,34 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +func init() { + security.SetAssetLoader(securitytest.EmbeddedAssets) +} + +func TestMain(m *testing.M) { + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go new file mode 100644 index 00000000000..03ee63d20a5 --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -0,0 +1,271 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + "sync" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/span" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/logtags" +) + +// TODO(ajwerner): Expose hooks for metrics. +// TODO(ajwerner): Expose access to checkpoints and the frontier. +// TODO(ajwerner): Expose better control over how the retrier gets reset. + +// kvDB is an adapter to the underlying KV store. +type kvDB interface { + + // RangeFeed runs a rangefeed on a given span with the given arguments. + // It encapsulates the RangeFeed method on roachpb.Internal. + RangeFeed( + ctx context.Context, + span roachpb.Span, + startFrom hlc.Timestamp, + withDiff bool, + eventC chan<- *roachpb.RangeFeedEvent, + ) error + + // Scan encapsulates scanning a keyspan at a given point in time. The method + // deals with pagination, calling the caller back for each row. Note that + // the API does not require that the rows be ordered to allow for future + // parallelism. + Scan( + ctx context.Context, + span roachpb.Span, + asOf hlc.Timestamp, + rowFn func(value roachpb.KeyValue), + ) error +} + +// Factory is used to construct RangeFeeds. +type Factory struct { + stopper *stop.Stopper + client kvDB +} + +// NewFactory constructs a new Factory. +func NewFactory(stopper *stop.Stopper, db *kv.DB) (*Factory, error) { + kvDB, err := newDBAdapter(db) + if err != nil { + return nil, err + } + return newFactory(stopper, kvDB), nil +} + +func newFactory(stopper *stop.Stopper, client kvDB) *Factory { + return &Factory{ + stopper: stopper, + client: client, + } +} + +// RangeFeed constructs a new RangeFeed. +func (f *Factory) RangeFeed( + ctx context.Context, + name string, + span roachpb.Span, + initialTimestamp hlc.Timestamp, + onValue func(ctx context.Context, value *roachpb.RangeFeedValue), + options ...Option, +) (_ *RangeFeed, err error) { + r := RangeFeed{ + client: f.client, + stopper: f.stopper, + + initialTimestamp: initialTimestamp, + span: span, + onValue: onValue, + + stopped: make(chan struct{}), + } + initConfig(&r.config, options) + ctx = logtags.AddTag(ctx, "rangefeed", name) + ctx, r.cancel = f.stopper.WithCancelOnQuiesce(ctx) + if err := f.stopper.RunAsyncTask(ctx, "rangefeed", r.run); err != nil { + r.cancel() + return nil, err + } + return &r, nil +} + +// RangeFeed represents a running RangeFeed. +type RangeFeed struct { + config + client kvDB + stopper *stop.Stopper + + initialTimestamp hlc.Timestamp + + span roachpb.Span + onValue func(ctx context.Context, value *roachpb.RangeFeedValue) + + closeOnce sync.Once + cancel context.CancelFunc + stopped chan struct{} +} + +// Close closes the RangeFeed and waits for it to shut down. +func (f *RangeFeed) Close() { + f.closeOnce.Do(func() { + f.cancel() + <-f.stopped + }) +} + +// Run the rangefeed in a loop in the case of failure, likely due to node +// failures or general unavailability. We'll reset the retrier if the +// rangefeed runs for longer than the resetThreshold. +const resetThreshold = 30 * time.Second + +// run will run the RangeFeed until the context is canceled. The context is +// hooked up to the stopper's quiescence. +func (f *RangeFeed) run(ctx context.Context) { + defer close(f.stopped) + r := retry.StartWithCtx(ctx, f.retryOptions) + restartLogEvery := log.Every(10 * time.Second) + if done := f.maybeRunInitialScan(ctx, &restartLogEvery, &r); done { + return + } + + // TODO(ajwerner): Consider adding event buffering. Doing so would require + // draining when the rangefeed fails. + eventCh := make(chan *roachpb.RangeFeedEvent) + errCh := make(chan error) + + // Maintain a frontier in order to resume at a reasonable timestamp. + // TODO(ajwerner): Consider exposing the frontier through a RangeFeed method. + // Doing so would require some synchronization. + frontier := span.MakeFrontier(f.span) + frontier.Forward(f.span, f.initialTimestamp) + for i := 0; r.Next(); i++ { + + // TODO(ajwerner): Figure out what to do if the rangefeed falls behind to + // a point where the frontier timestamp precedes the GC threshold and thus + // will never work. Perhaps an initial scan could be performed again for + // some users. The API currently doesn't make that easy. Perhaps a callback + // should be called in order to allow the client to kill the process or + // something like that. + ts := frontier.Frontier() + log.VEventf(ctx, 1, "starting rangefeed from %v on %v", ts, f.span) + start := timeutil.Now() + + // Note that the below channel send is safe because processEvents will + // wait for the worker to send. RunWorker is safe here because this + // goroutine is run as an AsyncTask. + f.stopper.RunWorker(ctx, func(ctx context.Context) { + errCh <- f.client.RangeFeed(ctx, f.span, ts, f.withDiff, eventCh) + }) + + err := f.processEvents(ctx, frontier, eventCh, errCh) + if err != nil && ctx.Err() == nil && restartLogEvery.ShouldLog() { + log.Warningf(ctx, "rangefeed failed %d times, restarting: %v", + log.Safe(i), err) + } + if ctx.Err() != nil { + log.VEventf(ctx, 1, "exiting rangefeed") + return + } + ranFor := timeutil.Since(start) + log.VEventf(ctx, 1, "restarting rangefeed for %v after %v", + log.Safe(f.span), ranFor) + if ranFor > resetThreshold { + i = 1 + r.Reset() + } + } +} + +// maybeRunInitialScan will attempt to perform an initial data scan if one was +// requested. It will retry in the face of errors and will only return upon +// context cancellation. +func (f *RangeFeed) maybeRunInitialScan( + ctx context.Context, n *log.EveryN, r *retry.Retry, +) (done bool) { + if !f.withInitialScan { + return ctx.Err() != nil // done + } + scan := func(kv roachpb.KeyValue) { + v := roachpb.RangeFeedValue{ + Key: kv.Key, + Value: kv.Value, + } + + // Mark the data as occurring at the initial timestamp, which is the + // timestamp at which it was read. + v.Value.Timestamp = f.initialTimestamp + + // Supply the value from the scan as also the previous value to avoid + // indicating that the value was previously deleted. + if f.withDiff { + v.PrevValue = v.Value + } + + // It's something of a bummer that we must allocate a new value for each + // of these but the contract doesn't indicate that the value cannot be + // retained. + f.onValue(ctx, &v) + } + for { + if !r.Next() { + return true + } + if err := f.client.Scan(ctx, f.span, f.initialTimestamp, scan); err != nil { + if n.ShouldLog() { + log.Warningf(ctx, "failed to perform initial scan: %v", err) + } + } else /* err == nil */ { + if f.onInitialScanDone != nil { + f.onInitialScanDone(ctx) + } + return ctx.Err() != nil // done + } + } +} + +// processEvents processes events sent by the rangefeed on the eventCh. It waits +// for the rangefeed to signal that it has exited by sending on errCh. +func (f *RangeFeed) processEvents( + ctx context.Context, + frontier *span.Frontier, + eventCh <-chan *roachpb.RangeFeedEvent, + errCh <-chan error, +) error { + for { + select { + case ev := <-eventCh: + switch { + case ev.Val != nil: + f.onValue(ctx, ev.Val) + case ev.Checkpoint != nil: + frontier.Forward(ev.Checkpoint.Span, ev.Checkpoint.ResolvedTS) + case ev.Error != nil: + // Intentionally do nothing, we'll get an error returned from the + // call to RangeFeed. + } + case <-ctx.Done(): + // Ensure that the RangeFeed goroutine stops. + <-errCh + return nil + case err := <-errCh: + return err + } + } +} diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go new file mode 100644 index 00000000000..8e10363a1bf --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -0,0 +1,98 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed_test + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// TestRangeFeedIntegration is a basic integration test demonstrating all of +// the pieces working together. +func TestRangeFeedIntegration(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + db := tc.Server(0).DB() + scratchKey := tc.ScratchRange(t) + scratchKey = scratchKey[:len(scratchKey):len(scratchKey)] + mkKey := func(k string) roachpb.Key { + return encoding.EncodeStringAscending(scratchKey, k) + } + require.NoError(t, db.Put(ctx, mkKey("a"), 1)) + require.NoError(t, db.Put(ctx, mkKey("b"), 2)) + afterB := db.Clock().Now() + require.NoError(t, db.Put(ctx, mkKey("c"), 3)) + + sp := roachpb.Span{ + Key: scratchKey, + EndKey: scratchKey.PrefixEnd(), + } + { + // Enable rangefeeds, otherwise the thing will retry until they are enabled. + _, err := tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true") + require.NoError(t, err) + } + + f, err := rangefeed.NewFactory(tc.Stopper(), db) + require.NoError(t, err) + rows := make(chan *roachpb.RangeFeedValue) + initialScanDone := make(chan struct{}) + r, err := f.RangeFeed(ctx, "test", sp, afterB, func(ctx context.Context, value *roachpb.RangeFeedValue) { + fmt.Println(value) + rows <- value + }, rangefeed.WithDiff(), rangefeed.WithInitialScan(func(ctx context.Context) { + close(initialScanDone) + })) + require.NoError(t, err) + defer r.Close() + { + v1 := <-rows + require.Equal(t, mkKey("a"), v1.Key) + // Ensure the initial scan contract is fulfilled when WithDiff is specified. + require.Equal(t, v1.Value, v1.PrevValue) + require.Equal(t, v1.Value.Timestamp, afterB) + } + { + v2 := <-rows + require.Equal(t, mkKey("b"), v2.Key) + } + <-initialScanDone + { + v3 := <-rows + require.Equal(t, mkKey("c"), v3.Key) + } + + // Write a new value for "a" and make sure it is seen. + require.NoError(t, db.Put(ctx, mkKey("a"), 4)) + { + v4 := <-rows + require.Equal(t, mkKey("a"), v4.Key) + prev, err := v4.PrevValue.GetInt() + require.NoError(t, err) + require.Equal(t, int64(1), prev) + updated, err := v4.Value.GetInt() + require.NoError(t, err) + require.Equal(t, int64(4), updated) + } +} diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go new file mode 100644 index 00000000000..85597a77b0b --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go @@ -0,0 +1,369 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed_test + +import ( + "context" + "regexp" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/logpb" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" +) + +type mockClient struct { + rangefeed func( + ctx context.Context, + span roachpb.Span, + startFrom hlc.Timestamp, + withDiff bool, + eventC chan<- *roachpb.RangeFeedEvent, + ) error + + scan func( + ctx context.Context, + span roachpb.Span, + asOf hlc.Timestamp, + rowFn func(value roachpb.KeyValue), + ) error +} + +func (m *mockClient) RangeFeed( + ctx context.Context, + span roachpb.Span, + startFrom hlc.Timestamp, + withDiff bool, + eventC chan<- *roachpb.RangeFeedEvent, +) error { + return m.rangefeed(ctx, span, startFrom, withDiff, eventC) +} + +func (m *mockClient) Scan( + ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue), +) error { + return m.scan(ctx, span, asOf, rowFn) +} + +var _ (rangefeed.KVDB) = (*mockClient)(nil) + +// TestRangefeedMock utilizes the kvDB interface to test the behavior of the +// RangeFeed. +func TestRangeFeedMock(t *testing.T) { + defer leaktest.AfterTest(t)() + shortRetryOptions := retry.Options{ + InitialBackoff: time.Millisecond, + MaxBackoff: 2 * time.Millisecond, + } + t.Run("scan retries", func(t *testing.T) { + stopper := stop.NewStopper() + ctx := context.Background() + defer stopper.Stop(ctx) + ctx, cancel := context.WithCancel(ctx) + var i int + sp := roachpb.Span{ + Key: roachpb.Key("a"), + EndKey: roachpb.Key("b"), + } + ts := hlc.Timestamp{WallTime: 1} + row := roachpb.KeyValue{ + Key: sp.Key, + Value: roachpb.Value{}, + } + const numFailures = 2 + mc := mockClient{ + scan: func(ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue)) error { + assert.Equal(t, ts, asOf) + assert.Equal(t, sp, span) + rowFn(row) + if i++; i <= numFailures { + return errors.New("boom") + } + // Ensure the rangefeed doesn't start up by canceling the context prior + // to concluding the scan. + cancel() + return nil + }, + } + f := rangefeed.NewFactoryWithDB(stopper, &mc) + require.NotNil(t, f) + rows := make(chan *roachpb.RangeFeedValue) + + r, err := f.RangeFeed(ctx, "foo", sp, ts, func(ctx context.Context, value *roachpb.RangeFeedValue) { + rows <- value + }, rangefeed.WithInitialScan(func(ctx context.Context) { + close(rows) + }), rangefeed.WithRetry(shortRetryOptions)) + require.NoError(t, err) + require.NotNil(t, r) + for i := 0; i < numFailures+1; i++ { + r, ok := <-rows + require.Equal(t, row.Key, r.Key) + require.True(t, ok) + } + _, ok := <-rows + require.False(t, ok) + r.Close() + }) + t.Run("changefeed retries", func(t *testing.T) { + stopper := stop.NewStopper() + ctx := context.Background() + defer stopper.Stop(ctx) + sp := roachpb.Span{ + Key: roachpb.Key("a"), + EndKey: roachpb.Key("c"), + } + initialTS := hlc.Timestamp{WallTime: 1} + nextTS := initialTS.Next() + lastTS := nextTS.Next() + row := roachpb.KeyValue{ + Key: sp.Key, + Value: roachpb.Value{}, + } + const ( + numRestartsBeforeCheckpoint = 3 + firstPartialCheckpoint = numRestartsBeforeCheckpoint + 1 + secondPartialCheckpoint = firstPartialCheckpoint + 1 + fullCheckpoint = secondPartialCheckpoint + 1 + lastEvent = fullCheckpoint + 1 + totalRestarts = lastEvent - 1 + ) + var iteration int + var gotToTheEnd bool + mc := mockClient{ + scan: func( + ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue), + ) error { + t.Error("this should not be called") + return nil + }, + rangefeed: func( + ctx context.Context, span roachpb.Span, startFrom hlc.Timestamp, withDiff bool, eventC chan<- *roachpb.RangeFeedEvent, + ) error { + assert.False(t, withDiff) // it was not set + sendEvent := func(ts hlc.Timestamp) { + eventC <- &roachpb.RangeFeedEvent{ + Val: &roachpb.RangeFeedValue{ + Key: sp.Key, + }, + } + } + iteration++ + switch { + case iteration <= numRestartsBeforeCheckpoint: + sendEvent(initialTS) + assert.Equal(t, startFrom, initialTS) + return errors.New("boom") + case iteration == firstPartialCheckpoint: + assert.Equal(t, startFrom, initialTS) + eventC <- &roachpb.RangeFeedEvent{ + Checkpoint: &roachpb.RangeFeedCheckpoint{ + Span: roachpb.Span{ + Key: sp.Key, + EndKey: sp.Key.PrefixEnd(), + }, + ResolvedTS: nextTS, + }, + } + sendEvent(initialTS) + return errors.New("boom") + case iteration == secondPartialCheckpoint: + assert.Equal(t, startFrom, initialTS) + eventC <- &roachpb.RangeFeedEvent{ + Checkpoint: &roachpb.RangeFeedCheckpoint{ + Span: roachpb.Span{ + Key: sp.Key.PrefixEnd(), + EndKey: sp.EndKey, + }, + ResolvedTS: nextTS, + }, + } + sendEvent(nextTS) + return errors.New("boom") + case iteration == fullCheckpoint: + // At this point the frontier should have a complete checkpoint at + // nextTS. + assert.Equal(t, startFrom, nextTS) + eventC <- &roachpb.RangeFeedEvent{ + Checkpoint: &roachpb.RangeFeedCheckpoint{ + Span: sp, + ResolvedTS: lastTS, + }, + } + sendEvent(nextTS) + return errors.New("boom") + case iteration == lastEvent: + // Send a last event. + sendEvent(lastTS) + gotToTheEnd = true + <-ctx.Done() + return ctx.Err() + default: + panic(iteration) + } + }, + } + f := rangefeed.NewFactoryWithDB(stopper, &mc) + rows := make(chan *roachpb.RangeFeedValue) + r, err := f.RangeFeed(ctx, "foo", sp, initialTS, func( + ctx context.Context, value *roachpb.RangeFeedValue, + ) { + rows <- value + }, rangefeed.WithRetry(shortRetryOptions)) + require.NoError(t, err) + require.NotNil(t, r) + start := timeutil.Now() + for i := 0; i < lastEvent; i++ { + r := <-rows + assert.Equal(t, row.Key, r.Key) + } + minimumBackoff := 850 * time.Microsecond // initialBackoff less jitter + totalBackoff := timeutil.Since(start) + require.Greater(t, totalBackoff.Nanoseconds(), (totalRestarts * minimumBackoff).Nanoseconds()) + r.Close() + require.True(t, gotToTheEnd) + }) + t.Run("withDiff", func(t *testing.T) { + stopper := stop.NewStopper() + ctx := context.Background() + defer stopper.Stop(ctx) + sp := roachpb.Span{ + Key: roachpb.Key("a"), + EndKey: roachpb.Key("c"), + } + mc := mockClient{ + scan: func( + ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue), + ) error { + t.Error("this should not be called") + return nil + }, + rangefeed: func( + ctx context.Context, span roachpb.Span, startFrom hlc.Timestamp, withDiff bool, eventC chan<- *roachpb.RangeFeedEvent, + ) error { + assert.True(t, withDiff) + eventC <- &roachpb.RangeFeedEvent{ + Val: &roachpb.RangeFeedValue{ + Key: sp.Key, + }, + } + <-ctx.Done() + return ctx.Err() + }, + } + f := rangefeed.NewFactoryWithDB(stopper, &mc) + rows := make(chan *roachpb.RangeFeedValue) + r, err := f.RangeFeed(ctx, "foo", sp, hlc.Timestamp{}, func( + ctx context.Context, value *roachpb.RangeFeedValue, + ) { + rows <- value + }, rangefeed.WithDiff()) + require.NoError(t, err) + <-rows + r.Close() + }) + t.Run("stopper already stopped", func(t *testing.T) { + stopper := stop.NewStopper() + ctx := context.Background() + sp := roachpb.Span{ + Key: roachpb.Key("a"), + EndKey: roachpb.Key("c"), + } + stopper.Stop(ctx) + f := rangefeed.NewFactoryWithDB(stopper, &mockClient{}) + r, err := f.RangeFeed(ctx, "foo", sp, hlc.Timestamp{}, func( + ctx context.Context, value *roachpb.RangeFeedValue, + ) { + }) + require.Nil(t, r) + require.True(t, errors.Is(err, stop.ErrUnavailable), "%v", err) + }) +} + +// TestBackoffOnRangefeedFailure ensures that the backoff occurs when a +// rangefeed fails. It observes this indirectly by looking at logs. +func TestBackoffOnRangefeedFailure(t *testing.T) { + defer leaktest.AfterTest(t)() + + var called int64 + const timesToFail = 3 + rpcKnobs := rpc.ContextTestingKnobs{ + StreamClientInterceptor: func( + target string, class rpc.ConnectionClass, + ) grpc.StreamClientInterceptor { + return func( + ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, + method string, streamer grpc.Streamer, opts ...grpc.CallOption, + ) (stream grpc.ClientStream, err error) { + if strings.Contains(method, "RangeFeed") && + atomic.AddInt64(&called, 1) <= timesToFail { + return nil, errors.Errorf("boom") + } + return streamer(ctx, desc, cc, method, opts...) + } + }, + } + ctx := context.Background() + var seen struct { + syncutil.Mutex + entries []logpb.Entry + } + restartingRE := regexp.MustCompile("restarting rangefeed.*after.*") + log.Intercept(ctx, func(entry logpb.Entry) { + if !restartingRE.MatchString(entry.Message) { + return + } + seen.Lock() + defer seen.Unlock() + seen.entries = append(seen.entries, entry) + }) + defer log.Intercept(ctx, nil) + tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + ContextTestingKnobs: rpcKnobs, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + testutils.SucceedsSoon(t, func() error { + seen.Lock() + defer seen.Unlock() + if len(seen.entries) < timesToFail { + return errors.Errorf("seen %d, waiting for %d", len(seen.entries), timesToFail) + } + return nil + }) + seen.Lock() + defer seen.Unlock() + +} diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 858aa14bfd6..53dc6182aa7 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -60,6 +60,7 @@ go_library( "//pkg/kv/bulk", "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvclient/kvtenant", + "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvserver", "//pkg/kv/kvserver/closedts/container", "//pkg/kv/kvserver/kvserverbase", diff --git a/pkg/server/server.go b/pkg/server/server.go index 5db8d98cd3d..84f9fec21bf 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/container" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" @@ -419,6 +420,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { } } + rangeFeedFactory, err := rangefeed.NewFactory(stopper, db) + if err != nil { + return nil, err + } + nodeLiveness := liveness.NewNodeLiveness(liveness.NodeLivenessOptions{ AmbientCtx: cfg.AmbientCtx, Clock: clock, @@ -639,6 +645,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { circularJobRegistry: jobRegistry, jobAdoptionStopFile: jobAdoptionStopFile, protectedtsProvider: protectedtsProvider, + rangeFeedFactory: rangeFeedFactory, sqlStatusServer: sStatus, }) if err != nil { diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index c9361821f2f..843e9ba7b18 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/bulk" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/migration" @@ -204,6 +205,9 @@ type sqlServerArgs struct { // Used to list sessions and cancel sessions/queries. sqlStatusServer serverpb.SQLStatusServer + + // Used to watch settings and descriptor changes. + rangeFeedFactory *rangefeed.Factory } func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) { @@ -280,6 +284,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) { codec, lmKnobs, cfg.stopper, + cfg.rangeFeedFactory, cfg.LeaseManagerConfig, ) @@ -505,6 +510,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) { ExternalIODirConfig: cfg.ExternalIODirConfig, HydratedTables: hydratedTablesCache, GCJobNotifier: gcJobNotifier, + RangeFeedFactory: cfg.rangeFeedFactory, } if sqlSchemaChangerTestingKnobs := cfg.TestingKnobs.SQLSchemaChanger; sqlSchemaChangerTestingKnobs != nil { diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 1247884b2f1..7d2596ce18b 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" @@ -502,6 +503,11 @@ func makeSQLServerArgs( ) db := kv.NewDB(baseCfg.AmbientCtx, tcsFactory, clock, stopper) + rangeFeedFactory, err := rangefeed.NewFactory(stopper, db) + if err != nil { + return sqlServerArgs{}, err + } + circularInternalExecutor := &sql.InternalExecutor{} // Protected timestamps won't be available (at first) in multi-tenant // clusters. @@ -576,6 +582,7 @@ func makeSQLServerArgs( circularInternalExecutor: circularInternalExecutor, circularJobRegistry: &jobs.Registry{}, protectedtsProvider: protectedTSProvider, + rangeFeedFactory: rangeFeedFactory, sqlStatusServer: newTenantStatusServer( baseCfg.AmbientCtx, &adminPrivilegeChecker{ie: circularInternalExecutor}, sessionRegistry, baseCfg.Settings, ), diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 014de6ca281..d75869a2df5 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -238,6 +238,7 @@ go_library( "//pkg/kv", "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvclient/rangecache", + "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvserver", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/protectedts", @@ -476,6 +477,7 @@ go_test( "//pkg/kv", "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvclient/rangecache", + "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvserver", "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb", diff --git a/pkg/sql/catalog/lease/BUILD.bazel b/pkg/sql/catalog/lease/BUILD.bazel index d3c6a0326b4..b0e6dc03659 100644 --- a/pkg/sql/catalog/lease/BUILD.bazel +++ b/pkg/sql/catalog/lease/BUILD.bazel @@ -14,7 +14,7 @@ go_library( "//pkg/gossip", "//pkg/keys", "//pkg/kv", - "//pkg/kv/kvclient/kvcoord", + "//pkg/kv/kvclient/rangefeed", "//pkg/roachpb", "//pkg/security", "//pkg/settings", @@ -55,7 +55,6 @@ go_test( "//pkg/keys", "//pkg/kv", "//pkg/roachpb", - "//pkg/rpc", "//pkg/security", "//pkg/security/securitytest", "//pkg/server", @@ -77,7 +76,6 @@ go_test( "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", - "//pkg/util/log/logpb", "//pkg/util/randutil", "//pkg/util/retry", "//pkg/util/syncutil", @@ -88,6 +86,5 @@ go_test( "@com_github_cockroachdb_logtags//:logtags", "@com_github_lib_pq//:pq", "@com_github_stretchr_testify//require", - "@org_golang_google_grpc//:grpc", ], ) diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index c0d931f4691..d4479712f74 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -28,7 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings" @@ -1261,8 +1261,9 @@ func makeNameCacheKey(parentID descpb.ID, parentSchemaID descpb.ID, name string) // The locking order is: // Manager.mu > descriptorState.mu > nameCache.mu > descriptorVersionState.mu type Manager struct { - storage storage - mu struct { + rangeFeedFactory *rangefeed.Factory + storage storage + mu struct { syncutil.Mutex descriptors map[descpb.ID]*descriptorState @@ -1302,6 +1303,7 @@ func NewLeaseManager( codec keys.SQLCodec, testingKnobs ManagerTestingKnobs, stopper *stop.Stopper, + rangeFeedFactory *rangefeed.Factory, cfg *base.LeaseManagerConfig, ) *Manager { lm := &Manager{ @@ -1318,7 +1320,8 @@ func NewLeaseManager( leaseRenewalTimeout: cfg.DescriptorLeaseRenewalTimeout, testingKnobs: testingKnobs.LeaseStoreTestingKnobs, }, - testingKnobs: testingKnobs, + rangeFeedFactory: rangeFeedFactory, + testingKnobs: testingKnobs, names: nameCache{ descriptors: make(map[nameCacheKey]*descriptorVersionState), }, @@ -1796,54 +1799,14 @@ func (m *Manager) watchForRangefeedUpdates( if log.V(1) { log.Infof(ctx, "using rangefeeds for lease manager updates") } - distSender := db.NonTransactionalSender().(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender) - eventCh := make(chan *roachpb.RangeFeedEvent) - ctx, _ = s.WithCancelOnQuiesce(ctx) - if err := s.RunAsyncTask(ctx, "lease rangefeed", func(ctx context.Context) { - - // Run the rangefeed in a loop in the case of failure, likely due to node - // failures or general unavailability. We'll reset the retrier if the - // rangefeed runs for longer than the resetThreshold. - const resetThreshold = 30 * time.Second - restartLogEvery := log.Every(10 * time.Second) - for i, r := 1, retry.StartWithCtx(ctx, retry.Options{ - InitialBackoff: 100 * time.Millisecond, - MaxBackoff: 2 * time.Second, - Closer: s.ShouldQuiesce(), - }); r.Next(); i++ { - ts := m.getResolvedTimestamp() - descKeyPrefix := m.storage.codec.TablePrefix(uint32(systemschema.DescriptorTable.ID)) - span := roachpb.Span{ - Key: descKeyPrefix, - EndKey: descKeyPrefix.PrefixEnd(), - } - // Note: We don't need to use withDiff to detect version changes because - // the Manager already stores the relevant version information. - const withDiff = false - log.VEventf(ctx, 1, "starting rangefeed from %v on %v", ts, span) - start := timeutil.Now() - err := distSender.RangeFeed(ctx, span, ts, withDiff, eventCh) - if err != nil && ctx.Err() == nil && restartLogEvery.ShouldLog() { - log.Warningf(ctx, "lease rangefeed failed %d times, restarting: %v", - log.Safe(i), log.Safe(err)) - } - if ctx.Err() != nil { - log.VEventf(ctx, 1, "exiting rangefeed") - return - } - ranFor := timeutil.Since(start) - log.VEventf(ctx, 1, "restarting rangefeed for %v after %v", - log.Safe(span), ranFor) - if ranFor > resetThreshold { - i = 1 - r.Reset() - } - } - }); err != nil { - // This will only fail if the stopper has been stopped. - return + descriptorTableStart := m.Codec().TablePrefix(keys.DescriptorTableID) + descriptorTableSpan := roachpb.Span{ + Key: descriptorTableStart, + EndKey: descriptorTableStart.PrefixEnd(), } - handleEvent := func(ev *roachpb.RangeFeedValue) { + handleEvent := func( + ctx context.Context, ev *roachpb.RangeFeedValue, + ) { if len(ev.Value.RawBytes) == 0 { return } @@ -1867,27 +1830,10 @@ func (m *Manager) watchForRangefeedUpdates( case descUpdateCh <- &descriptor: } } - s.RunWorker(ctx, func(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case e := <-eventCh: - if e.Checkpoint != nil { - log.VEventf(ctx, 2, "got rangefeed checkpoint %v", e.Checkpoint) - m.setResolvedTimestamp(e.Checkpoint.ResolvedTS) - continue - } - if e.Error != nil { - log.Warningf(ctx, "got an error from a rangefeed: %v", e.Error.Error) - continue - } - if e.Val != nil { - handleEvent(e.Val) - } - } - } - }) + // Ignore errors here because they indicate that the server is shutting down. + _, _ = m.rangeFeedFactory.RangeFeed( + ctx, "lease", descriptorTableSpan, m.getResolvedTimestamp(), handleEvent, + ) } func (m *Manager) handleUpdatedSystemCfg( diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index d9394998da9..95e3622fe73 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -17,8 +17,6 @@ import ( "context" gosql "database/sql" "fmt" - "regexp" - "strings" "sync" "sync/atomic" "testing" @@ -29,7 +27,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -50,7 +47,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/log/logpb" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -58,7 +54,6 @@ import ( "github.com/cockroachdb/logtags" "github.com/lib/pq" "github.com/stretchr/testify/require" - "google.golang.org/grpc" ) type leaseTest struct { @@ -226,6 +221,7 @@ func (t *leaseTest) node(nodeID uint32) *lease.Manager { cfgCpy.Codec, t.leaseManagerTestingKnobs, t.server.Stopper(), + cfgCpy.RangeFeedFactory, t.cfg, ) ctx := logtags.AddTag(context.Background(), "leasemgr", nodeID) @@ -2306,69 +2302,3 @@ func TestRangefeedUpdatesHandledProperlyInTheFaceOfRaces(t *testing.T) { var i, j int require.Equal(t, gosql.ErrNoRows, db2.QueryRow("SELECT i, j FROM foo").Scan(&i, &j)) } - -// TestBackoffOnRangefeedFailure ensures that the backoff occurs when a -// rangefeed fails. It observes this indirectly by looking at logs. -func TestBackoffOnRangefeedFailure(t *testing.T) { - defer leaktest.AfterTest(t)() - - var called int64 - const timesToFail = 3 - rpcKnobs := rpc.ContextTestingKnobs{ - StreamClientInterceptor: func( - target string, class rpc.ConnectionClass, - ) grpc.StreamClientInterceptor { - return func( - ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, - method string, streamer grpc.Streamer, opts ...grpc.CallOption, - ) (stream grpc.ClientStream, err error) { - if strings.Contains(method, "RangeFeed") && - atomic.AddInt64(&called, 1) <= timesToFail { - return nil, errors.Errorf("boom") - } - return streamer(ctx, desc, cc, method, opts...) - } - }, - } - ctx := context.Background() - var seen struct { - syncutil.Mutex - entries []logpb.Entry - } - restartingRE := regexp.MustCompile("restarting rangefeed.*after.*") - log.Intercept(ctx, func(entry logpb.Entry) { - if !restartingRE.MatchString(entry.Message) { - return - } - seen.Lock() - defer seen.Unlock() - seen.entries = append(seen.entries, entry) - }) - defer log.Intercept(ctx, nil) - tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - Knobs: base.TestingKnobs{ - Server: &server.TestingKnobs{ - ContextTestingKnobs: rpcKnobs, - }, - }, - }, - }) - defer tc.Stopper().Stop(ctx) - testutils.SucceedsSoon(t, func() error { - seen.Lock() - defer seen.Unlock() - if len(seen.entries) < timesToFail { - return errors.Errorf("seen %d, waiting for %d", len(seen.entries), timesToFail) - } - return nil - }) - seen.Lock() - defer seen.Unlock() - minimumBackoff := 85 * time.Millisecond // initialBackoff less jitter - var totalBackoff time.Duration - for i := 1; i < len(seen.entries); i++ { - totalBackoff += time.Duration(seen.entries[i].Time - seen.entries[i-1].Time) - } - require.Greater(t, totalBackoff.Nanoseconds(), (3 * minimumBackoff).Nanoseconds()) -} diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 612d516c615..28d7868bd9c 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" @@ -762,6 +763,8 @@ type ExecutorConfig struct { GCJobNotifier *gcjobnotifier.Notifier + RangeFeedFactory *rangefeed.Factory + // VersionUpgradeHook is called after validating a `SET CLUSTER SETTING // version` but before executing it. It can carry out arbitrary migrations // that allow us to eventually remove legacy code. diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index affeb9f7960..f4f43d514f0 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -98,6 +99,8 @@ func TestSchemaChangeProcess(t *testing.T) { stopper := stop.NewStopper() cfg := base.NewLeaseManagerConfig() execCfg := s.ExecutorConfig().(sql.ExecutorConfig) + rf, err := rangefeed.NewFactory(stopper, kvDB) + require.NoError(t, err) leaseMgr := lease.NewLeaseManager( log.AmbientContext{Tracer: tracing.NewTracer()}, execCfg.NodeID, @@ -108,6 +111,7 @@ func TestSchemaChangeProcess(t *testing.T) { execCfg.Codec, lease.ManagerTestingKnobs{}, stopper, + rf, cfg, ) jobRegistry := s.JobRegistry().(*jobs.Registry)