Skip to content

Commit

Permalink
support decode kv
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
  • Loading branch information
Little-Wallace committed May 25, 2021
1 parent 18edf13 commit 4341adc
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 17 deletions.
52 changes: 52 additions & 0 deletions pkg/lightning/backend/kv/kv2sql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package kv

import (
"github.com/pingcap/br/pkg/lightning/metric"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
)

type TableKVDecoder struct {
tbl table.Table
se *session
recordCache []types.Datum
genCols []genCol
// auto random bits value for this chunk
autoRandomHeaderBits int64
}

func (t *TableKVDecoder) DecodeHandleFromTable(key []byte) (kv.Handle, error) {
return tablecodec.DecodeRowKey(key)
}

func (t *TableKVDecoder) EncodeHandleKey(h kv.Handle) kv.Key {
return tablecodec.EncodeRowKeyWithHandle(t.tbl.Meta().ID, h)
}

func (t *TableKVDecoder) DecodeHandleFromIndex(indexInfo *model.IndexInfo, key []byte, value []byte) (kv.Handle, error) {
cols := tables.BuildRowcodecColInfoForIndexColumns(indexInfo, t.tbl.Meta())
return tablecodec.DecodeIndexHandle(key, value, len(cols))
}

// DecodeRawRowData decodes raw row data into a datum slice and a (columnID:columnValue) map.
func (t *TableKVDecoder) DecodeRawRowData(h kv.Handle, value []byte) ([]types.Datum, map[int64]types.Datum, error) {
return tables.DecodeRawRowData(t.se, t.tbl.Meta(), h, t.tbl.Cols(), value)
}

func NewTableKVDecoder(tbl table.Table, options *SessionOptions) (*TableKVDecoder, error) {
metric.KvEncoderCounter.WithLabelValues("open").Inc()
se := newSession(options)
cols := tbl.Cols()
// Set CommonAddRecordCtx to session to reuse the slices and BufStore in AddRecord
recordCtx := tables.NewCommonAddRecordCtx(len(cols))
tables.SetAddRecordCtx(se, recordCtx)

return &TableKVDecoder{
tbl: tbl,
se: se,
}, nil
}
83 changes: 83 additions & 0 deletions pkg/lightning/backend/kv/sql2kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package kv

import (
"errors"
"fmt"

. "github.com/pingcap/check"
"github.com/pingcap/parser"
Expand Down Expand Up @@ -138,6 +139,88 @@ func (s *kvSuite) TestEncode(c *C) {
}))
}

func (s *kvSuite) TestDecode(c *C) {
c1 := &model.ColumnInfo{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeTiny)}
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tblInfo)
c.Assert(err, IsNil)
decoder, err := NewTableKVDecoder(tbl, &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Timestamp: 1234567890,
})
c.Assert(decoder, NotNil)
p := common.KvPair{
Key: []byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Val: []byte{0x8, 0x2, 0x8, 0x2},
}
h, err := decoder.DecodeHandleFromTable(p.Key)
c.Assert(err, IsNil)
c.Assert(p.Val, NotNil)
rows, _, err := decoder.DecodeRawRowData(h, p.Val)
c.Assert(rows, DeepEquals, []types.Datum{
types.NewIntDatum(1),
})
}

func (s *kvSuite) TestDecodeIndex(c *C) {
logger := log.Logger{Logger: zap.NewNop()}
tblInfo := &model.TableInfo{
ID: 1,
Indices: []*model.IndexInfo{
{
ID: 2,
Name: model.NewCIStr("test"),
Columns: []*model.IndexColumn{
{Offset: 0},
{Offset: 1},
},
Primary: true,
State: model.StatePublic,
},
},
Columns: []*model.ColumnInfo{
{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeInt24)},
{ID: 2, Name: model.NewCIStr("c2"), State: model.StatePublic, Offset: 1, FieldType: *types.NewFieldType(mysql.TypeString)},
},
State: model.StatePublic,
PKIsHandle: false,
}
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tblInfo)
if err != nil {
fmt.Printf("error: %v", err.Error())
}
c.Assert(err, IsNil)
rows := []types.Datum{
types.NewIntDatum(2),
types.NewStringDatum("abc"),
}

// Strict mode
strictMode, err := NewTableKVEncoder(tbl, &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Timestamp: 1234567890,
})
c.Assert(err, IsNil)
pairs, err := strictMode.Encode(logger, rows, 1, []int{0, 1, -1})
data := pairs.(kvPairs)
c.Assert(len(data), DeepEquals, 2)

decoder, err := NewTableKVDecoder(tbl, &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Timestamp: 1234567890,
})
c.Assert(err, IsNil)
h1, err := decoder.DecodeHandleFromTable(data[0].Key)
c.Assert(err, IsNil)
h2, err := decoder.DecodeHandleFromIndex(tbl.Indices()[0].Meta(), data[1].Key, data[1].Val)
c.Assert(err, IsNil)
c.Assert(h1.Equal(h2), IsTrue)
rawData, _, err := decoder.DecodeRawRowData(h1, data[0].Val)
c.Assert(err, IsNil)
c.Assert(rawData, DeepEquals, rows)
}

