Skip to content

Commit

Permalink
fix writer append rows
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Jun 28, 2023
1 parent 0b0b602 commit 6e95182
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 43 deletions.
45 changes: 13 additions & 32 deletions br/pkg/lightning/backend/sharedisk/sharedisk.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,17 +120,16 @@ func NewWriter(ctx context.Context, externalStorage storage.ExternalStorage, eng
engine := NewEngine(2048, 256)
pool := membuf.NewPool()
filePrefix := fmt.Sprintf("%s_%d", engineUUID.String(), writerID)
writeBatchSize := 8 * 1024
return &Writer{
ctx: ctx,
engine: engine,
memtableSizeLimit: 8 * 1024,
memtableSizeLimit: writeBatchSize,
keyAdapter: &local.NoopKeyAdapter{},
exStorage: externalStorage,
memBufPool: pool,
kvBuffer: pool.NewBuffer(),
writeBatch: nil,
batchCount: 0,
batchSize: 0,
writeBatch: make([]common.KvPair, 0, writeBatchSize),
currentSeq: 0,
tikvCodec: keyspace.CodecV1,
filenamePrefix: filePrefix,
Expand All @@ -145,7 +144,7 @@ type Writer struct {
ctx context.Context
sync.Mutex
engine *Engine
memtableSizeLimit int64
memtableSizeLimit int
keyAdapter local.KeyAdapter
exStorage storage.ExternalStorage

Expand All @@ -154,9 +153,6 @@ type Writer struct {
kvBuffer *membuf.Buffer
writeBatch []common.KvPair

batchCount int
batchSize int64

currentSeq int
onClose func(writerID, currentSeq int)

Expand Down Expand Up @@ -372,33 +368,21 @@ func (w *Writer) AppendRows(ctx context.Context, columnNames []string, rows enco
w.Lock()
defer w.Unlock()

l := len(w.writeBatch)
cnt := w.batchCount
keyAdapter := w.keyAdapter
for _, pair := range kvs {
w.batchSize += int64(len(pair.Key) + len(pair.Val))
buf := w.kvBuffer.AllocBytes(keyAdapter.EncodedLen(pair.Key, pair.RowID))
key := keyAdapter.Encode(buf[:0], pair.Key, pair.RowID)
val := w.kvBuffer.AddBytes(pair.Val)
if cnt < l {
w.writeBatch[cnt].Key = key
w.writeBatch[cnt].Val = val
} else {
w.writeBatch = append(w.writeBatch, common.KvPair{Key: key, Val: val})
w.writeBatch = append(w.writeBatch, common.KvPair{Key: key, Val: val})
if len(w.writeBatch) >= w.memtableSizeLimit {
if err := w.flushKVs(ctx); err != nil {
return err
}
w.writeBatch = w.writeBatch[:0]
w.kvBuffer.Reset()
}
cnt++
}
w.batchCount = cnt

if w.batchSize > w.memtableSizeLimit {
if err := w.flushKVs(ctx); err != nil {
return err
}
}

w.batchSize = 0
w.batchCount = 0
w.kvBuffer.Reset()
return nil
}

Expand Down Expand Up @@ -427,9 +411,6 @@ func (s status) Flushed() bool {
}

func (w *Writer) flushKVs(ctx context.Context) error {
if w.batchCount == 0 {
return nil
}
dataWriter, statWriter, err := w.createStorageWriter()
if err != nil {
return err
Expand All @@ -439,14 +420,14 @@ func (w *Writer) flushKVs(ctx context.Context) error {
}()
w.currentSeq++

slices.SortFunc(w.writeBatch[:w.batchCount], func(i, j common.KvPair) bool {
slices.SortFunc(w.writeBatch, func(i, j common.KvPair) bool {
return bytes.Compare(i.Key, j.Key) < 0
})

w.kvStore, err = Create(w.ctx, dataWriter, statWriter)
w.kvStore.rc = w.engine.rc

for i := 0; i < w.batchCount; i++ {
for i := 0; i < len(w.writeBatch); i++ {
err = w.kvStore.AddKeyValue(w.writeBatch[i].Key, w.writeBatch[i].Val)
logutil.BgLogger().Info("add key", zap.Any("key", w.writeBatch[i].Key), zap.Any("value", w.writeBatch[i].Val))
if err != nil {
Expand Down
26 changes: 15 additions & 11 deletions br/pkg/lightning/backend/sharedisk/sharedisk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func TestWriter(t *testing.T) {
kvs = append(kvs, kv)
}
err = writer.AppendRows(ctx, nil, kv2.MakeRowsFromKvPairs(kvs))
err = writer.flushKVs(context.Background())
require.NoError(t, err)
err = writer.kvStore.Finish()
require.NoError(t, err)
Expand All @@ -86,22 +87,25 @@ func TestWriter(t *testing.T) {
}
}()

dataReader := DataFileReader{ctx: ctx, name: "test/0", exStorage: storage}
dataReader.readBuffer = make([]byte, 4096)

i := 0
for {
k, v, err := dataReader.GetNextKV()
require.NoError(t, err)
if k == nil && v == nil {
break
for _, fileName := range []string{"test/0", "test/1", "test/2"} {
dataReader := DataFileReader{ctx: ctx, name: fileName, exStorage: storage}
dataReader.readBuffer = make([]byte, 4096)

for {
k, v, err := dataReader.GetNextKV()
require.NoError(t, err)
if k == nil && v == nil {
break
}
i++
logutil.BgLogger().Info("print kv", zap.Any("key", k), zap.Any("value", v))
}
i++
logutil.BgLogger().Info("print kv", zap.Any("key", k), zap.Any("value", v))
}

require.Equal(t, 20000, i)

statReader := statFileReader{ctx: ctx, name: "test_stat/0", exStorage: storage}
statReader := statFileReader{ctx: ctx, name: "test_stat/2", exStorage: storage}
statReader.readBuffer = make([]byte, 4096)

j := 0
Expand Down

0 comments on commit 6e95182

Please sign in to comment.