Skip to content

Commit

Permalink
importinto: add log for chunk checksum & replace codec with keyspace (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored Mar 25, 2024
1 parent c3fa89a commit 490e17c
Show file tree
Hide file tree
Showing 13 changed files with 54 additions and 82 deletions.
12 changes: 6 additions & 6 deletions br/pkg/lightning/importer/chunk_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,14 +590,14 @@ func (cr *chunkProcessor) deliverLoop(
hasMoreKVs := true
var startRealOffset, currRealOffset int64 // save to 0 at first

keyspace := keyspace.CodecV1.GetKeyspace()
if t.kvStore != nil {
keyspace = t.kvStore.GetCodec().GetKeyspace()
}
for hasMoreKVs {
c := keyspace.CodecV1
if t.kvStore != nil {
c = t.kvStore.GetCodec()
}
var (
dataChecksum = verify.NewKVChecksumWithKeyspace(c)
indexChecksum = verify.NewKVChecksumWithKeyspace(c)
dataChecksum = verify.NewKVChecksumWithKeyspace(keyspace)
indexChecksum = verify.NewKVChecksumWithKeyspace(keyspace)
)
var columns []string
var kvPacket []deliveredKVs
Expand Down
2 changes: 0 additions & 2 deletions br/pkg/lightning/verification/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/common",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_zap//zapcore",
],
)
Expand All @@ -22,6 +21,5 @@ go_test(
":verification",
"//br/pkg/lightning/common",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
],
)
26 changes: 10 additions & 16 deletions br/pkg/lightning/verification/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"hash/crc64"

"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap/zapcore"
)

Expand All @@ -41,11 +40,10 @@ func NewKVChecksum() *KVChecksum {
}