func (s *kvSuite) TestEncodeRowFormatV2(c *C) {
// Test encoding in row format v2, as described in <https://github.com/pingcap/tidb/blob/master/docs/design/2018-07-19-row-format.md>.

Expand Down
3 changes: 1 addition & 2 deletions pkg/lightning/backend/kv/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
package kv

import (
"github.com/pingcap/tidb/types"

"github.com/pingcap/br/pkg/lightning/log"
"github.com/pingcap/br/pkg/lightning/verification"
"github.com/pingcap/tidb/types"
)

// Encoder encodes a row of SQL values into some opaque type which can be
Expand Down
58 changes: 43 additions & 15 deletions pkg/lightning/restore/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package restore
import (
"bytes"
"context"
backendkv "github.com/pingcap/br/pkg/lightning/backend/kv"
"github.com/pingcap/br/pkg/lightning/checkpoints"
"github.com/pingcap/br/pkg/lightning/common"
"github.com/pingcap/br/pkg/lightning/log"
Expand Down Expand Up @@ -42,10 +43,11 @@ const (
)

type DuplicateRequest struct {
tableID int64
indexID int64 // 0 represent it is a
start kv.Key
end kv.Key
tableID int64
indexID int64 // 0 represent it is a table request
start kv.Key
end kv.Key
indexInfo *model.IndexInfo
}

type DuplicateManager struct {
Expand All @@ -54,6 +56,7 @@ type DuplicateManager struct {
RegionConcurrency int
connPool common.GRPCConns
tls *common.TLS
decoder backendkv.TableKVDecoder
}

func (manager *DuplicateManager) DuplicateTable(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) error {
Expand Down Expand Up @@ -105,7 +108,7 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context, req *Dup
end = req.end
}

cli, err := manager.getDuplicateStream(ctx, region, start, end)
cli, err := manager.getDuplicateStream(ctx, region, start, end, req.indexInfo == nil)
if err != nil {
r, err := manager.pdClient.GetRegionByID(rpcctx, region.Meta.GetId())
if err != nil {
Expand Down Expand Up @@ -159,20 +162,31 @@ func (manager *DuplicateManager) sendRequestToTiKV(ctx context.Context, req *Dup
if len(resp.Pairs) == 0 {
break
}
manager.StoreDuplicateData(resp)
manager.storeDuplicateData(resp, req)
}
}
regions = unfinishedRegions
}
return nil
}

func (manager *DuplicateManager) StoreDuplicateData(resp *sst.DuplicateDetectResponse) error {
func (manager *DuplicateManager) storeDuplicateData(resp *sst.DuplicateDetectResponse, req *DuplicateRequest) error {
b := manager.db.NewBatch()
opts := &pebble.WriteOptions{Sync: false}
for _, kv := range resp.Pairs {
kv.Key = strconv.AppendUint(kv.Key, kv.CommitTs, 10)
b.Set(kv.Key, kv.Value, opts)
ts := strconv.AppendUint([]byte{}, kv.CommitTs, 10)
if req.indexInfo != nil {
h, err := manager.decoder.DecodeHandleFromIndex(req.indexInfo, kv.Key, kv.Value)
if err != nil {
continue
}
key := manager.decoder.EncodeHandleKey(h)
key = strconv.AppendUint(key, kv.CommitTs, 10)
b.Set(kv.Key, ts, opts)
} else {
kv.Key = strconv.AppendUint(kv.Key, kv.CommitTs, 10)
b.Set(kv.Key, ts, opts)
}
}
var err error
for i := 0; i < maxRetryTimes; i++ {
Expand All @@ -184,7 +198,10 @@ func (manager *DuplicateManager) StoreDuplicateData(resp *sst.DuplicateDetectRes
return err
}

func (manager *DuplicateManager) getDuplicateStream(ctx context.Context, region *pd.Region, start []byte, end []byte) (sst.ImportSST_DuplicateDetectClient, error) {
func (manager *DuplicateManager) getDuplicateStream(ctx context.Context,
region *pd.Region,
start []byte, end []byte,
keyOnly bool) (sst.ImportSST_DuplicateDetectClient, error) {
leader := region.Leader
if leader == nil {
leader = region.Meta.GetPeers()[0]
Expand All @@ -204,7 +221,7 @@ func (manager *DuplicateManager) getDuplicateStream(ctx context.Context, region
Context: reqCtx,
StartKey: start,
EndKey: end,
KeyOnly: false,
KeyOnly: keyOnly,
}
stream, err := cli.DuplicateDetect(ctx, req)
return stream, err
Expand Down Expand Up @@ -255,6 +272,16 @@ func (manager *DuplicateManager) makeConn(ctx context.Context, storeID uint64) (
return conn, nil
}

func (manager *DuplicateManager) ReportDuplicateData() error {
// TODO
return nil
}

func (manager *DuplicateManager) RepairDuplicateData() error {
// TODO
return nil
}

func buildDuplicateRequests(tableInfo *model.TableInfo) ([]*DuplicateRequest, error) {
reqs := make([]*DuplicateRequest, 0)
req := buildTableRequest(tableInfo.ID)
Expand Down Expand Up @@ -297,10 +324,11 @@ func buildIndexRequest(tableID int64, indexInfo *model.IndexInfo) ([]*DuplicateR
reqs := make([]*DuplicateRequest, 1)
for _, r := range keysRanges {
r := &DuplicateRequest{
start: r.StartKey,
end: r.EndKey,
indexID: indexInfo.ID,
tableID: tableID,
start: r.StartKey,
end: r.EndKey,
indexID: indexInfo.ID,
tableID: tableID,
indexInfo: indexInfo,
}
reqs = append(reqs, r)
}
Expand Down

0 comments on commit 4341adc

Please sign in to comment.