Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvclient/rangefeed,lease: introduce library for rangefeeds and adopt #58361

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions pkg/kv/kvclient/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "rangefeed",
srcs = [
"config.go",
"db_adapter.go",
"doc.go",
"rangefeed.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/roachpb",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/retry",
"//pkg/util/span",
"//pkg/util/stop",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
],
)

go_test(
name = "rangefeed_test",
srcs = [
"db_adapter_external_test.go",
"helpers_test.go",
"main_test.go",
"rangefeed_external_test.go",
"rangefeed_mock_test.go",
],
embed = [":rangefeed"],
deps = [
"//pkg/base",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/log/logpb",
"//pkg/util/retry",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:grpc",
],
)
72 changes: 72 additions & 0 deletions pkg/kv/kvclient/rangefeed/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeed

import (
"context"

"github.com/cockroachdb/cockroach/pkg/util/retry"
)

// Option configures a RangeFeed.
type Option interface {
set(*config)
}

type config struct {
retryOptions retry.Options
onInitialScanDone OnInitialScanDone
withInitialScan bool
withDiff bool
}

type optionFunc func(*config)

func (o optionFunc) set(c *config) { o(c) }

// OnInitialScanDone is called when an initial scan is finished before any rows
// from the rangefeed are supplied.
type OnInitialScanDone func(ctx context.Context)

// WithInitialScan enables an initial scan of the data in the span. The rows of
// an initial scan will be passed to the value function used to construct the
// RangeFeed. Upon completion of the initial scan, the passed function (if
// non-nil) will be called. The initial scan may be restarted and thus rows
// may be observed multiple times. The caller cannot rely on rows being returned
// in order.
func WithInitialScan(f OnInitialScanDone) Option {
return optionFunc(func(c *config) {
c.withInitialScan = true
c.onInitialScanDone = f
})
}

// WithDiff makes an option to enable an initial scan which defaults to
// false.
func WithDiff() Option { return withDiff }

var withDiff = optionFunc(func(c *config) { c.withDiff = true })

// WithRetry configures the retry options for the rangefeed.
func WithRetry(options retry.Options) Option {
return optionFunc(func(c *config) {
c.retryOptions = options
})
}

var defaultConfig = config{}

func initConfig(c *config, options []Option) {
*c = defaultConfig
for _, o := range options {
o.set(c)
}
}
97 changes: 97 additions & 0 deletions pkg/kv/kvclient/rangefeed/db_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeed

import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)

// dbAdapter is an implementation of db a *kv.DB.
type dbAdapter struct {
db *kv.DB
distSender *kvcoord.DistSender
targetScanBytes int64
}

// TODO(ajwerner): Hook up a memory monitor. Fortunately most users of the
// initial scan are reading scant amounts of data.

// defaultTargetScanBytes was pulled out of thin air. The main reason is that
// this thing is not hooked up to a memory monitor.
const defaultTargetScanBytes = 1 << 19 // 512 KiB

// newDBAdapter construct a kvDB using a *kv.DB.
func newDBAdapter(db *kv.DB) (*dbAdapter, error) {
dbClient := dbAdapter{
db: db,
targetScanBytes: defaultTargetScanBytes,
}
{
txnWrapperSender, ok := db.NonTransactionalSender().(*kv.CrossRangeTxnWrapperSender)
if !ok {
return nil, errors.Errorf("failed to extract a %T from %T",
(*kv.CrossRangeTxnWrapperSender)(nil), db.NonTransactionalSender())
}
distSender, ok := txnWrapperSender.Wrapped().(*kvcoord.DistSender)
if !ok {
return nil, errors.Errorf("failed to extract a %T from %T",
(*kvcoord.DistSender)(nil), txnWrapperSender.Wrapped())
}
dbClient.distSender = distSender
}
return &dbClient, nil
}

// RangeFeed is part of the kvDB interface.
func (dbc *dbAdapter) RangeFeed(
ctx context.Context,
span roachpb.Span,
startFrom hlc.Timestamp,
withDiff bool,
eventC chan<- *roachpb.RangeFeedEvent,
) error {
return dbc.distSender.RangeFeed(ctx, span, startFrom, withDiff, eventC)
}

