Skip to content

Commit

Permalink
*: Make DDL unaffected by killing TiDB instance (#43871)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala committed May 17, 2023
1 parent a1f0097 commit 8ae3cac
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 10 deletions.
4 changes: 4 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,10 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {

// If the connection being killed, we need to CANCEL the DDL job.
if atomic.LoadUint32(&sessVars.Killed) == 1 {
if atomic.LoadInt32(&sessVars.ConnectionStatus) == variable.ConnStatusShutdown {
logutil.BgLogger().Info("[ddl] DoDDLJob will quit because context done")
return context.Canceled
}
if sessVars.StmtCtx.DDLJobID != 0 {
se, err := d.sessPool.Get()
if err != nil {
Expand Down
31 changes: 23 additions & 8 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ import (
const (
connStatusDispatching int32 = iota
connStatusReading
connStatusShutdown // Closed by server.
connStatusWaitShutdown // Notified by server to close.
connStatusShutdown = variable.ConnStatusShutdown // Closed by server.
connStatusWaitShutdown = 3 // Notified by server to close.
)

// newClientConn creates a *clientConn object.
Expand Down Expand Up @@ -188,6 +188,21 @@ func (cc *clientConn) String() string {
)
}

func (cc *clientConn) setStatus(status int32) {
atomic.StoreInt32(&cc.status, status)
if ctx := cc.getCtx(); ctx != nil {
atomic.StoreInt32(&ctx.GetSessionVars().ConnectionStatus, status)
}
}

func (cc *clientConn) getStatus() int32 {
return atomic.LoadInt32(&cc.status)
}

func (cc *clientConn) CompareAndSwapStatus(oldStatus, newStatus int32) bool {
return atomic.CompareAndSwapInt32(&cc.status, oldStatus, newStatus)
}

// authSwitchRequest is used by the server to ask the client to switch to a different authentication
// plugin. MySQL 8.0 libmysqlclient based clients by default always try `caching_sha2_password`, even
// when the server advertises the its default to be `mysql_native_password`. In addition to this switching
Expand Down Expand Up @@ -1079,7 +1094,7 @@ func (cc *clientConn) Run(ctx context.Context) {
terror.Log(err)
metrics.PanicCounter.WithLabelValues(metrics.LabelSession).Inc()
}
if atomic.LoadInt32(&cc.status) != connStatusShutdown {
if cc.getStatus() != connStatusShutdown {
err := cc.Close()
terror.Log(err)
}
Expand All @@ -1102,10 +1117,10 @@ func (cc *clientConn) Run(ctx context.Context) {
}
}

if !atomic.CompareAndSwapInt32(&cc.status, connStatusDispatching, connStatusReading) ||
if !cc.CompareAndSwapStatus(connStatusDispatching, connStatusReading) ||
// The judge below will not be hit by all means,
// But keep it stayed as a reminder and for the code reference for connStatusWaitShutdown.
atomic.LoadInt32(&cc.status) == connStatusWaitShutdown {
cc.getStatus() == connStatusWaitShutdown {
return
}

Expand All @@ -1119,7 +1134,7 @@ func (cc *clientConn) Run(ctx context.Context) {
if err != nil {
if terror.ErrorNotEqual(err, io.EOF) {
if netErr, isNetErr := errors.Cause(err).(net.Error); isNetErr && netErr.Timeout() {
if atomic.LoadInt32(&cc.status) == connStatusWaitShutdown {
if cc.getStatus() == connStatusWaitShutdown {
logutil.Logger(ctx).Info("read packet timeout because of killed connection")
} else {
idleTime := time.Since(start)
Expand All @@ -1146,7 +1161,7 @@ func (cc *clientConn) Run(ctx context.Context) {
return
}

if !atomic.CompareAndSwapInt32(&cc.status, connStatusReading, connStatusDispatching) {
if !cc.CompareAndSwapStatus(connStatusReading, connStatusDispatching) {
return
}

Expand Down Expand Up @@ -2120,7 +2135,7 @@ func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns [
}

if rs != nil {
if connStatus := atomic.LoadInt32(&cc.status); connStatus == connStatusShutdown {
if cc.getStatus() == connStatusShutdown {
return false, exeerrors.ErrQueryInterrupted
}
if retryable, err := cc.writeResultSet(ctx, rs, false, status, 0); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ func (s *Server) Kill(connectionID uint64, query bool) {
if !query {
// Mark the client connection status as WaitShutdown, when clientConn.Run detect
// this, it will end the dispatch loop and exit.
atomic.StoreInt32(&conn.status, connStatusWaitShutdown)
conn.setStatus(connStatusWaitShutdown)
}
killQuery(conn)
}
Expand Down Expand Up @@ -875,7 +875,7 @@ func (s *Server) KillAllConnections() {
s.rwlock.RLock()
defer s.rwlock.RUnlock()
for _, conn := range s.clients {
atomic.StoreInt32(&conn.status, connStatusShutdown)
conn.setStatus(connStatusShutdown)
if err := conn.closeWithoutLock(); err != nil {
terror.Log(err)
}
Expand Down
7 changes: 7 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ var (
enableAdaptiveReplicaRead uint32 = 1
)

// ConnStatusShutdown indicates that the connection status is closed by server.
// This code is put here because of package imports, and this value is the original server.connStatusShutdown.
const ConnStatusShutdown int32 = 2

// SetEnableAdaptiveReplicaRead set `enableAdaptiveReplicaRead` with given value.
// return true if the value is changed.
func SetEnableAdaptiveReplicaRead(enabled bool) bool {
Expand Down Expand Up @@ -1052,6 +1056,9 @@ type SessionVars struct {
// Killed is a flag to indicate that this query is killed.
Killed uint32

// ConnectionStatus indicates current connection status.
ConnectionStatus int32

// ConnectionInfo indicates current connection info used by current session.
ConnectionInfo *ConnectionInfo

Expand Down

0 comments on commit 8ae3cac

Please sign in to comment.