Skip to content

Commit

Permalink
fix check
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer committed Mar 7, 2022
1 parent 00d49d8 commit b5470a9
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 17 deletions.
2 changes: 1 addition & 1 deletion br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type WalkOption struct {
// ObjPrefix used fo prefix search in storage.
// it can save lots of time when we want find specify prefix objects in storage.
// For example. we have 10000 <Hash>.sst files and 10 backupmeta.(\d+) files.
// we can use ObjPrefix = "backupmeta" to retrive all meta files quickly.
// we can use ObjPrefix = "backupmeta" to retrieve all meta files quickly.
ObjPrefix string
// ListCount is the number of entries per page.
//
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/log"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"

backuppb "github.com/pingcap/kvproto/pkg/brpb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/kv"
"go.etcd.io/etcd/client/v3"
)

// MetaDataClient is the client for operations over metadata.
Expand Down
17 changes: 11 additions & 6 deletions br/pkg/stream/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/kv"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/mvcc"
)
Expand Down Expand Up @@ -92,27 +92,31 @@ func simpleTask(name string, tableCount int) stream.TaskInfo {
}

func keyIs(t *testing.T, key, value []byte, etcd *embed.Etcd) {
r, err := etcd.Server.KV().Range(key, nil, mvcc.RangeOptions{})
ctx := context.Background()
r, err := etcd.Server.KV().Range(ctx, key, nil, mvcc.RangeOptions{})
require.NoError(t, err)
require.Len(t, r.KVs, 1)
require.Equal(t, key, r.KVs[0].Key)
require.Equal(t, value, r.KVs[0].Value)
}

func keyExists(t *testing.T, key []byte, etcd *embed.Etcd) {
r, err := etcd.Server.KV().Range(key, nil, mvcc.RangeOptions{})
ctx := context.Background()
r, err := etcd.Server.KV().Range(ctx, key, nil, mvcc.RangeOptions{})
require.NoError(t, err)
require.Len(t, r.KVs, 1)
}

func keyNotExists(t *testing.T, key []byte, etcd *embed.Etcd) {
r, err := etcd.Server.KV().Range(key, nil, mvcc.RangeOptions{})
ctx := context.Background()
r, err := etcd.Server.KV().Range(ctx, key, nil, mvcc.RangeOptions{})
require.NoError(t, err)
require.Len(t, r.KVs, 0)
}

func rangeMatches(t *testing.T, ranges stream.Ranges, etcd *embed.Etcd) {
r, err := etcd.Server.KV().Range(ranges[0].StartKey, ranges[len(ranges)-1].EndKey, mvcc.RangeOptions{})
ctx := context.Background()
r, err := etcd.Server.KV().Range(ctx, ranges[0].StartKey, ranges[len(ranges)-1].EndKey, mvcc.RangeOptions{})
require.NoError(t, err)
if len(r.KVs) != len(ranges) {
t.Logf("len(ranges) not match len(response.KVs) [%d vs %d]", len(ranges), len(r.KVs))
Expand All @@ -126,7 +130,8 @@ func rangeMatches(t *testing.T, ranges stream.Ranges, etcd *embed.Etcd) {
}

func rangeIsEmpty(t *testing.T, prefix []byte, etcd *embed.Etcd) {
r, err := etcd.Server.KV().Range(prefix, kv.PrefixNextKey(prefix), mvcc.RangeOptions{})
ctx := context.Background()
r, err := etcd.Server.KV().Range(ctx, prefix, kv.PrefixNextKey(prefix), mvcc.RangeOptions{})
require.NoError(t, err)
require.Len(t, r.KVs, 0)
}
Expand Down
11 changes: 5 additions & 6 deletions br/pkg/stream/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ import (

const (
// The commmon prefix of keys involved by the stream backup.
streamKeyPrefix = "/tidb/br-stream"
taskInfoPath = "/info"
taskCheckpointPath = "/checkpoint"
taskRangesPath = "/ranges"
taskPausePath = "/pause"
streamKeyPrefix = "/tidb/br-stream"
taskInfoPath = "/info"
taskRangesPath = "/ranges"
taskPausePath = "/pause"
)

var (
Expand Down Expand Up @@ -144,7 +143,7 @@ func (t *TaskInfo) UntilTS(ts uint64) *TaskInfo {
}

// WithTableFilterHint adds the table filter of the stream backup, and return itself.
// When schama version changed, TiDB should change the ranges of the task accroding to the table filter.
// When schama version changed, TiDB should change the ranges of the task according to the table filter.
func (t *TaskInfo) WithTableFilter(filterChain ...string) *TaskInfo {
t.PBInfo.TableFilter = filterChain
return t
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/stream/prefix_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/kv"
kvutil "github.com/tikv/client-go/v2/kv"
"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
)

// etcdSource is the adapter for etcd client and the `Source` interface.
Expand All @@ -19,7 +19,7 @@ type etcdSource struct {
// Scan ordered in range [from, to).
// return at most limit kvs at once.
// WARNING: The method for scanning keeps poor consistency: we may get different reversion of different pages.
// This might be accpetable just for calculating min backup ts or ranges to backup.
// This might be acceptable just for calculating min backup ts or ranges to backup.
func (e etcdSource) Scan(
ctx context.Context,
from, to []byte,
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (cfg *StreamConfig) ParseStreamCommonFromFlags(flags *pflag.FlagSet) error

cfg.TaskName, err = flags.GetString(flagStreamTaskName)
if err != nil {
errors.Trace(err)
return errors.Trace(err)
}

if len(cfg.TaskName) <= 0 {
Expand Down

0 comments on commit b5470a9

Please sign in to comment.