Skip to content

Commit

Permalink
etcd/client(ticdc): add retry operation for etcd transaction api (pin…
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Jan 27, 2022
1 parent fd26702 commit 730e41e
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 14 deletions.
10 changes: 10 additions & 0 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,16 @@ func (c *Client) Txn(ctx context.Context) clientv3.Txn {
return c.cli.Txn(ctx)
}

// Txn delegates request to clientv3.KV.Txn
func (c *Client) Txn1(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (resp *clientv3.TxnResponse, err error) {
err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error {
var inErr error
resp, inErr = c.cli.Txn(ctx).If(cmps...).Then(opsThen...).Else(opsElse...).Commit()
return inErr
})
return
}

// Grant delegates request to clientv3.Lease.Grant
func (c *Client) Grant(ctx context.Context, ttl int64) (resp *clientv3.LeaseGrantResponse, err error) {
err = retryRPC(EtcdGrant, c.metrics[EtcdGrant], func() error {
Expand Down
20 changes: 15 additions & 5 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,17 @@ func (c CDCEtcdClient) CreateChangefeedInfo(ctx context.Context, info *model.Cha
if err != nil {
return errors.Trace(err)
}
resp, err := c.Client.Txn(ctx).If(

cmps := []clientv3.Cmp{
clientv3.Compare(clientv3.ModRevision(infoKey), "=", 0),
clientv3.Compare(clientv3.ModRevision(jobKey), "=", 0),
).Then(
}
opsThen := []clientv3.Op{
clientv3.OpPut(infoKey, value),
).Commit()
}
opsElse := make([]clientv3.Op, 0)
resp, err := c.Client.Txn1(ctx, cmps, opsThen, opsElse)

if err != nil {
return cerror.WrapError(cerror.ErrPDEtcdAPIError, err)
}
Expand Down Expand Up @@ -504,10 +509,15 @@ func (c CDCEtcdClient) PutTaskPositionOnChange(
}

key := GetEtcdKeyTaskPosition(changefeedID, captureID)
resp, err := c.Client.Txn(ctx).If(
cmps := []clientv3.Cmp{
clientv3.Compare(clientv3.ModRevision(key), ">", 0),
clientv3.Compare(clientv3.Value(key), "=", data),
).Else(clientv3.OpPut(key, data)).Commit()
}
opsThen := make([]clientv3.Op, 0)
opsElse := []clientv3.Op{
clientv3.OpPut(key, data),
}
resp, err := c.Client.Txn1(ctx, cmps, opsThen, opsElse)
if err != nil {
return false, cerror.WrapError(cerror.ErrPDEtcdAPIError, err)
}
Expand Down
23 changes: 14 additions & 9 deletions pkg/orchestrator/etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,10 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
if len(pendingPatches) > 0 {
// Here we have some patches yet to be uploaded to Etcd.
pendingPatches, err = worker.applyPatchGroups(ctx, pendingPatches)
if isRetryableError(err) {
continue
}
if err != nil {
// etcd client 报错为 ErrTimeoutDueToLeaderFail 时也应该 continue 重试
if cerrors.ErrEtcdTryAgain.Equal(errors.Cause(err)) {
continue
}
return errors.Trace(err)
}
} else {
Expand Down Expand Up @@ -258,6 +257,10 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
}
}

func isRetryableError(err error) bool {
return cerrors.ErrEtcdTryAgain.Equal(errors.Cause(err))
}

func (worker *EtcdWorker) handleEvent(_ context.Context, event *clientv3.Event) {
if worker.isDeleteCounterKey(event.Kv.Key) {
switch event.Type {
Expand Down Expand Up @@ -352,7 +355,8 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m
}

cmps := make([]clientv3.Cmp, 0, len(changedState))
ops := make([]clientv3.Op, 0, len(changedState))
opsThen := make([]clientv3.Op, 0, len(changedState))
opsElse := make([]clientv3.Op, 0)
hasDelete := false

for key, value := range changedState {
Expand All @@ -374,11 +378,11 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m
op = clientv3.OpDelete(key.String())
hasDelete = true
}
ops = append(ops, op)
opsThen = append(opsThen, op)
}

if hasDelete {
ops = append(ops, clientv3.OpPut(worker.prefix.String()+deletionCounterKey, fmt.Sprint(worker.deleteCounter+1)))
opsThen = append(opsThen, clientv3.OpPut(worker.prefix.String()+deletionCounterKey, fmt.Sprint(worker.deleteCounter+1)))
}
if worker.deleteCounter > 0 {
cmps = append(cmps, clientv3.Compare(clientv3.Value(worker.prefix.String()+deletionCounterKey), "=", fmt.Sprint(worker.deleteCounter)))
Expand All @@ -392,7 +396,8 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m
startTime := time.Now()

txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration)
resp, err := worker.client.Txn(txnCtx).If(cmps...).Then(ops...).Commit()
resp, err := worker.client.Txn1(txnCtx, cmps, opsThen, opsElse)
// resp, err = worker.client.Txn(txnCtx).If(cmps...).Then(opsThen...).Commit()
cancel()

// For testing the situation where we have a progress notification that
Expand All @@ -412,7 +417,7 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m
return errors.Trace(err)
}

logEtcdOps(ops, resp.Succeeded)
logEtcdOps(opsThen, resp.Succeeded)
if resp.Succeeded {
worker.barrierRev = resp.Header.GetRevision()
return nil
Expand Down

0 comments on commit 730e41e

Please sign in to comment.