// NewKVChecksumWithKeyspace creates a new KVChecksum whose base checksum is seeded with the given keyspace.
func NewKVChecksumWithKeyspace(k tikv.Codec) *KVChecksum {
ks := k.GetKeyspace()
func NewKVChecksumWithKeyspace(keyspace []byte) *KVChecksum {
return &KVChecksum{
base: crc64.Update(0, ecmaTable, ks),
prefixLen: len(ks),
base: crc64.Update(0, ecmaTable, keyspace),
prefixLen: len(keyspace),
}
}

Expand Down Expand Up @@ -129,8 +127,8 @@ func (c *KVChecksum) MarshalJSON() ([]byte, error) {

// KVGroupChecksum is KVChecksum(s) each for a data KV group or index KV groups.
type KVGroupChecksum struct {
m map[int64]*KVChecksum
codec tikv.Codec
m map[int64]*KVChecksum
keyspace []byte
}

// DataKVGroupID represents the ID for data KV group, as index id starts from 1,
Expand All @@ -139,10 +137,10 @@ const DataKVGroupID = -1

// NewKVGroupChecksumWithKeyspace creates a new KVGroupChecksum with the given
// keyspace.
func NewKVGroupChecksumWithKeyspace(k tikv.Codec) *KVGroupChecksum {
func NewKVGroupChecksumWithKeyspace(keyspace []byte) *KVGroupChecksum {
m := make(map[int64]*KVChecksum, 8)
m[DataKVGroupID] = NewKVChecksumWithKeyspace(k)
return &KVGroupChecksum{m: m, codec: k}
m[DataKVGroupID] = NewKVChecksumWithKeyspace(keyspace)
return &KVGroupChecksum{m: m, keyspace: keyspace}
}

// NewKVGroupChecksumForAdd creates a new KVGroupChecksum, and it can't be used
Expand All @@ -165,7 +163,7 @@ func (c *KVGroupChecksum) UpdateOneDataKV(kv common.KvPair) {
func (c *KVGroupChecksum) UpdateOneIndexKV(indexID int64, kv common.KvPair) {
cksum := c.m[indexID]
if cksum == nil {
cksum = NewKVChecksumWithKeyspace(c.codec)
cksum = NewKVChecksumWithKeyspace(c.keyspace)
c.m[indexID] = cksum
}
cksum.UpdateOne(kv)
Expand All @@ -184,11 +182,7 @@ func (c *KVGroupChecksum) getOrCreateOneGroup(id int64) *KVChecksum {
if ok {
return cksum
}
if c.codec == nil {
cksum = NewKVChecksum()
} else {
cksum = NewKVChecksumWithKeyspace(c.codec)
}
cksum = NewKVChecksumWithKeyspace(c.keyspace)
c.m[id] = cksum
return cksum
}
Expand Down
18 changes: 3 additions & 15 deletions br/pkg/lightning/verification/checksum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
)

func TestChecksum(t *testing.T) {
Expand Down Expand Up @@ -79,36 +78,25 @@ func TestChecksumJSON(t *testing.T) {
require.Equal(t, []byte(`{"Checksum":{"checksum":7890,"size":123,"kvs":456}}`), res)
}

type mockCodec struct {
tikv.Codec
keyspace []byte
}

func (m *mockCodec) GetKeyspace() []byte {
return m.keyspace
}

func TestGroupChecksum(t *testing.T) {
codec := &mockCodec{}
kvPair := common.KvPair{Key: []byte("key"), Val: []byte("val")}
kvPair2 := common.KvPair{Key: []byte("key2"), Val: []byte("val2")}

c := verification.NewKVGroupChecksumWithKeyspace(codec)
c := verification.NewKVGroupChecksumWithKeyspace([]byte(""))
c.UpdateOneDataKV(kvPair)
c.UpdateOneIndexKV(1, kvPair2)
inner := c.GetInnerChecksums()
require.Equal(t, 2, len(inner))
require.Equal(t, uint64(1), inner[1].SumKVS())
require.Equal(t, uint64(1), inner[verification.DataKVGroupID].SumKVS())

keyspaceCodec := &mockCodec{keyspace: []byte("keyspace")}
keyspaceC := verification.NewKVGroupChecksumWithKeyspace(keyspaceCodec)
keyspaceC := verification.NewKVGroupChecksumWithKeyspace([]byte("keyspace"))
keyspaceC.UpdateOneDataKV(kvPair)
keyspaceC.UpdateOneIndexKV(1, kvPair2)
keyspaceInner := keyspaceC.GetInnerChecksums()
require.NotEqual(t, inner, keyspaceInner)

c2 := verification.NewKVGroupChecksumWithKeyspace(codec)
c2 := verification.NewKVGroupChecksumWithKeyspace(nil)
c2.UpdateOneIndexKV(1, kvPair)
c2.UpdateOneIndexKV(2, kvPair2)
c.Add(c2)
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/importinto/subtask_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr
})
chunkCheckpoint := toChunkCheckpoint(e.mTtask.Chunk)
sharedVars := e.mTtask.SharedVars
checksum := verify.NewKVGroupChecksumWithKeyspace(sharedVars.TableImporter.GetCodec())
checksum := verify.NewKVGroupChecksumWithKeyspace(sharedVars.TableImporter.GetKeySpace())
if sharedVars.TableImporter.IsLocalSort() {
if err := importer.ProcessChunk(
ctx,
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/importinto/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (s *importStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subt
DataEngine: dataEngine,
IndexEngine: indexEngine,
Progress: importer.NewProgress(),
Checksum: verification.NewKVGroupChecksumWithKeyspace(s.tableImporter.GetCodec()),
Checksum: verification.NewKVGroupChecksumWithKeyspace(s.tableImporter.GetKeySpace()),
SortedDataMeta: &external.SortedKVMeta{},
SortedIndexMetas: make(map[int64]*external.SortedKVMeta),
}
Expand Down
1 change: 0 additions & 1 deletion pkg/executor/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ go_library(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
Expand Down
28 changes: 13 additions & 15 deletions pkg/executor/importer/chunk_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/syncutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -135,10 +134,10 @@ func (b *encodedKVGroupBatch) reset() {
b.memBuf = nil
}

func newEncodedKVGroupBatch(codec tikv.Codec) *encodedKVGroupBatch {
func newEncodedKVGroupBatch(keyspace []byte) *encodedKVGroupBatch {
return &encodedKVGroupBatch{
indexKVs: make(map[int64][]common.KvPair, 8),
groupChecksum: verify.NewKVGroupChecksumWithKeyspace(codec),
groupChecksum: verify.NewKVGroupChecksumWithKeyspace(keyspace),
}
}

Expand Down Expand Up @@ -175,7 +174,7 @@ type chunkEncoder struct {
chunkName string
logger *zap.Logger
encoder KVEncoder
kvCodec tikv.Codec
keyspace []byte

// total duration takes by read/encode.
readTotalDur time.Duration
Expand All @@ -191,7 +190,7 @@ func newChunkEncoder(
sendFn func(ctx context.Context, batch *encodedKVGroupBatch) error,
logger *zap.Logger,
encoder KVEncoder,
kvCodec tikv.Codec,
keyspace []byte,
) *chunkEncoder {
return &chunkEncoder{
chunkName: chunkName,
Expand All @@ -200,8 +199,8 @@ func newChunkEncoder(
sendFn: sendFn,
logger: logger,
encoder: encoder,
kvCodec: kvCodec,
groupChecksum: verify.NewKVGroupChecksumWithKeyspace(kvCodec),
keyspace: keyspace,
groupChecksum: verify.NewKVGroupChecksumWithKeyspace(keyspace),
}
}

Expand Down Expand Up @@ -243,7 +242,7 @@ func (p *chunkEncoder) encodeLoop(ctx context.Context) error {
p.encodeTotalDur += encodeDur
p.readTotalDur += readDur

kvGroupBatch := newEncodedKVGroupBatch(p.kvCodec)
kvGroupBatch := newEncodedKVGroupBatch(p.keyspace)

for _, kvs := range rowBatch {
if err := kvGroupBatch.add(kvs); err != nil {
Expand Down Expand Up @@ -305,9 +304,11 @@ func (p *chunkEncoder) encodeLoop(ctx context.Context) error {
}

// summaryFields builds the zap fields used to summarize this chunk encoder:
// the total read duration, the total encode duration, and the checksum
// returned by groupChecksum.MergedChecksum().
func (p *chunkEncoder) summaryFields() []zap.Field {
	// Hold the merged checksum in a local so a pointer to it can be passed
	// to zap.Object.
	merged := p.groupChecksum.MergedChecksum()

	fields := make([]zap.Field, 0, 3)
	fields = append(fields, zap.Duration("readDur", p.readTotalDur))
	fields = append(fields, zap.Duration("encodeDur", p.encodeTotalDur))
	fields = append(fields, zap.Object("checksum", &merged))
	return fields
}

Expand Down Expand Up @@ -358,7 +359,7 @@ func (p *baseChunkProcessor) Process(ctx context.Context) (err error) {
func NewFileChunkProcessor(
parser mydump.Parser,
encoder KVEncoder,
kvCodec tikv.Codec,
keyspace []byte,
chunk *checkpoints.ChunkCheckpoint,
logger *zap.Logger,
diskQuotaLock *syncutil.RWMutex,
Expand All @@ -369,7 +370,6 @@ func NewFileChunkProcessor(
chunkLogger := logger.With(zap.String("key", chunk.GetKey()))
deliver := &dataDeliver{
logger: chunkLogger,
kvCodec: kvCodec,
diskQuotaLock: diskQuotaLock,
kvBatch: make(chan *encodedKVGroupBatch, maxKVQueueSize),
dataWriter: dataWriter,
Expand All @@ -385,7 +385,7 @@ func NewFileChunkProcessor(
deliver.sendEncodedData,
chunkLogger,
encoder,
kvCodec,
keyspace,
),
logger: chunkLogger,
groupChecksum: groupChecksum,
Expand All @@ -394,7 +394,6 @@ func NewFileChunkProcessor(

type dataDeliver struct {
logger *zap.Logger
kvCodec tikv.Codec
kvBatch chan *encodedKVGroupBatch
diskQuotaLock *syncutil.RWMutex
dataWriter backend.EngineWriter
Expand Down Expand Up @@ -503,7 +502,7 @@ type QueryRow struct {
func newQueryChunkProcessor(
rowCh chan QueryRow,
encoder KVEncoder,
kvCodec tikv.Codec,
keyspace []byte,
logger *zap.Logger,
diskQuotaLock *syncutil.RWMutex,
dataWriter backend.EngineWriter,
Expand All @@ -514,7 +513,6 @@ func newQueryChunkProcessor(
chunkLogger := logger.With(zap.String("key", chunkName))
deliver := &dataDeliver{
logger: chunkLogger,
kvCodec: kvCodec,
diskQuotaLock: diskQuotaLock,
kvBatch: make(chan *encodedKVGroupBatch, maxKVQueueSize),
dataWriter: dataWriter,
Expand All @@ -530,7 +528,7 @@ func newQueryChunkProcessor(
deliver.sendEncodedData,
chunkLogger,
encoder,
kvCodec,
keyspace,
),
logger: chunkLogger,
groupChecksum: groupChecksum,
Expand Down
14 changes: 6 additions & 8 deletions pkg/executor/importer/chunk_process_testkit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/pingcap/tidb/pkg/util/syncutil"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/mock/gomock"
)

Expand Down Expand Up @@ -99,7 +98,6 @@ func TestFileChunkProcess(t *testing.T) {
)
require.NoError(t, err)
diskQuotaLock := &syncutil.RWMutex{}
codec := tikv.NewCodecV1(tikv.ModeRaw)

t.Run("process success", func(t *testing.T) {
var dataKVCnt, indexKVCnt int
Expand Down Expand Up @@ -143,9 +141,9 @@ func TestFileChunkProcess(t *testing.T) {
chunkInfo := &checkpoints.ChunkCheckpoint{
Chunk: mydump.Chunk{EndOffset: int64(len(sourceData)), RowIDMax: 10000},
}
checksum := verify.NewKVGroupChecksumWithKeyspace(codec)
checksum := verify.NewKVGroupChecksumWithKeyspace(nil)
processor := importer.NewFileChunkProcessor(
csvParser, encoder, codec,
csvParser, encoder, nil,
chunkInfo, logger.Logger, diskQuotaLock, dataWriter, indexWriter, checksum,
)
require.NoError(t, processor.Process(ctx))
Expand Down Expand Up @@ -181,7 +179,7 @@ func TestFileChunkProcess(t *testing.T) {
Chunk: mydump.Chunk{EndOffset: int64(len(sourceData)), RowIDMax: 10000},
}
processor := importer.NewFileChunkProcessor(
csvParser, encoder, codec,
csvParser, encoder, nil,
chunkInfo, logger.Logger, diskQuotaLock, dataWriter, indexWriter, nil,
)
require.ErrorIs(t, processor.Process(ctx), common.ErrEncodeKV)
Expand All @@ -206,7 +204,7 @@ func TestFileChunkProcess(t *testing.T) {
Chunk: mydump.Chunk{EndOffset: int64(len(sourceData)), RowIDMax: 10000},
}
processor := importer.NewFileChunkProcessor(
csvParser, encoder, codec,
csvParser, encoder, nil,
chunkInfo, logger.Logger, diskQuotaLock, dataWriter, indexWriter, nil,
)
require.ErrorIs(t, processor.Process(ctx), common.ErrEncodeKV)
Expand All @@ -230,7 +228,7 @@ func TestFileChunkProcess(t *testing.T) {
Chunk: mydump.Chunk{EndOffset: int64(len(sourceData)), RowIDMax: 10000},
}
processor := importer.NewFileChunkProcessor(
csvParser, encoder, codec,
csvParser, encoder, nil,
chunkInfo, logger.Logger, diskQuotaLock, dataWriter, indexWriter, nil,
)
require.ErrorContains(t, processor.Process(ctx), "data write error")
Expand All @@ -255,7 +253,7 @@ func TestFileChunkProcess(t *testing.T) {
Chunk: mydump.Chunk{EndOffset: int64(len(sourceData)), RowIDMax: 10000},
}
processor := importer.NewFileChunkProcessor(
csvParser, encoder, codec,
csvParser, encoder, nil,
chunkInfo, logger.Logger, diskQuotaLock, dataWriter, indexWriter, nil,
)
require.ErrorContains(t, processor.Process(ctx), "index write error")
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/importer/engine_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ func ProcessChunkWithWriter(
}
}()
cp = NewFileChunkProcessor(
parser, encoder, tableImporter.GetCodec(), chunk, logger,
parser, encoder, tableImporter.GetKeySpace(), chunk, logger,
tableImporter.diskQuotaLock, dataWriter, indexWriter, groupChecksum,
)
case DataSourceTypeQuery:
cp = newQueryChunkProcessor(
tableImporter.rowCh, encoder, tableImporter.GetCodec(), logger,
tableImporter.rowCh, encoder, tableImporter.GetKeySpace(), logger,
tableImporter.diskQuotaLock, dataWriter, indexWriter, groupChecksum,
)
}
Expand Down
Loading

0 comments on commit 490e17c

Please sign in to comment.