Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: fix some logs and assumptions that are inaccurate when the async commit protocol is used #24140

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
// RPCs fails. However, if there are multiple errors and some of the errors
// are not RPC failures, we can return the actual error instead of undetermined.
if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil {
c.mu.committed = true
logutil.Logger(ctx).Error("2PC commit result undetermined",
zap.Error(err),
zap.NamedError("rpcErr", undeterminedErr),
Expand Down Expand Up @@ -1177,6 +1178,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
if c.isAsyncCommit() {
// For async commit protocol, the commit is considered successful here.
c.txn.commitTS = c.commitTS
c.mu.committed = true
logutil.Logger(ctx).Debug("2PC will use async commit protocol to commit this txn",
zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", c.commitTS),
zap.Uint64("sessionID", c.sessionID))
Expand Down
57 changes: 34 additions & 23 deletions store/tikv/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/store/tikv/client"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
Expand All @@ -47,10 +45,10 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
StartVersion: c.startTS,
Keys: keys,
CommitVersion: c.commitTS,
}, pb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag})
}, pb.Context{Priority: c.priority, SyncLog: c.syncLog})

sender := NewRegionRequestSender(c.store.regionCache, c.store.GetTiKVClient())
resp, err := sender.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
resp, err := sender.SendReq(bo, req, batch.region, ReadTimeoutShort)

// If we fail to receive response for the request that commits primary key, it will be undetermined whether this
// transaction has been successfully committed.
Expand All @@ -69,7 +67,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
return errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -78,7 +76,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
return errors.Trace(err)
}
if resp.Resp == nil {
return errors.Trace(tikverr.ErrBodyMissing)
return errors.Trace(kv.ErrBodyMissing)
}
commitResp := resp.Resp.(*pb.CommitResponse)
// Here we can make sure tikv has processed the commit primary key request. So
Expand All @@ -88,7 +86,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
}
if keyErr := commitResp.GetError(); keyErr != nil {
if rejected := keyErr.GetCommitTsExpired(); rejected != nil {
logutil.Logger(bo.GetCtx()).Info("2PC commitTS rejected by TiKV, retry with a newer commitTS",
logutil.Logger(bo.ctx).Info("2PC commitTS rejected by TiKV, retry with a newer commitTS",
zap.Uint64("txnStartTS", c.startTS),
zap.Stringer("info", logutil.Hex(rejected)))

Expand All @@ -101,9 +99,9 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
}

// Update commit ts and retry.
commitTS, err := c.store.getTimestampWithRetry(bo, c.txn.GetScope())
commitTS, err := c.store.getTimestampWithRetry(bo, c.txn.GetUnionStore().GetOption(kv.TxnScope).(string))
if err != nil {
logutil.Logger(bo.GetCtx()).Warn("2PC get commitTS failed",
logutil.Logger(bo.ctx).Warn("2PC get commitTS failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(err)
Expand All @@ -128,20 +126,33 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
}
return res
}
logutil.Logger(bo.GetCtx()).Error("2PC failed commit key after primary key committed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS),
zap.Uint64("commitTS", c.commitTS),
zap.Strings("keys", hexBatchKeys(keys)))
if c.isAsyncCommit() {
logutil.Logger(bo.ctx).Error("2PC failed commit secondary key",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS),
zap.Uint64("commitTS", c.commitTS),
zap.Strings("keys", hexBatchKeys(keys)))
} else {
logutil.Logger(bo.ctx).Error("2PC failed commit key after primary key committed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS),
zap.Uint64("commitTS", c.commitTS),
zap.Strings("keys", hexBatchKeys(keys)))
}
return errors.Trace(err)
}
// The transaction may be rolled back by concurrent transactions.
logutil.Logger(bo.GetCtx()).Debug("2PC failed commit primary key",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
if batch.isPrimary {
// The transaction may be rolled back by concurrent transactions.
logutil.Logger(bo.ctx).Debug("2PC failed commit primary key",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
} else {
logutil.Logger(bo.ctx).Debug("2PC failed commit secondary key",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
}
return err
}

c.mu.Lock()
defer c.mu.Unlock()
// Group that contains primary key is always the first.
Expand All @@ -151,10 +162,10 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
}

func (c *twoPhaseCommitter) commitMutations(bo *Backoffer, mutations CommitterMutations) error {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
if span := opentracing.SpanFromContext(bo.ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("twoPhaseCommitter.commitMutations", opentracing.ChildOf(span.Context()))
defer span1.Finish()
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
bo.ctx = opentracing.ContextWithSpan(bo.ctx, span1)
}

return c.doActionOnMutations(bo, actionCommit{}, mutations)
Expand Down