Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: wait for binlog recovering when using HTTP API #13740

Merged
merged 16 commits into from
Dec 3, 2019
28 changes: 25 additions & 3 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,12 @@ const (
)

// For query string
const qTableID = "table_id"
const qLimit = "limit"
const (
qTableID = "table_id"
qLimit = "limit"
qOperation = "op"
qSeconds = "seconds"
)

const (
headerContentType = "Content-Type"
Expand Down Expand Up @@ -669,7 +673,25 @@ func (h configReloadHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)

// ServeHTTP recovers binlog service.
func (h binlogRecover) ServeHTTP(w http.ResponseWriter, req *http.Request) {
binloginfo.DisableSkipBinlogFlag()
op := req.FormValue(qOperation)
if op == "reset" {
binloginfo.ResetSkippedCommitterCounter()
} else if op == "nowait" {
binloginfo.DisableSkipBinlogFlag()
} else {
sec, err := strconv.ParseInt(req.FormValue(qSeconds), 10, 64)
if sec <= 0 || err != nil {
sec = 1800
}
binloginfo.DisableSkipBinlogFlag()
timeout := time.Duration(sec) * time.Second
err = binloginfo.WaitBinlogRecover(timeout)
if err != nil {
writeError(w, err)
return
}
}
writeData(w, "success!")
}

type tableFlashReplicaInfo struct {
Expand Down
63 changes: 63 additions & 0 deletions server/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/helper"
Expand Down Expand Up @@ -237,6 +238,68 @@ func (ts *HTTPHandlerTestSuite) TestGetRegionByIDWithError(c *C) {
defer resp.Body.Close()
}

func (ts *HTTPHandlerTestSuite) TestBinlogRecover(c *C) {
ts.startServer(c)
defer ts.stopServer(c)
binloginfo.EnableSkipBinlogFlag()
c.Assert(binloginfo.IsBinlogSkipped(), Equals, true)
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
c.Assert(binloginfo.IsBinlogSkipped(), Equals, false)

// Invalid operation will use the default operation.
binloginfo.EnableSkipBinlogFlag()
c.Assert(binloginfo.IsBinlogSkipped(), Equals, true)
resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=abc"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
c.Assert(binloginfo.IsBinlogSkipped(), Equals, false)

binloginfo.EnableSkipBinlogFlag()
c.Assert(binloginfo.IsBinlogSkipped(), Equals, true)
resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=abc&seconds=1"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
c.Assert(binloginfo.IsBinlogSkipped(), Equals, false)

binloginfo.EnableSkipBinlogFlag()
c.Assert(binloginfo.IsBinlogSkipped(), Equals, true)
binloginfo.AddOneSkippedCommitter()
resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=abc&seconds=1"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusBadRequest)
c.Assert(binloginfo.IsBinlogSkipped(), Equals, false)
binloginfo.RemoveOneSkippedCommitter()

binloginfo.AddOneSkippedCommitter()
c.Assert(binloginfo.SkippedCommitterCount(), Equals, int32(1))
resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=reset"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
c.Assert(binloginfo.SkippedCommitterCount(), Equals, int32(0))

binloginfo.EnableSkipBinlogFlag()
resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=nowait"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
c.Assert(binloginfo.IsBinlogSkipped(), Equals, false)

// Only the first should work.
binloginfo.EnableSkipBinlogFlag()
resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/binlog/recover?op=nowait&op=reset"))
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
c.Assert(binloginfo.IsBinlogSkipped(), Equals, false)
}

func (ts *HTTPHandlerTestSuite) TestRegionsFromMeta(c *C) {
ts.startServer(c)
defer ts.stopServer(c)
Expand Down
88 changes: 81 additions & 7 deletions sessionctx/binloginfo/binloginfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,70 @@ var statusListener = func(_ BinlogStatus) error {
return nil
}

// EnableSkipBinlogFlag enables the skipBinlog flag.
// NOTE: it is used *ONLY* for test.
func EnableSkipBinlogFlag() {
atomic.StoreUint32(&skipBinlog, 1)
logutil.BgLogger().Warn("[binloginfo] enable the skipBinlog flag")
}

// DisableSkipBinlogFlag disable the skipBinlog flag.
func DisableSkipBinlogFlag() {
atomic.StoreUint32(&skipBinlog, 0)
logutil.BgLogger().Warn("[binloginfo] disable the skipBinlog flag")
}

// IsBinlogSkipped gets the skipBinlog flag.
func IsBinlogSkipped() bool {
return atomic.LoadUint32(&skipBinlog) > 0
}

var skippedCommitterCounter int32

// WaitBinlogRecover returns when all committing transaction finished.
func WaitBinlogRecover(timeout time.Duration) error {
logutil.BgLogger().Warn("[binloginfo] start waiting for binlog recovering")
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
start := time.Now()
for {
select {
case <-ticker.C:
if atomic.LoadInt32(&skippedCommitterCounter) == 0 {
logutil.BgLogger().Warn("[binloginfo] binlog recovered")
return nil
}
if time.Since(start) > timeout {
logutil.BgLogger().Warn("[binloginfo] waiting for binlog recovering timed out",
zap.Duration("duration", timeout))
return errors.New("timeout")
}
}
}
}

// SkippedCommitterCount returns the number of alive committers whick skipped the binlog writing.
// NOTE: it is used *ONLY* for test.
func SkippedCommitterCount() int32 {
return atomic.LoadInt32(&skippedCommitterCounter)
}

// ResetSkippedCommitterCounter is used to reset the skippedCommitterCounter.
func ResetSkippedCommitterCounter() {
atomic.StoreInt32(&skippedCommitterCounter, 0)
logutil.BgLogger().Warn("[binloginfo] skippedCommitterCounter is reset to 0")
}

// AddOneSkippedCommitter adds one committer to skippedCommitterCounter.
func AddOneSkippedCommitter() {
atomic.AddInt32(&skippedCommitterCounter, 1)
}

// RemoveOneSkippedCommitter removes one committer from skippedCommitterCounter.
func RemoveOneSkippedCommitter() {
atomic.AddInt32(&skippedCommitterCounter, -1)
}

// SetIgnoreError sets the ignoreError flag, this function called when TiDB start
// up and find config.Binlog.IgnoreError is true.
func SetIgnoreError(on bool) {
Expand Down Expand Up @@ -146,16 +204,32 @@ func RegisterStatusListener(listener func(BinlogStatus) error) {
statusListener = listener
}

// WriteResult is used for the returned chan of WriteBinlog.
type WriteResult struct {
skipped bool
err error
}

// Skipped if true stands for the binlog writing is skipped.
func (wr *WriteResult) Skipped() bool {
return wr.skipped
}

// GetError gets the error of WriteBinlog.
func (wr *WriteResult) GetError() error {
return wr.err
}

// WriteBinlog writes a binlog to Pump.
func (info *BinlogInfo) WriteBinlog(clusterID uint64) error {
func (info *BinlogInfo) WriteBinlog(clusterID uint64) *WriteResult {
skip := atomic.LoadUint32(&skipBinlog)
if skip > 0 {
metrics.CriticalErrorCounter.Add(1)
return nil
return &WriteResult{true, nil}
}

if info.Client == nil {
return errors.New("pumps client is nil")
return &WriteResult{false, errors.New("pumps client is nil")}
}

// it will retry in PumpsClient if write binlog fail.
Expand All @@ -176,18 +250,18 @@ func (info *BinlogInfo) WriteBinlog(clusterID uint64) error {
logutil.BgLogger().Warn("update binlog status failed", zap.Error(err))
}
}
return nil
return &WriteResult{true, nil}
}

if strings.Contains(err.Error(), "received message larger than max") {
// This kind of error is not critical, return directly.
return errors.Errorf("binlog data is too large (%s)", err.Error())
return &WriteResult{false, errors.Errorf("binlog data is too large (%s)", err.Error())}
}

return terror.ErrCritical.GenWithStackByArgs(err)
return &WriteResult{false, terror.ErrCritical.GenWithStackByArgs(err)}
}

return nil
return &WriteResult{false, nil}
}

// SetDDLBinlog sets DDL binlog in the kv.Transaction.
Expand Down
3 changes: 2 additions & 1 deletion sessionctx/binloginfo/binloginfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ func (s *testBinlogSuite) TestMaxRecvSize(c *C) {
},
Client: s.client,
}
err := info.WriteBinlog(1)
binlogWR := info.WriteBinlog(1)
err := binlogWR.GetError()
c.Assert(err, NotNil)
c.Assert(terror.ErrCritical.Equal(err), IsFalse, Commentf("%v", err))
}
Expand Down
51 changes: 30 additions & 21 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1021,19 +1021,9 @@ func (c *twoPhaseCommitter) pessimisticRollbackKeys(bo *Backoffer, keys [][]byte
return c.doActionOnKeys(bo, actionPessimisticRollback{}, keys)
}

