Skip to content

Commit

Permalink
Merge branch 'master' into temporary-pessimistic-lock
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed May 25, 2021
2 parents 3cc0c79 + ab5cf85 commit 6364a82
Show file tree
Hide file tree
Showing 24 changed files with 623 additions and 539 deletions.
4 changes: 3 additions & 1 deletion store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,8 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *Backoffer,
return buildBatchCopTasks(bo, b.store.GetRegionCache(), tikv.NewKeyRanges(ranges), b.req.StoreType)
}

const readTimeoutUltraLong = 3600 * time.Second // For requests that may scan many regions for tiflash.

func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
sender := tikv.NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos))
Expand Down Expand Up @@ -525,7 +527,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, ta
req.StoreTp = tikvrpc.TiFlash

logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.regionInfos)))
resp, retry, cancel, err := sender.SendReqToAddr(bo.TiKVBackoffer(), task.ctx, task.regionInfos, req, tikv.ReadTimeoutUltraLong)
resp, retry, cancel, err := sender.SendReqToAddr(bo.TiKVBackoffer(), task.ctx, task.regionInfos, req, readTimeoutUltraLong)
// If there are store errors, we should retry for all regions.
if retry {
return b.retryBatchCopTask(ctx, bo, task)
Expand Down
2 changes: 1 addition & 1 deletion store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques

// Drain result from root task.
// We don't need to process any special error. When we meet errors, just let it fail.
rpcResp, err := m.store.GetTiKVClient().SendRequest(bo.GetCtx(), req.Meta.GetAddress(), wrappedReq, tikv.ReadTimeoutUltraLong)
rpcResp, err := m.store.GetTiKVClient().SendRequest(bo.GetCtx(), req.Meta.GetAddress(), wrappedReq, readTimeoutUltraLong)

if err != nil {
logutil.BgLogger().Error("establish mpp connection meet error", zap.String("error", err.Error()))
Expand Down
1 change: 0 additions & 1 deletion store/driver/sql_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ func (s *testSQLSerialSuite) TestFailBusyServerCop(c *C) {
}

func TestMain(m *testing.M) {
tikv.ReadTimeoutMedium = 2 * time.Second
os.Exit(m.Run())
}

Expand Down
16 changes: 11 additions & 5 deletions store/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ var gcVariableComments = map[string]string{
gcScanLockModeKey: "Mode of scanning locks, \"physical\" or \"legacy\"",
}

const (
unsafeDestroyRangeTimeout = 5 * time.Minute
accessLockObserverTimeout = 10 * time.Second
gcTimeout = 5 * time.Minute
)

func (w *GCWorker) start(ctx context.Context, wg *sync.WaitGroup) {
logutil.Logger(ctx).Info("[gc worker] start",
zap.String("uuid", w.uuid))
Expand Down Expand Up @@ -808,7 +814,7 @@ func (w *GCWorker) doUnsafeDestroyRangeRequest(ctx context.Context, startKey []b
go func() {
defer wg.Done()

resp, err1 := w.tikvStore.GetTiKVClient().SendRequest(ctx, address, req, tikv.UnsafeDestroyRangeTimeout)
resp, err1 := w.tikvStore.GetTiKVClient().SendRequest(ctx, address, req, unsafeDestroyRangeTimeout)
if err1 == nil {
if resp == nil || resp.Resp == nil {
err1 = errors.Errorf("unsafe destroy range returns nil response from store %v", storeID)
Expand Down Expand Up @@ -1272,7 +1278,7 @@ func (w *GCWorker) registerLockObservers(ctx context.Context, safePoint uint64,
for _, store := range stores {
address := store.Address

resp, err := w.tikvStore.GetTiKVClient().SendRequest(ctx, address, req, tikv.AccessLockObserverTimeout)
resp, err := w.tikvStore.GetTiKVClient().SendRequest(ctx, address, req, accessLockObserverTimeout)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1312,7 +1318,7 @@ func (w *GCWorker) checkLockObservers(ctx context.Context, safePoint uint64, sto
for _, store := range stores {
address := store.Address

resp, err := w.tikvStore.GetTiKVClient().SendRequest(ctx, address, req, tikv.AccessLockObserverTimeout)
resp, err := w.tikvStore.GetTiKVClient().SendRequest(ctx, address, req, accessLockObserverTimeout)
if err != nil {
logError(store, err)
continue
Expand Down Expand Up @@ -1378,7 +1384,7 @@ func (w *GCWorker) removeLockObservers(ctx context.Context, safePoint uint64, st
for _, store := range stores {
address := store.Address

resp, err := w.tikvStore.GetTiKVClient().SendRequest(ctx, address, req, tikv.AccessLockObserverTimeout)
resp, err := w.tikvStore.GetTiKVClient().SendRequest(ctx, address, req, accessLockObserverTimeout)
if err != nil {
logError(store, err)
continue
Expand Down Expand Up @@ -1600,7 +1606,7 @@ func (w *GCWorker) doGCForRegion(bo *tikv.Backoffer, safePoint uint64, region ti
SafePoint: safePoint,
})

resp, err := w.tikvStore.SendReq(bo, req, region, tikv.GCTimeout)
resp, err := w.tikvStore.SendReq(bo, req, region, gcTimeout)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
3 changes: 2 additions & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/failpoint"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/store/tikv/client"
"github.com/pingcap/tidb/store/tikv/config"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/kv"
Expand Down Expand Up @@ -794,7 +795,7 @@ func sendTxnHeartBeat(bo *Backoffer, store *KVStore, primary []byte, startTS, tt
if err != nil {
return 0, errors.Trace(err)
}
resp, err := store.SendReq(bo, req, loc.Region, ReadTimeoutShort)
resp, err := store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
if err != nil {
return 0, errors.Trace(err)
}
Expand Down
3 changes: 2 additions & 1 deletion store/tikv/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tikv
import (
"github.com/pingcap/errors"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/store/tikv/client"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/retry"
Expand All @@ -41,7 +42,7 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batc
Keys: batch.mutations.GetKeys(),
StartVersion: c.startTS,
}, pb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag})
resp, err := c.store.SendReq(bo, req, batch.region, ReadTimeoutShort)
resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 6364a82

Please sign in to comment.