Skip to content

Commit

Permalink
lightning: add store write limiter (#35193)
Browse files Browse the repository at this point in the history
close #35192
  • Loading branch information
sleepymole authored and pull[bot] committed Sep 20, 2024
1 parent 6d33735 commit 1185463
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 22 deletions.
68 changes: 46 additions & 22 deletions br/pkg/lightning/backend/local/local.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,9 @@ type local struct {
errorMgr *errormanager.ErrorManager
importClientFactory ImportClientFactory

bufferPool *membuf.Pool
metrics *metric.Metrics
bufferPool *membuf.Pool
metrics *metric.Metrics
writeLimiter StoreWriteLimiter
}

func openDuplicateDB(storeDir string) (*pebble.DB, error) {
Expand Down Expand Up @@ -308,6 +309,12 @@ func NewLocalBackend(
if duplicateDetection {
keyAdapter = dupDetectKeyAdapter{}
}
var writeLimiter StoreWriteLimiter
if cfg.TikvImporter.StoreWriteBWLimit > 0 {
writeLimiter = newStoreWriteLimiter(int(cfg.TikvImporter.StoreWriteBWLimit))
} else {
writeLimiter = noopStoreWriteLimiter{}
}
local := &local{
engines: sync.Map{},
pdCtl: pdCtl,
Expand All @@ -334,6 +341,7 @@ func NewLocalBackend(
errorMgr: errorMgr,
importClientFactory: importClientFactory,
bufferPool: membuf.NewPool(membuf.WithAllocator(manual.Allocator{})),
writeLimiter: writeLimiter,
}
if m, ok := metric.FromContext(ctx); ok {
local.metrics = m
Expand Down Expand Up @@ -784,6 +792,7 @@ func (local *local) WriteToTiKV(

leaderID := region.Leader.GetId()
clients := make([]sst.ImportSST_WriteClient, 0, len(region.Region.GetPeers()))
storeIDs := make([]uint64, 0, len(region.Region.GetPeers()))
requests := make([]*sst.WriteRequest, 0, len(region.Region.GetPeers()))
for _, peer := range region.Region.GetPeers() {
cli, err := local.getImportClient(ctx, peer.StoreId)
Expand Down Expand Up @@ -812,50 +821,65 @@ func (local *local) WriteToTiKV(
}
clients = append(clients, wstream)
requests = append(requests, req)
storeIDs = append(storeIDs, peer.StoreId)
}

bytesBuf := local.bufferPool.NewBuffer()
defer bytesBuf.Destroy()
pairs := make([]*sst.Pair, 0, local.batchWriteKVPairs)
count := 0
size := int64(0)
totalSize := int64(0)
totalCount := int64(0)
firstLoop := true
// if region-split-size <= 96MiB, we bump the threshold a bit to avoid too many retry split
// because the range-properties is not 100% accurate
regionMaxSize := regionSplitSize
if regionSplitSize <= int64(config.SplitRegionSize) {
regionMaxSize = regionSplitSize * 4 / 3
}
// Set a lower flush limit to make the speed of write more smooth.
flushLimit := int64(local.writeLimiter.Limit() / 10)

flushKVs := func() error {
for i := range clients {
if err := local.writeLimiter.WaitN(ctx, storeIDs[i], int(size)); err != nil {
return errors.Trace(err)
}
requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count]
if err := clients[i].Send(requests[i]); err != nil {
return errors.Trace(err)
}
}
return nil
}

for iter.First(); iter.Valid(); iter.Next() {
size += int64(len(iter.Key()) + len(iter.Value()))
kvSize := int64(len(iter.Key()) + len(iter.Value()))
// here we reuse the `*sst.Pair`s to optimize object allocation
if firstLoop {
if count < len(pairs) {
pairs[count].Key = bytesBuf.AddBytes(iter.Key())
pairs[count].Value = bytesBuf.AddBytes(iter.Value())
} else {
pair := &sst.Pair{
Key: bytesBuf.AddBytes(iter.Key()),
Value: bytesBuf.AddBytes(iter.Value()),
}
pairs = append(pairs, pair)
} else {
pairs[count].Key = bytesBuf.AddBytes(iter.Key())
pairs[count].Value = bytesBuf.AddBytes(iter.Value())
}
count++
totalCount++
size += kvSize
totalSize += kvSize

if count >= local.batchWriteKVPairs {
for i := range clients {
requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count]
if err := clients[i].Send(requests[i]); err != nil {
return nil, Range{}, stats, errors.Trace(err)
}
if count >= local.batchWriteKVPairs || size >= flushLimit {
if err := flushKVs(); err != nil {
return nil, Range{}, stats, err
}
count = 0
size = 0
bytesBuf.Reset()
firstLoop = false
}
if size >= regionMaxSize || totalCount >= regionSplitKeys {
if totalSize >= regionMaxSize || totalCount >= regionSplitKeys {
break
}
}
Expand All @@ -865,12 +889,12 @@ func (local *local) WriteToTiKV(
}

if count > 0 {
for i := range clients {
requests[i].Chunk.(*sst.WriteRequest_Batch).Batch.Pairs = pairs[:count]
if err := clients[i].Send(requests[i]); err != nil {
return nil, Range{}, stats, errors.Trace(err)
}
if err := flushKVs(); err != nil {
return nil, Range{}, stats, err
}
count = 0
size = 0
bytesBuf.Reset()
}

var leaderPeerMetas []*sst.SSTMeta
Expand Down Expand Up @@ -913,7 +937,7 @@ func (local *local) WriteToTiKV(
logutil.Region(region.Region), logutil.Leader(region.Leader))
}
stats.count = totalCount
stats.totalBytes = size
stats.totalBytes = totalSize

return leaderPeerMetas, finishedRange, stats, nil
}
Expand Down
74 changes: 74 additions & 0 deletions br/pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"database/sql"
"math"
"regexp"
"runtime"
"sort"
Expand All @@ -40,6 +41,7 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)

const (
Expand Down Expand Up @@ -592,3 +594,75 @@ func intersectRange(region *metapb.Region, rg Range) Range {

return Range{start: startKey, end: endKey}
}

type StoreWriteLimiter interface {
WaitN(ctx context.Context, storeID uint64, n int) error
Limit() int
}

type storeWriteLimiter struct {
rwm sync.RWMutex
limiters map[uint64]*rate.Limiter
limit int
burst int
}

func newStoreWriteLimiter(limit int) *storeWriteLimiter {
var burst int
// Allow burst of at most 20% of the limit.
if limit <= math.MaxInt-limit/5 {
burst = limit + limit/5
} else {
// If overflowed, set burst to math.MaxInt.
burst = math.MaxInt
}
return &storeWriteLimiter{
limiters: make(map[uint64]*rate.Limiter),
limit: limit,
burst: burst,
}
}

func (s *storeWriteLimiter) WaitN(ctx context.Context, storeID uint64, n int) error {
limiter := s.getLimiter(storeID)
// The original WaitN doesn't allow n > burst,
// so we call WaitN with burst multiple times.
for n > limiter.Burst() {
if err := limiter.WaitN(ctx, limiter.Burst()); err != nil {
return err
}
n -= limiter.Burst()
}
return limiter.WaitN(ctx, n)
}

func (s *storeWriteLimiter) Limit() int {
return s.limit
}

func (s *storeWriteLimiter) getLimiter(storeID uint64) *rate.Limiter {
s.rwm.RLock()
limiter, ok := s.limiters[storeID]
s.rwm.RUnlock()
if ok {
return limiter
}
s.rwm.Lock()
defer s.rwm.Unlock()
limiter, ok = s.limiters[storeID]
if !ok {
limiter = rate.NewLimiter(rate.Limit(s.limit), s.burst)
s.limiters[storeID] = limiter
}
return limiter
}

type noopStoreWriteLimiter struct{}

func (noopStoreWriteLimiter) WaitN(ctx context.Context, storeID uint64, n int) error {
return nil
}

func (noopStoreWriteLimiter) Limit() int {
return math.MaxInt
}
43 changes: 43 additions & 0 deletions br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,3 +770,46 @@ func TestNeedSplit(t *testing.T) {
}
}
}

func TestStoreWriteLimiter(t *testing.T) {
// Test create store write limiter with limit math.MaxInt.
limiter := newStoreWriteLimiter(math.MaxInt)
err := limiter.WaitN(context.Background(), 1, 1024)
require.NoError(t, err)

// Test WaitN exceeds the burst.
limiter = newStoreWriteLimiter(100)
start := time.Now()
// 120 is the initial burst, 150 is the number of new tokens.
err = limiter.WaitN(context.Background(), 1, 120+120)
require.NoError(t, err)
require.Greater(t, time.Since(start), time.Second)

// Test WaitN with different store id.
limiter = newStoreWriteLimiter(100)
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
for i := 0; i < 10; i++ {
wg.Add(1)
go func(storeID uint64) {
defer wg.Done()
start = time.Now()
var gotTokens int
for {
n := rand.Intn(50)
if limiter.WaitN(ctx, storeID, n) != nil {
break
}
gotTokens += n
}
elapsed := time.Since(start)
maxTokens := 120 + int(float64(elapsed)/float64(time.Second)*100)
// In theory, gotTokens should be less than or equal to maxTokens.
// But we allow a little of error to avoid the test being flaky.
require.LessOrEqual(t, gotTokens, maxTokens+1)

}(uint64(i))
}
wg.Wait()
}
1 change: 1 addition & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ type TikvImporter struct {

EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"`
LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"`
StoreWriteBWLimit ByteSize `toml:"store-write-bwlimit" json:"store-write-bwlimit"`
}

type Checkpoint struct {
Expand Down
5 changes: 5 additions & 0 deletions br/tests/lightning_write_limit/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[tikv-importer]
store-write-bwlimit = "1Mi"

[mydumper.csv]
header = false
49 changes: 49 additions & 0 deletions br/tests/lightning_write_limit/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/bin/bash
#
# Copyright 2022 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eux

mkdir -p "$TEST_DIR/data"

cat <<EOF >"$TEST_DIR/data/test-schema-create.sql"
CREATE DATABASE test;
EOF
cat <<EOF >"$TEST_DIR/data/test.t-schema.sql"
CREATE TABLE test.t (
id int,
a int,
b int,
c int
);
EOF

# Generate 200k rows. Total size is about 5MiB.
set +x
for i in {1..200000}; do
echo "$i,$i,$i,$i" >>"$TEST_DIR/data/test.t.0.csv"
done
set -x

start=$(date +%s)
run_lightning --backend local -d "$TEST_DIR/data" --config "tests/$TEST_NAME/config.toml"
end=$(date +%s)
take=$((end - start))

# The encoded kv size is 10MiB. Usually it should take more than 10s.
if [ $take -lt 10 ]; then
echo "Lightning runs too fast. The write limiter doesn't work."
exit 1
fi
2 changes: 2 additions & 0 deletions br/tidb-lightning.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ addr = "127.0.0.1:8287"
# The memory cache used in for local sorting during the encode-KV phase before flushing into the engines. The memory
# usage is bound by region-concurrency * local-writer-mem-cache-size.
#local-writer-mem-cache-size = '128MiB'
# Limit the write bandwidth to each tikv store. The unit is 'Bytes per second'. 0 means no limit.
#store-write-bwlimit = 0

[mydumper]
# block size of file reading
Expand Down

0 comments on commit 1185463

Please sign in to comment.