// Scan is part of the kvDB interface.
func (dbc *dbAdapter) Scan(
ctx context.Context, span roachpb.Span, asOf hlc.Timestamp, rowFn func(value roachpb.KeyValue),
) error {
return dbc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetFixedTimestamp(ctx, asOf)
sp := span
var b kv.Batch
for {
b.Header.TargetBytes = dbc.targetScanBytes
b.Scan(sp.Key, sp.EndKey)
if err := txn.Run(ctx, &b); err != nil {
return err
}
res := b.Results[0]
for _, row := range res.Rows {
rowFn(roachpb.KeyValue{Key: row.Key, Value: *row.Value})
}
if res.ResumeSpan == nil {
return nil
}
sp = res.ResumeSpanAsValue()
b = kv.Batch{}
}
})
}

var _ kvDB = (*dbAdapter)(nil)
87 changes: 87 additions & 0 deletions pkg/kv/kvclient/rangefeed/db_adapter_external_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeed_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

// TestDBClientScan tests that the logic in Scan on the dbAdapter is sane.
// The rangefeed logic is a literal passthrough so it's not getting a lot of
// testing directly.
func TestDBClientScan(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

db := tc.Server(0).DB()
beforeAny := db.Clock().Now()
scratchKey := tc.ScratchRange(t)
mkKey := func(k string) roachpb.Key {
return encoding.EncodeStringAscending(scratchKey, k)
}
require.NoError(t, db.Put(ctx, mkKey("a"), 1))
require.NoError(t, db.Put(ctx, mkKey("b"), 2))
afterB := db.Clock().Now()
require.NoError(t, db.Put(ctx, mkKey("c"), 3))

cli, err := rangefeed.NewDBAdapter(db)
require.NoError(t, err)
sp := roachpb.Span{
Key: scratchKey,
EndKey: scratchKey.PrefixEnd(),
}

// Ensure that the timestamps are properly respected by not observing any
// values at the timestamp preceding writes.
{
var responses []roachpb.KeyValue
require.NoError(t, cli.Scan(ctx, sp, beforeAny, func(value roachpb.KeyValue) {
responses = append(responses, value)
}))
require.Len(t, responses, 0)
}

// Ensure that expected values are seen at the intermediate timestamp.
{
var responses []roachpb.KeyValue
require.NoError(t, cli.Scan(ctx, sp, afterB, func(value roachpb.KeyValue) {
responses = append(responses, value)
}))
require.Len(t, responses, 2)
require.Equal(t, mkKey("a"), responses[0].Key)
va, err := responses[0].Value.GetInt()
require.NoError(t, err)
require.Equal(t, int64(1), va)
}

// Ensure that pagination doesn't break anything.
cli.SetTargetScanBytes(1)
{
var responses []roachpb.KeyValue
require.NoError(t, cli.Scan(ctx, sp, db.Clock().Now(), func(value roachpb.KeyValue) {
responses = append(responses, value)
}))
require.Len(t, responses, 3)
}

}
21 changes: 21 additions & 0 deletions pkg/kv/kvclient/rangefeed/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

// Package rangefeed provides a useful client abstraction atop of the rangefeed
// functionality exported by the DistSender.
//
// In particular, the abstraction exported by this package hooks up a stopper,
// and deals with retries upon errors, tracking resolved timestamps along the
// way.
package rangefeed

// TODO(ajwerner): Rework this logic to encapsulate the multi-span logic in
// changefeedccl/kvfeed. That code also deals with some schema interactions but
// it should be split into two layers.
25 changes: 25 additions & 0 deletions pkg/kv/kvclient/rangefeed/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeed

// NewDBAdapter allows tests to construct a dbAdapter.
var NewDBAdapter = newDBAdapter

// NewFactoryWithDB allows tests to construct a factory with an injected db.
var NewFactoryWithDB = newFactory

// KVDB forwards the definition of kvDB to tests.
type KVDB = kvDB

// SetTargetScanBytes is exposed for testing.
func (dbc *dbAdapter) SetTargetScanBytes(limit int64) {
dbc.targetScanBytes = limit
}
Loading