Skip to content

Commit

Permalink
lightning: fix pd retry and add ut for it (#43432)
Browse files Browse the repository at this point in the history
close #43400
  • Loading branch information
lichunzhu authored Apr 27, 2023
1 parent 9b9796f commit 0b11444
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 16 deletions.
8 changes: 3 additions & 5 deletions br/pkg/lightning/common/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"database/sql"
"database/sql/driver"
goerrors "errors"
"io"
"net"
"os"
Expand Down Expand Up @@ -101,11 +102,8 @@ func isSingleRetryableError(err error) bool {
if nerr.Timeout() {
return true
}
if cause, ok := nerr.(*net.OpError); ok {
syscallErr, ok := cause.Unwrap().(*os.SyscallError)
if ok {
return syscallErr.Err == syscall.ECONNREFUSED || syscallErr.Err == syscall.ECONNRESET
}
if syscallErr, ok := goerrors.Unwrap(err).(*os.SyscallError); ok {
return syscallErr.Err == syscall.ECONNREFUSED || syscallErr.Err == syscall.ECONNRESET
}
return false
case *mysql.MySQLError:
Expand Down
40 changes: 30 additions & 10 deletions br/pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ import (
"fmt"
"io"
"math"
"net"
"net/http"
"net/url"
"os"
"strings"
"syscall"
"time"

"github.com/coreos/go-semver/semver"
Expand Down Expand Up @@ -165,22 +168,39 @@ func pdRequestWithCode(
if err != nil {
return 0, nil, errors.Trace(err)
}
resp, err := cli.Do(req)
if err != nil {
return 0, nil, errors.Trace(err)
}
var resp *http.Response
count := 0
for {
resp, err = cli.Do(req) //nolint:bodyclose
count++
if count > pdRequestRetryTime || resp.StatusCode < 500 {
failpoint.Inject("InjectClosed", func(v failpoint.Value) {
if failType, ok := v.(int); ok && count <= pdRequestRetryTime-1 {
resp = nil
switch failType {
case 0:
err = &net.OpError{
Op: "read",
Err: os.NewSyscallError("connect", syscall.ECONNREFUSED),
}
default:
err = &url.Error{
Op: "read",
Err: os.NewSyscallError("connect", syscall.ECONNREFUSED),
}
}
}
})
if count > pdRequestRetryTime || (resp != nil && resp.StatusCode < 500) ||
(err != nil && !common.IsRetryableError(err)) {
break
}
_ = resp.Body.Close()
time.Sleep(pdRequestRetryInterval())
resp, err = cli.Do(req)
if err != nil {
return 0, nil, errors.Trace(err)
if resp != nil {
_ = resp.Body.Close()
}
time.Sleep(pdRequestRetryInterval())
}
if err != nil {
return 0, nil, errors.Trace(err)
}
defer func() {
_ = resp.Body.Close()
Expand Down
26 changes: 25 additions & 1 deletion br/pkg/pdutil/pd_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,34 @@ func TestPDRequestRetry(t *testing.T) {
}
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()
taddr = ts.URL
_, reqErr = pdRequest(ctx, taddr, "", cli, http.MethodGet, nil)
require.Error(t, reqErr)
ts.Close()

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/pdutil/InjectClosed",
fmt.Sprintf("return(%d)", 0)))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/pdutil/InjectClosed"))
}()
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
taddr = ts.URL
_, reqErr = pdRequest(ctx, taddr, "", cli, http.MethodGet, nil)
require.NoError(t, reqErr)
ts.Close()

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/pdutil/InjectClosed"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/pdutil/InjectClosed",
fmt.Sprintf("return(%d)", 1)))
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()
taddr = ts.URL
_, reqErr = pdRequest(ctx, taddr, "", cli, http.MethodGet, nil)
require.NoError(t, reqErr)
}

func TestPDResetTSCompatibility(t *testing.T) {
Expand Down

0 comments on commit 0b11444

Please sign in to comment.