From 41d4a7ef61277b31bdf6c20d2270a3515912ce25 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Mon, 16 Oct 2023 20:18:24 +0800 Subject: [PATCH 1/9] init Signed-off-by: crazycs520 --- internal/client/client_batch.go | 6 ++++- internal/client/client_test.go | 46 +++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index a8091094a..1ac64cc9f 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -298,6 +298,9 @@ func (a *batchConn) fetchMorePendingRequests( const idleTimeout = 3 * time.Minute +// BatchSendLoopPanicFlag only used for testing. +var BatchSendLoopPanicFlag = int64(0) + func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { defer func() { if r := recover(); r != nil { @@ -305,7 +308,8 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { logutil.BgLogger().Error("batchSendLoop", zap.Any("r", r), zap.Stack("stack")) - logutil.BgLogger().Info("restart batchSendLoop") + logutil.BgLogger().Info("restart batchSendLoop", zap.Int64("cound", BatchSendLoopPanicFlag)) + atomic.AddInt64(&BatchSendLoopPanicFlag, 1) go a.batchSendLoop(cfg) } }() diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 2934c1241..390ac6cf0 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -723,3 +723,49 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) { require.NoError(t, err) } } + +func TestDebugBatchClient(t *testing.T) { + config.UpdateGlobal(func(conf *config.Config) { + conf.TiKVClient.MaxBatchSize = 128 + })() + + server, port := startMockTikvService() + require.True(t, port > 0) + require.True(t, server.IsRunning()) + addr := server.addr + client := NewRPCClient() + defer func() { + err := client.Close() + require.NoError(t, err) + server.Stop() + }() + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + conn, err := client.getConnArray(addr, true) + assert.Nil(t, err) + for j := 0; j < 100; j++ { + req := &tikvrpc.Request{ + Type: tikvrpc.CmdCop, + Req: &coprocessor.Request{}, + } + err = tikvrpc.SetContext(req, nil, nil) + assert.Nil(t, err) + batchReq := req.ToBatchCommandsRequest() + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(time.Millisecond * time.Duration(rand.Intn(5)+1)) + cancel() + }() + _, err = sendBatchRequest(ctx, addr, "", conn.batchConn, batchReq, time.Second*30) + req.IsRetryRequest = true + } + }() + } + wg.Wait() + // batchSendLoop should not panic. + assert.Equal(t, atomic.LoadInt64(&BatchSendLoopPanicFlag), int64(0)) +} From ed34dd1af606a11511648080cbb2e64dc04cb6e9 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Mon, 16 Oct 2023 20:47:31 +0800 Subject: [PATCH 2/9] add test Signed-off-by: crazycs520 --- internal/client/client_batch.go | 2 +- internal/client/client_test.go | 46 ------------------------ internal/locate/region_request_test.go | 49 ++++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 47 deletions(-) diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index 1ac64cc9f..8ff7faeb2 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -308,8 +308,8 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { logutil.BgLogger().Error("batchSendLoop", zap.Any("r", r), zap.Stack("stack")) - logutil.BgLogger().Info("restart batchSendLoop", zap.Int64("cound", BatchSendLoopPanicFlag)) atomic.AddInt64(&BatchSendLoopPanicFlag, 1) + logutil.BgLogger().Info("restart batchSendLoop", zap.Int64("count", atomic.LoadInt64(&BatchSendLoopPanicFlag))) go a.batchSendLoop(cfg) } }() diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 390ac6cf0..2934c1241 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -723,49 +723,3 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) { require.NoError(t, err) } } - -func TestDebugBatchClient(t *testing.T) { - config.UpdateGlobal(func(conf *config.Config) { - conf.TiKVClient.MaxBatchSize = 128 - })() - - server, port := startMockTikvService() - require.True(t, port > 0) - require.True(t, server.IsRunning()) - addr := server.addr - client := NewRPCClient() - defer func() { - err := client.Close() - require.NoError(t, err) - server.Stop() - }() - - var wg sync.WaitGroup - for i := 0; i < 100; i++ { - wg.Add(1) - go func() { - defer wg.Done() - conn, err := client.getConnArray(addr, true) - assert.Nil(t, err) - for j := 0; j < 100; j++ { - req := &tikvrpc.Request{ - Type: tikvrpc.CmdCop, - Req: &coprocessor.Request{}, - } - err = tikvrpc.SetContext(req, nil, nil) - assert.Nil(t, err) - batchReq := req.ToBatchCommandsRequest() - ctx, cancel := context.WithCancel(context.Background()) - go func() { - time.Sleep(time.Millisecond * time.Duration(rand.Intn(5)+1)) - cancel() - }() - _, err = sendBatchRequest(ctx, addr, "", conn.batchConn, batchReq, time.Second*30) - req.IsRetryRequest = true - } - }() - } - wg.Wait() - // batchSendLoop should not panic. - assert.Equal(t, atomic.LoadInt64(&BatchSendLoopPanicFlag), int64(0)) -} diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 6c97be87e..15d71920c 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -37,8 +37,10 @@ package locate import ( "context" "fmt" + "math/rand" "net" "sync" + "sync/atomic" "testing" "time" "unsafe" @@ -733,3 +735,50 @@ func (s *testRegionRequestToSingleStoreSuite) TestKVReadTimeoutWithDisableBatchC s.True(IsFakeRegionError(regionErr)) s.Equal(0, bo.GetTotalBackoffTimes()) // use kv read timeout will do fast retry, so backoff times should be 0. } + +func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() { + config.UpdateGlobal(func(conf *config.Config) { + conf.TiKVClient.MaxBatchSize = 128 + })() + + server, port := mock_server.StartMockTikvService() + s.True(port > 0) + tf := func(s *Store, bo *retry.Backoffer) livenessState { + return reachable + } + s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + + rpcClient := client.NewRPCClient() + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + return rpcClient.SendRequest(ctx, server.Addr(), req, timeout) + }} + defer func() { + rpcClient.Close() + server.Stop() + }() + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 100; j++ { + ctx, cancel := context.WithCancel(context.Background()) + bo := retry.NewBackofferWithVars(ctx, 20000, nil) + region, err := s.cache.LocateRegionByID(bo, s.region) + s.Nil(err) + s.NotNil(region) + go func() { + // mock for kill query execution or timeout. + time.Sleep(time.Millisecond * time.Duration(rand.Intn(5)+1)) + cancel() + }() + req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a"), Version: 1}) + s.regionRequestSender.SendReq(bo, req, region.Region, time.Second*30) + } + }() + } + wg.Wait() + // batchSendLoop should not panic. + s.Equal(atomic.LoadInt64(&client.BatchSendLoopPanicFlag), int64(0)) +} From 7cbd31a58f04e22d687cb5d5884301ed337227fa Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Mon, 16 Oct 2023 21:00:59 +0800 Subject: [PATCH 3/9] refine test Signed-off-by: crazycs520 --- internal/locate/region_request_test.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 15d71920c..22f6cedff 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -743,15 +743,14 @@ func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() { server, port := mock_server.StartMockTikvService() s.True(port > 0) - tf := func(s *Store, bo *retry.Backoffer) livenessState { - return reachable - } - s.regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) - rpcClient := client.NewRPCClient() s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { return rpcClient.SendRequest(ctx, server.Addr(), req, timeout) }} + tf := func(s *Store, bo *retry.Backoffer) livenessState { + return reachable + } + defer func() { rpcClient.Close() server.Stop() @@ -764,7 +763,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() { defer wg.Done() for j := 0; j < 100; j++ { ctx, cancel := context.WithCancel(context.Background()) - bo := retry.NewBackofferWithVars(ctx, 20000, nil) + bo := retry.NewBackofferWithVars(ctx, int(client.ReadTimeoutShort.Milliseconds()), nil) region, err := s.cache.LocateRegionByID(bo, s.region) s.Nil(err) s.NotNil(region) @@ -774,7 +773,9 @@ func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() { cancel() }() req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a"), Version: 1}) - s.regionRequestSender.SendReq(bo, req, region.Region, time.Second*30) + regionRequestSender := NewRegionRequestSender(s.cache, s.regionRequestSender.client) + regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) + regionRequestSender.SendReq(bo, req, region.Region, client.ReadTimeoutShort) } }() } From 9d409e6bab0b3725c32a30a934da6966aa977323 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Mon, 16 Oct 2023 21:11:56 +0800 Subject: [PATCH 4/9] try to fix Signed-off-by: crazycs520 --- internal/locate/region_request.go | 5 +++++ internal/locate/region_request_test.go | 2 ++ 2 files changed, 7 insertions(+) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 9c9e613e0..2bc6074cd 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1459,6 +1459,11 @@ func (s *RegionRequestSender) SendReqCtx( var retry bool resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout) + if err != nil { + oldCtx := req.Context + req.Context = kvrpcpb.Context{} + req.Context = oldCtx + } req.IsRetryRequest = true if err != nil { msg := fmt.Sprintf("send request failed, err: %v", err.Error()) diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 22f6cedff..cd8ecfc78 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -737,9 +737,11 @@ func (s *testRegionRequestToSingleStoreSuite) TestKVReadTimeoutWithDisableBatchC } func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() { + // This test should use `go test -race` to run. config.UpdateGlobal(func(conf *config.Config) { conf.TiKVClient.MaxBatchSize = 128 })() + atomic.StoreInt64(&client.BatchSendLoopPanicFlag, 0) server, port := mock_server.StartMockTikvService() s.True(port > 0) From 42dc160461bdbd63394aac502719f9ebb492ab00 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Mon, 16 Oct 2023 23:23:20 +0800 Subject: [PATCH 5/9] fix test Signed-off-by: crazycs520 --- internal/locate/region_request.go | 5 ----- internal/locate/region_request_test.go | 2 +- tikvrpc/tikvrpc.go | 5 +++-- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 2bc6074cd..9c9e613e0 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1459,11 +1459,6 @@ func (s *RegionRequestSender) SendReqCtx( var retry bool resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout) - if err != nil { - oldCtx := req.Context - req.Context = kvrpcpb.Context{} - req.Context = oldCtx - } req.IsRetryRequest = true if err != nil { msg := fmt.Sprintf("send request failed, err: %v", err.Error()) diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index cd8ecfc78..d432a7ffd 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -774,7 +774,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() { time.Sleep(time.Millisecond * time.Duration(rand.Intn(5)+1)) cancel() }() - req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a"), Version: 1}) + req := tikvrpc.NewRequest(tikvrpc.CmdCop, &coprocessor.Request{Data: []byte("a"), StartTs: 1}) regionRequestSender := NewRegionRequestSender(s.cache, s.regionRequestSender.client) regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) regionRequestSender.SendReq(bo, req, region.Region, client.ReadTimeoutShort) diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index 83119c3d3..6a4d34f7c 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -807,13 +807,14 @@ func AttachContext(req *Request, ctx *kvrpcpb.Context) bool { // SetContext set the Context field for the given req to the specified ctx. func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { - ctx := &req.Context + // Shallow copy the context to avoid concurrent modification. + ctx := *(&req.Context) if region != nil { ctx.RegionId = region.Id ctx.RegionEpoch = region.RegionEpoch } ctx.Peer = peer - if !AttachContext(req, ctx) { + if !AttachContext(req, &ctx) { return errors.Errorf("invalid request type %v", req.Type) } return nil From 766bd188280d3801d4a5070476970774da3c53ad Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Tue, 17 Oct 2023 10:47:30 +0800 Subject: [PATCH 6/9] fix Signed-off-by: crazycs520 --- internal/locate/region_request_test.go | 5 ++--- tikvrpc/tikvrpc.go | 8 +++++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index d432a7ffd..972f18f57 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -741,12 +741,11 @@ func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() { config.UpdateGlobal(func(conf *config.Config) { conf.TiKVClient.MaxBatchSize = 128 })() - atomic.StoreInt64(&client.BatchSendLoopPanicFlag, 0) server, port := mock_server.StartMockTikvService() s.True(port > 0) rpcClient := client.NewRPCClient() - s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + fnClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { return rpcClient.SendRequest(ctx, server.Addr(), req, timeout) }} tf := func(s *Store, bo *retry.Backoffer) livenessState { @@ -775,7 +774,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() { cancel() }() req := tikvrpc.NewRequest(tikvrpc.CmdCop, &coprocessor.Request{Data: []byte("a"), StartTs: 1}) - regionRequestSender := NewRegionRequestSender(s.cache, s.regionRequestSender.client) + regionRequestSender := NewRegionRequestSender(s.cache, fnClient) regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf)) regionRequestSender.SendReq(bo, req, region.Region, client.ReadTimeoutShort) } diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index 6a4d34f7c..3c36eb990 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -807,14 +807,16 @@ func AttachContext(req *Request, ctx *kvrpcpb.Context) bool { // SetContext set the Context field for the given req to the specified ctx. func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { - // Shallow copy the context to avoid concurrent modification. - ctx := *(&req.Context) + ctx := &req.Context if region != nil { ctx.RegionId = region.Id ctx.RegionEpoch = region.RegionEpoch } ctx.Peer = peer - if !AttachContext(req, &ctx) { + + // Shallow copy the context to avoid concurrent modification. + copyCtx := req.Context + if !AttachContext(req, ©Ctx) { return errors.Errorf("invalid request type %v", req.Type) } return nil From ad1e045f1cc6f5fd1ba59b2e90dd6abeffca2852 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Tue, 17 Oct 2023 12:41:56 +0800 Subject: [PATCH 7/9] refine Signed-off-by: crazycs520 --- internal/client/client_batch.go | 8 ++++---- internal/locate/region_request_test.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index 8ff7faeb2..9a3e18db2 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -298,8 +298,8 @@ func (a *batchConn) fetchMorePendingRequests( const idleTimeout = 3 * time.Minute -// BatchSendLoopPanicFlag only used for testing. -var BatchSendLoopPanicFlag = int64(0) +// BatchSendLoopPanicCounter only used for testing. +var BatchSendLoopPanicCounter int64 = 0 func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { defer func() { @@ -308,8 +308,8 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { logutil.BgLogger().Error("batchSendLoop", zap.Any("r", r), zap.Stack("stack")) - atomic.AddInt64(&BatchSendLoopPanicFlag, 1) - logutil.BgLogger().Info("restart batchSendLoop", zap.Int64("count", atomic.LoadInt64(&BatchSendLoopPanicFlag))) + atomic.AddInt64(&BatchSendLoopPanicCounter, 1) + logutil.BgLogger().Info("restart batchSendLoop", zap.Int64("count", atomic.LoadInt64(&BatchSendLoopPanicCounter))) go a.batchSendLoop(cfg) } }() diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 972f18f57..be7ad625d 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -782,5 +782,5 @@ func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() { } wg.Wait() // batchSendLoop should not panic. - s.Equal(atomic.LoadInt64(&client.BatchSendLoopPanicFlag), int64(0)) + s.Equal(atomic.LoadInt64(&client.BatchSendLoopPanicCounter), int64(0)) } From 6fb470c9a4e7f6f3e6dfbc3d239d73d46906703c Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Tue, 17 Oct 2023 12:57:35 +0800 Subject: [PATCH 8/9] address comment Signed-off-by: crazycs520 --- internal/client/client_batch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index 9a3e18db2..95aed9013 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -298,7 +298,7 @@ func (a *batchConn) fetchMorePendingRequests( const idleTimeout = 3 * time.Minute -// BatchSendLoopPanicCounter only used for testing. +// BatchSendLoopPanicCounter is only used for testing. var BatchSendLoopPanicCounter int64 = 0 func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { From 550c9dd0c0ce436b8c972915e078e13f22695f29 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Tue, 17 Oct 2023 15:16:49 +0800 Subject: [PATCH 9/9] address comment Signed-off-by: crazycs520 --- internal/apicodec/codec.go | 15 +++++++-------- tikvrpc/tikvrpc.go | 14 +++++++------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/internal/apicodec/codec.go b/internal/apicodec/codec.go index 7945a5a91..54b5840c9 100644 --- a/internal/apicodec/codec.go +++ b/internal/apicodec/codec.go @@ -92,28 +92,27 @@ func attachAPICtx(c Codec, req *tikvrpc.Request) *tikvrpc.Request { // Shallow copy the request to avoid concurrent modification. r := *req - ctx := &r.Context - ctx.ApiVersion = c.GetAPIVersion() - ctx.KeyspaceId = uint32(c.GetKeyspaceID()) + r.Context.ApiVersion = c.GetAPIVersion() + r.Context.KeyspaceId = uint32(c.GetKeyspaceID()) switch r.Type { case tikvrpc.CmdMPPTask: mpp := *r.DispatchMPPTask() // Shallow copy the meta to avoid concurrent modification. meta := *mpp.Meta - meta.KeyspaceId = ctx.KeyspaceId - meta.ApiVersion = ctx.ApiVersion + meta.KeyspaceId = r.Context.KeyspaceId + meta.ApiVersion = r.Context.ApiVersion mpp.Meta = &meta r.Req = &mpp case tikvrpc.CmdCompact: compact := *r.Compact() - compact.KeyspaceId = ctx.KeyspaceId - compact.ApiVersion = ctx.ApiVersion + compact.KeyspaceId = r.Context.KeyspaceId + compact.ApiVersion = r.Context.ApiVersion r.Req = &compact } - tikvrpc.AttachContext(&r, ctx) + tikvrpc.AttachContext(&r, r.Context) return &r } diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index 3c36eb990..13e857064 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -709,7 +709,9 @@ type MPPStreamResponse struct { // AttachContext sets the request context to the request, // return false if encounter unknown request type. -func AttachContext(req *Request, ctx *kvrpcpb.Context) bool { +// Parameter `rpcCtx` use `kvrpcpb.Context` instead of `*kvrpcpb.Context` to avoid concurrent modification by shallow copy. +func AttachContext(req *Request, rpcCtx kvrpcpb.Context) bool { + ctx := &rpcCtx switch req.Type { case CmdGet: req.Get().Context = ctx @@ -807,16 +809,14 @@ func AttachContext(req *Request, ctx *kvrpcpb.Context) bool { // SetContext set the Context field for the given req to the specified ctx. func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { - ctx := &req.Context if region != nil { - ctx.RegionId = region.Id - ctx.RegionEpoch = region.RegionEpoch + req.Context.RegionId = region.Id + req.Context.RegionEpoch = region.RegionEpoch } - ctx.Peer = peer + req.Context.Peer = peer // Shallow copy the context to avoid concurrent modification. - copyCtx := req.Context - if !AttachContext(req, ©Ctx) { + if !AttachContext(req, req.Context) { return errors.Errorf("invalid request type %v", req.Type) } return nil