Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: Add RangedKVRetriever struct to provide custom range data operations #27543

Merged
merged 8 commits into from
Aug 25, 2021
35 changes: 35 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"crypto/tls"
"time"

"github.com/pingcap/errors"
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -70,6 +71,40 @@ type Retriever interface {
IterReverse(k Key) (Iterator, error)
}

// EmptyIterator is an iterator without any entry
type EmptyIterator struct{}

// Valid returns true if the current iterator is valid.
func (i *EmptyIterator) Valid() bool { return false }

// Key returns the current key. Always return nil for this iterator
func (i *EmptyIterator) Key() Key { return nil }

// Value returns the current value. Always return nil for this iterator
func (i *EmptyIterator) Value() []byte { return nil }

// Next goes the next position. Always return error for this iterator
func (i *EmptyIterator) Next() error { return errors.New("scanner iterator is invalid") }

// Close closes the iterator.
func (i *EmptyIterator) Close() {}

// EmptyRetriever is a retriever without any entry
type EmptyRetriever struct{}

// Get gets the value for key k from kv store. Always return nil for this retriever
func (r *EmptyRetriever) Get(_ context.Context, _ Key) ([]byte, error) {
return nil, ErrNotExist
}

// Iter creates an Iterator. Always return EmptyIterator for this retriever
func (r *EmptyRetriever) Iter(_ Key, _ Key) (Iterator, error) { return &EmptyIterator{}, nil }

// IterReverse creates a reversed Iterator. Always return EmptyIterator for this retriever
func (r *EmptyRetriever) IterReverse(_ Key) (Iterator, error) {
return &EmptyIterator{}, nil
}

