Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
Detect duplicate data from TiKV (#1144)
Browse files Browse the repository at this point in the history
  • Loading branch information
Little-Wallace authored Aug 3, 2021
1 parent e8bd882 commit 0c84977
Show file tree
Hide file tree
Showing 20 changed files with 1,245 additions and 131 deletions.
5 changes: 0 additions & 5 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,6 @@ error = '''
storage is not tikv
'''

["BR:Lightning:ErrDuplicateDetected"]
error = '''
duplicate detected
'''

["BR:PD:ErrPDInvalidResponse"]
error = '''
PD invalid response
Expand Down
4 changes: 0 additions & 4 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,4 @@ var (
ErrKVDownloadFailed = errors.Normalize("download sst failed", errors.RFCCodeText("BR:KV:ErrKVDownloadFailed"))
// ErrKVIngestFailed indicates a generic, retryable ingest error.
ErrKVIngestFailed = errors.Normalize("ingest sst failed", errors.RFCCodeText("BR:KV:ErrKVIngestFailed"))

// ErrDuplicateDetected means duplicate is detected during lightning engine import. This error must be returned after
// all data in the engine is imported. So it's safe to reset or cleanup the engine data.
ErrDuplicateDetected = errors.Normalize("duplicate detected", errors.RFCCodeText("BR:Lightning:ErrDuplicateDetected"))
)
22 changes: 17 additions & 5 deletions pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tidb/table"
"go.uber.org/zap"

berrors "github.com/pingcap/br/pkg/errors"
"github.com/pingcap/br/pkg/lightning/backend/kv"
"github.com/pingcap/br/pkg/lightning/checkpoints"
"github.com/pingcap/br/pkg/lightning/common"
Expand Down Expand Up @@ -197,6 +196,14 @@ type AbstractBackend interface {

// LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine.
LocalWriter(ctx context.Context, cfg *LocalWriterConfig, engineUUID uuid.UUID) (EngineWriter, error)

// CollectLocalDuplicateRows collect duplicate keys from local db. We will store the duplicate keys which
// may be repeated with other keys in local data source.
CollectLocalDuplicateRows(ctx context.Context, tbl table.Table) error

// CollectLocalDuplicateRows collect duplicate keys from remote TiKV storage. This keys may be duplicate with
// the data import by other lightning.
CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table) error
}

// Backend is the delivery target for Lightning
Expand Down Expand Up @@ -313,7 +320,7 @@ func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID
uuid: engineUUID,
},
}
if err := closedEngine.Import(ctx); err != nil && !berrors.Is(err, berrors.ErrDuplicateDetected) {
if err := closedEngine.Import(ctx); err != nil {
return err
}
return be.abstract.ResetEngine(ctx, engineUUID)
Expand Down Expand Up @@ -352,6 +359,14 @@ func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableNam
}, nil
}

func (be Backend) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table) error {
return be.abstract.CollectLocalDuplicateRows(ctx, tbl)
}

func (be Backend) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table) error {
return be.abstract.CollectRemoteDuplicateRows(ctx, tbl)
}

// Close the opened engine to prepare it for importing.
func (engine *OpenedEngine) Close(ctx context.Context, cfg *EngineConfig) (*ClosedEngine, error) {
closedEngine, err := engine.unsafeClose(ctx, cfg)
Expand Down Expand Up @@ -427,9 +442,6 @@ func (engine *ClosedEngine) Import(ctx context.Context) error {
for i := 0; i < importMaxRetryTimes; i++ {
task := engine.logger.With(zap.Int("retryCnt", i)).Begin(zap.InfoLevel, "import")
err = engine.backend.ImportEngine(ctx, engine.uuid)
if berrors.Is(err, berrors.ErrDuplicateDetected) {
return err
}
if !common.IsRetryableError(err) {
task.End(zap.ErrorLevel, err)
return err
Expand Down
8 changes: 8 additions & 0 deletions pkg/lightning/backend/importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,14 @@ func (importer *importer) CleanupEngine(ctx context.Context, engineUUID uuid.UUI
return errors.Trace(err)
}

func (importer *importer) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table) error {
panic("Unsupported Operation")
}

func (importer *importer) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table) error {
panic("Unsupported Operation")
}

func (importer *importer) WriteRows(
ctx context.Context,
engineUUID uuid.UUID,
Expand Down
62 changes: 62 additions & 0 deletions pkg/lightning/backend/kv/kv2sql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package kv

import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"

"github.com/pingcap/br/pkg/lightning/metric"
)

type TableKVDecoder struct {
tbl table.Table
se *session
}

func (t *TableKVDecoder) DecodeHandleFromTable(key []byte) (kv.Handle, error) {
return tablecodec.DecodeRowKey(key)
}

func (t *TableKVDecoder) EncodeHandleKey(h kv.Handle) kv.Key {
return tablecodec.EncodeRowKeyWithHandle(t.tbl.Meta().ID, h)
}

func (t *TableKVDecoder) DecodeHandleFromIndex(indexInfo *model.IndexInfo, key []byte, value []byte) (kv.Handle, error) {
cols := tables.BuildRowcodecColInfoForIndexColumns(indexInfo, t.tbl.Meta())
return tablecodec.DecodeIndexHandle(key, value, len(cols))
}

// DecodeRawRowData decodes raw row data into a datum slice and a (columnID:columnValue) map.
func (t *TableKVDecoder) DecodeRawRowData(h kv.Handle, value []byte) ([]types.Datum, map[int64]types.Datum, error) {
return tables.DecodeRawRowData(t.se, t.tbl.Meta(), h, t.tbl.Cols(), value)
}

func NewTableKVDecoder(tbl table.Table, options *SessionOptions) (*TableKVDecoder, error) {
metric.KvEncoderCounter.WithLabelValues("open").Inc()
se := newSession(options)
cols := tbl.Cols()
// Set CommonAddRecordCtx to session to reuse the slices and BufStore in AddRecord
recordCtx := tables.NewCommonAddRecordCtx(len(cols))
tables.SetAddRecordCtx(se, recordCtx)

return &TableKVDecoder{
tbl: tbl,
se: se,
}, nil
}
82 changes: 82 additions & 0 deletions pkg/lightning/backend/kv/sql2kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,88 @@ func (s *kvSuite) TestEncode(c *C) {
}})
}

func (s *kvSuite) TestDecode(c *C) {
c1 := &model.ColumnInfo{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeTiny)}
cols := []*model.ColumnInfo{c1}
tblInfo := &model.TableInfo{ID: 1, Columns: cols, PKIsHandle: false, State: model.StatePublic}
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tblInfo)
c.Assert(err, IsNil)
decoder, err := NewTableKVDecoder(tbl, &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Timestamp: 1234567890,
})
c.Assert(decoder, NotNil)
p := common.KvPair{
Key: []byte{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
Val: []byte{0x8, 0x2, 0x8, 0x2},
}
h, err := decoder.DecodeHandleFromTable(p.Key)
c.Assert(err, IsNil)
c.Assert(p.Val, NotNil)
rows, _, err := decoder.DecodeRawRowData(h, p.Val)
c.Assert(rows, DeepEquals, []types.Datum{
types.NewIntDatum(1),
})
}

func (s *kvSuite) TestDecodeIndex(c *C) {
logger := log.Logger{Logger: zap.NewNop()}
tblInfo := &model.TableInfo{
ID: 1,
Indices: []*model.IndexInfo{
{
ID: 2,
Name: model.NewCIStr("test"),
Columns: []*model.IndexColumn{
{Offset: 0},
{Offset: 1},
},
Primary: true,
State: model.StatePublic,
},
},
Columns: []*model.ColumnInfo{
{ID: 1, Name: model.NewCIStr("c1"), State: model.StatePublic, Offset: 0, FieldType: *types.NewFieldType(mysql.TypeInt24)},
{ID: 2, Name: model.NewCIStr("c2"), State: model.StatePublic, Offset: 1, FieldType: *types.NewFieldType(mysql.TypeString)},
},
State: model.StatePublic,
PKIsHandle: false,
}
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tblInfo)
if err != nil {
fmt.Printf("error: %v", err.Error())
}
c.Assert(err, IsNil)
rows := []types.Datum{
types.NewIntDatum(2),
types.NewStringDatum("abc"),
}

// Strict mode
strictMode, err := NewTableKVEncoder(tbl, &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Timestamp: 1234567890,
})
c.Assert(err, IsNil)
pairs, err := strictMode.Encode(logger, rows, 1, []int{0, 1, -1}, 123)
data := pairs.(*KvPairs)
c.Assert(len(data.pairs), DeepEquals, 2)

decoder, err := NewTableKVDecoder(tbl, &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
Timestamp: 1234567890,
})
c.Assert(err, IsNil)
h1, err := decoder.DecodeHandleFromTable(data.pairs[0].Key)
c.Assert(err, IsNil)
h2, err := decoder.DecodeHandleFromIndex(tbl.Indices()[0].Meta(), data.pairs[1].Key, data.pairs[1].Val)
c.Assert(err, IsNil)
c.Assert(h1.Equal(h2), IsTrue)
rawData, _, err := decoder.DecodeRawRowData(h1, data.pairs[0].Val)
c.Assert(err, IsNil)
c.Assert(rawData, DeepEquals, rows)
}

func (s *kvSuite) TestEncodeRowFormatV2(c *C) {
// Test encoding in row format v2, as described in <https://github.com/pingcap/tidb/blob/master/docs/design/2018-07-19-row-format.md>.

Expand Down
Loading

0 comments on commit 0c84977

Please sign in to comment.