func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) error {
err := c.execute(ctx)
if err != nil {
c.writeFinishBinlog(ctx, binlog.BinlogType_Rollback, 0)
} else {
c.txn.commitTS = c.commitTS
c.writeFinishBinlog(ctx, binlog.BinlogType_Commit, int64(c.commitTS))
}
return errors.Trace(err)
}

// execute executes the two-phase commit protocol.
func (c *twoPhaseCommitter) execute(ctx context.Context) error {
func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
var binlogSkipped bool
defer func() {
// Always clean up all written keys if the txn does not commit.
c.mu.RLock()
Expand All @@ -1057,12 +1047,22 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
c.cleanWg.Done()
}()
}
c.txn.commitTS = c.commitTS
if binlogSkipped {
binloginfo.RemoveOneSkippedCommitter()
} else {
if err != nil {
c.writeFinishBinlog(ctx, binlog.BinlogType_Rollback, 0)
} else {
c.writeFinishBinlog(ctx, binlog.BinlogType_Commit, int64(c.commitTS))
}
}
}()

binlogChan := c.prewriteBinlog(ctx)
prewriteBo := NewBackoffer(ctx, PrewriteMaxBackoff).WithVars(c.txn.vars)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if panic happened after prewriteBinlog before <-binlogChan will miss RemoveOneSkippedCommitter

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, can not avoid it, I've added a reset operation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there is in-flight txn, calling reset will make SkippedCommitterCounter < 0 finally?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, reset is used for troubleshooting, if sth wrong with it, e.g. counter != 0 forever after resetting, we could reset it once again. I think we could not avoid it to happen, but hopefully, it will never be used.