// Mutator is the interface wraps the basic Set and Delete methods.
type Mutator interface {
// Set sets the value for key k as v into kv store.
Expand Down
64 changes: 64 additions & 0 deletions store/driver/txn/ranged_kv_retriever.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package txn

import (
"bytes"

"github.com/pingcap/tidb/kv"
)

// RangedKVRetriever contains a kv.KeyRange to indicate the kv range of this retriever
type RangedKVRetriever struct {
kv.KeyRange
kv.Retriever
}

// NewRangeRetriever creates a new RangedKVRetriever
func NewRangeRetriever(retriever kv.Retriever, startKey, endKey kv.Key) *RangedKVRetriever {
return &RangedKVRetriever{
KeyRange: kv.KeyRange{StartKey: startKey, EndKey: endKey},
Retriever: retriever,
}
}

// Valid returns if the retriever is valid
func (s *RangedKVRetriever) Valid() bool {
return len(s.EndKey) == 0 || bytes.Compare(s.StartKey, s.EndKey) < 0
}

// Contains returns whether the key located in the range
func (s *RangedKVRetriever) Contains(k kv.Key) bool {
return bytes.Compare(k, s.StartKey) >= 0 && (len(s.EndKey) == 0 || bytes.Compare(k, s.EndKey) < 0)
}

// Intersect returns a new RangedKVRetriever with an intersected range
func (s *RangedKVRetriever) Intersect(startKey, endKey kv.Key) *RangedKVRetriever {
maxStartKey := startKey
if bytes.Compare(s.StartKey, maxStartKey) > 0 {
maxStartKey = s.StartKey
}

minEndKey := endKey
if len(minEndKey) == 0 || (len(s.EndKey) > 0 && bytes.Compare(s.EndKey, minEndKey) < 0) {
minEndKey = s.EndKey
}

if len(minEndKey) == 0 || bytes.Compare(maxStartKey, minEndKey) < 0 {
return NewRangeRetriever(s.Retriever, maxStartKey, minEndKey)
}

return nil
}
251 changes: 251 additions & 0 deletions store/driver/txn/ranged_kv_retriever_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package txn

import (
"testing"

"github.com/pingcap/tidb/kv"
"github.com/stretchr/testify/assert"
)

func newRetriever(startKey, endKey kv.Key) *RangedKVRetriever {
return NewRangeRetriever(&kv.EmptyRetriever{}, startKey, endKey)
}

func makeBytes(s interface{}) []byte {
if s == nil {
return nil
}

switch key := s.(type) {
case string:
return []byte(key)
default:
return key.([]byte)
}
}

func TestRangedKVRetrieverValid(t *testing.T) {
assert := assert.New(t)
assert.True(newRetriever(nil, nil).Valid())
assert.True(newRetriever(nil, kv.Key("0")).Valid())
assert.True(newRetriever(kv.Key("0"), nil).Valid())
assert.True(newRetriever(kv.Key("0"), kv.Key("01")).Valid())
assert.True(newRetriever(kv.Key("0"), kv.Key("00")).Valid())

assert.False(newRetriever(kv.Key("0"), kv.Key("0")).Valid())
assert.False(newRetriever(kv.Key("00"), kv.Key("0")).Valid())
assert.False(newRetriever(kv.Key("b"), kv.Key("a")).Valid())
}

func TestRangedKVRetrieverContains(t *testing.T) {
assert := assert.New(t)
cases := []struct {
startKey interface{}
endKey interface{}
contains []interface{}
notContains []interface{}
}{
{
startKey: "abcdef",
endKey: nil,
contains: []interface{}{[]byte{0xFF}, "abcdef", "abcdefg", "abcdeg", "bbcdeg"},
notContains: []interface{}{"abcdee", "abcde", "a", "0bcdefg", []byte{0x0}},
},
{
startKey: nil,
endKey: "abcdef",
contains: []interface{}{"abcdee", "abcde", "a", "0bcdefg", []byte{0x0}},
notContains: []interface{}{[]byte{0xFF}, "abcdef", "abcdefg", "abcdeg", "bbcdeg"},
},
{
startKey: "abcdef",
endKey: "abcdeg",
contains: []interface{}{"abcdef", "abcdefg"},
notContains: []interface{}{"abcdeg", "abcdega", "abcdee", "abcdeea", "a", []byte{0xFF}, []byte{0x0}},
},
{
startKey: "abcdef",
endKey: "abcdeh",
contains: []interface{}{"abcdef", "abcdeg", "abcdefg"},
notContains: []interface{}{"abcdeh"},
},
{
startKey: nil,
endKey: nil,
contains: []interface{}{[]byte{0xFF}, "abcdef", "abcdefg", "abcdeg", "bbcdeg", "abcdee", "abcde", "a", "0bcdefg", []byte{0x0}},
},
{
startKey: "abcdef",
endKey: "abcdef",
notContains: []interface{}{[]byte{0xFF}, "abcdef", "abcdefg", "abcdeg", "bbcdeg", "abcdee", "abcde", "a", "0bcdefg", []byte{0x0}},
},
}

for _, c := range cases {
startKey := makeBytes(c.startKey)
endKey := makeBytes(c.endKey)
retriever := newRetriever(startKey, endKey)
if len(startKey) == 0 {
assert.Nil(retriever.StartKey)
} else {
assert.Equal(kv.Key(startKey), retriever.StartKey)
}

if len(endKey) == 0 {
assert.Nil(retriever.EndKey)
} else {
assert.Equal(kv.Key(endKey), retriever.EndKey)
}

for _, k := range c.contains {
assert.True(retriever.Contains(makeBytes(k)))
}

for _, k := range c.notContains {
assert.False(retriever.Contains(makeBytes(k)))
}
}
}

func TestRangedKVRetrieverIntersect(t *testing.T) {
assert := assert.New(t)
retrievers := []struct {
startKey interface{}
endKey interface{}
// {{inputStart, inputEnd}, {expectedOutputStart, expectedOutputEnd}}
cases [][][]interface{}
}{
{
startKey: nil,
endKey: nil,
cases: [][][]interface{}{
{{nil, nil}, {nil, nil}},
{{"abcde", "abcdg"}, {"abcde", "abcdg"}},
{{nil, "abcdg"}, {nil, "abcdg"}},
{{"abcde", nil}, {"abcde", nil}},
{{"abcde", "abcdd"}, nil},
},
},
{
startKey: nil,
endKey: "abcde",
cases: [][][]interface{}{
{{nil, nil}, {nil, "abcde"}},
{{nil, "abcde"}, {nil, "abcde"}},
{{nil, "abcdef"}, {nil, "abcde"}},
{{nil, "abcdd"}, {nil, "abcdd"}},
{{"abcde", nil}, nil},
{{"abcdd", nil}, {"abcdd", "abcde"}},
{{"abcdef", nil}, nil},
{{"abcde", "abcdf"}, nil},
{{"abcdd", "abcde"}, {"abcdd", "abcde"}},
{{"abcdd", "abcdf"}, {"abcdd", "abcde"}},
{{"abcdc", "abcdd"}, {"abcdc", "abcdd"}},
{{"abcdef", "abcdf"}, nil},
{{"abcdb", "abcda"}, nil},
},
},
{
startKey: "abcde",
endKey: nil,
cases: [][][]interface{}{
{{nil, nil}, {"abcde", nil}},
{{nil, "abcde"}, nil},
{{nil, "abcdef"}, {"abcde", "abcdef"}},
{{nil, "abcdd"}, nil},
{{"abcde", nil}, {"abcde", nil}},
{{"abcdd", nil}, {"abcde", nil}},
{{"abcdef", nil}, {"abcdef", nil}},
{{"abcde", "abcdf"}, {"abcde", "abcdf"}},
{{"abcdd", "abcde"}, nil},
{{"abcdd", "abcdf"}, {"abcde", "abcdf"}},
{{"abcdc", "abcdd"}, nil},
{{"abcdef", "abcdf"}, {"abcdef", "abcdf"}},
{{"abcde", "abcdd"}, nil},
},
},
{
startKey: "abcde",
endKey: "abcdg",
cases: [][][]interface{}{
{{nil, nil}, {"abcde", "abcdg"}},
{{nil, "abcde"}, nil},
{{nil, "abcdef"}, {"abcde", "abcdef"}},
{{nil, "abcdd"}, nil},
{{nil, "abcdg"}, {"abcde", "abcdg"}},
{{nil, "abcdga"}, {"abcde", "abcdg"}},
{{"abcde", nil}, {"abcde", "abcdg"}},
{{"abcdd", nil}, {"abcde", "abcdg"}},
{{"abcdef", nil}, {"abcdef", "abcdg"}},
{{"abcdg", nil}, nil},
{{"abcdga", nil}, nil},
{{"abcde", "abcdf"}, {"abcde", "abcdf"}},
{{"abcdd", "abcde"}, nil},
{{"abcdd", "abcdf"}, {"abcde", "abcdf"}},
{{"abcdc", "abcdd"}, nil},
{{"abcdef", "abcdf"}, {"abcdef", "abcdf"}},
{{"abcde", "abcdg"}, {"abcde", "abcdg"}},
{{"abcde", "abcdh"}, {"abcde", "abcdg"}},
{{"abcdef", "abcdg"}, {"abcdef", "abcdg"}},
{{"abcdef", "abcdh"}, {"abcdef", "abcdg"}},
{{"abcdg", "abcdh"}, nil},
{{"abcde", "abcdd"}, nil},
},
},
{
startKey: "abcde",
endKey: "abcdd",
cases: [][][]interface{}{
{{nil, nil}, nil},
{{nil, "abcde"}, nil},
{{nil, "abcdef"}, nil},
{{nil, "abcdd"}, nil},
{{"abcde", nil}, nil},
{{"abcdd", nil}, nil},
{{"abcdef", nil}, nil},
{{"abcde", "abcdd"}, nil},
{{"abcde", "abcdf"}, nil},
{{"abcde", "abcdef"}, nil},
},
},
}

for _, r := range retrievers {
retriever := newRetriever(makeBytes(r.startKey), makeBytes(r.endKey))
for _, c := range r.cases {
result := retriever.Intersect(makeBytes(c[0][0]), makeBytes(c[0][1]))
if len(c[1]) == 0 {
assert.Nil(result)
continue
}

assert.Equal(retriever.Retriever, result.Retriever)
expectedStart := makeBytes(c[1][0])
expectedEnd := makeBytes(c[1][1])
if len(expectedStart) == 0 {
assert.Nil(result.StartKey)
} else {
assert.Equal(kv.Key(expectedStart), result.StartKey)
}

if len(expectedEnd) == 0 {
assert.Nil(result.EndKey)
} else {
assert.Equal(kv.Key(expectedEnd), result.EndKey)
}
}
}
}