start := time.Now()
err := c.prewriteKeys(prewriteBo, c.keys)
err = c.prewriteKeys(prewriteBo, c.keys)
commitDetail := c.getDetail()
commitDetail.PrewriteTime = time.Since(start)
if prewriteBo.totalSleep > 0 {
Expand All @@ -1072,9 +1072,13 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
commitDetail.Mu.Unlock()
}
if binlogChan != nil {
binlogErr := <-binlogChan
if binlogErr != nil {
return errors.Trace(binlogErr)
binlogWriteResult := <-binlogChan
if binlogWriteResult != nil {
binlogSkipped = binlogWriteResult.Skipped()
binlogErr := binlogWriteResult.GetError()
if binlogErr != nil {
return binlogErr
}
}
}
if err != nil {
Expand Down Expand Up @@ -1161,11 +1165,11 @@ func (c *twoPhaseCommitter) checkSchemaValid() error {
return nil
}

func (c *twoPhaseCommitter) prewriteBinlog(ctx context.Context) chan error {
func (c *twoPhaseCommitter) prewriteBinlog(ctx context.Context) chan *binloginfo.WriteResult {
if !c.shouldWriteBinlog() {
return nil
}
ch := make(chan error, 1)
ch := make(chan *binloginfo.WriteResult, 1)
go func() {
logutil.Eventf(ctx, "start prewrite binlog")
binInfo := c.txn.us.GetOption(kv.BinlogInfo).(*binloginfo.BinlogInfo)
Expand All @@ -1174,9 +1178,13 @@ func (c *twoPhaseCommitter) prewriteBinlog(ctx context.Context) chan error {
if bin.Tp == binlog.BinlogType_Prewrite {
bin.PrewriteKey = c.keys[0]
}
err := binInfo.WriteBinlog(c.store.clusterID)
wr := binInfo.WriteBinlog(c.store.clusterID)
if wr.Skipped() {
binInfo.Data.PrewriteValue = nil
binloginfo.AddOneSkippedCommitter()
}
logutil.Eventf(ctx, "finish prewrite binlog")
ch <- errors.Trace(err)
ch <- wr
}()
return ch
}
Expand All @@ -1191,7 +1199,8 @@ func (c *twoPhaseCommitter) writeFinishBinlog(ctx context.Context, tp binlog.Bin
binInfo.Data.PrewriteValue = nil
go func() {
logutil.Eventf(ctx, "start write finish binlog")
err := binInfo.WriteBinlog(c.store.clusterID)
binlogWriteResult := binInfo.WriteBinlog(c.store.clusterID)
err := binlogWriteResult.GetError()
if err != nil {
logutil.BgLogger().Error("failed to write binlog",
zap.Error(err))
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
// latches disabled
// pessimistic transaction should also bypass latch.
if txn.store.txnLatches == nil || txn.IsPessimistic() {
err = committer.executeAndWriteFinishBinlog(ctx)
err = committer.execute(ctx)
logutil.Logger(ctx).Debug("[kv] txnLatches disabled, 2pc directly", zap.Error(err))
return errors.Trace(err)
}
Expand All @@ -343,7 +343,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
if lock.IsStale() {
return kv.ErrWriteConflictInTiDB.FastGenByArgs(txn.startTS)
}
err = committer.executeAndWriteFinishBinlog(ctx)
err = committer.execute(ctx)
if err == nil {
lock.SetCommitTS(committer.commitTS)
}
Expand Down