Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Make DDL unaffected by killing TiDB instance #43871

Merged
merged 2 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,10 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {

// If the connection being killed, we need to CANCEL the DDL job.
if atomic.LoadUint32(&sessVars.Killed) == 1 {
if atomic.LoadInt32(&sessVars.ConnectionStatus) == variable.ConnStatusShutdown {
logutil.BgLogger().Info("[ddl] DoDDLJob will quit because context done")
return context.Canceled
}
if sessVars.StmtCtx.DDLJobID != 0 {
se, err := d.sessPool.Get()
if err != nil {
Expand Down
31 changes: 23 additions & 8 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ import (
const (
connStatusDispatching int32 = iota
connStatusReading
connStatusShutdown // Closed by server.
connStatusWaitShutdown // Notified by server to close.
connStatusShutdown = variable.ConnStatusShutdown // Closed by server.
connStatusWaitShutdown = 3 // Notified by server to close.
)

// newClientConn creates a *clientConn object.
Expand Down Expand Up @@ -188,6 +188,21 @@ func (cc *clientConn) String() string {
)
}

func (cc *clientConn) setStatus(status int32) {
atomic.StoreInt32(&cc.status, status)
if ctx := cc.getCtx(); ctx != nil {
atomic.StoreInt32(&ctx.GetSessionVars().ConnectionStatus, status)
}
}

func (cc *clientConn) getStatus() int32 {
return atomic.LoadInt32(&cc.status)
}

func (cc *clientConn) CompareAndSwapStatus(oldStatus, newStatus int32) bool {
return atomic.CompareAndSwapInt32(&cc.status, oldStatus, newStatus)
}

// authSwitchRequest is used by the server to ask the client to switch to a different authentication
// plugin. MySQL 8.0 libmysqlclient based clients by default always try `caching_sha2_password`, even
// when the server advertises the its default to be `mysql_native_password`. In addition to this switching
Expand Down Expand Up @@ -1079,7 +1094,7 @@ func (cc *clientConn) Run(ctx context.Context) {
terror.Log(err)
metrics.PanicCounter.WithLabelValues(metrics.LabelSession).Inc()
}
if atomic.LoadInt32(&cc.status) != connStatusShutdown {
if cc.getStatus() != connStatusShutdown {
err := cc.Close()
terror.Log(err)
}
Expand All @@ -1102,10 +1117,10 @@ func (cc *clientConn) Run(ctx context.Context) {
}
}

if !atomic.CompareAndSwapInt32(&cc.status, connStatusDispatching, connStatusReading) ||
if !cc.CompareAndSwapStatus(connStatusDispatching, connStatusReading) ||
// The judge below will not be hit by all means,
// But keep it stayed as a reminder and for the code reference for connStatusWaitShutdown.
atomic.LoadInt32(&cc.status) == connStatusWaitShutdown {
cc.getStatus() == connStatusWaitShutdown {
return
}

Expand All @@ -1119,7 +1134,7 @@ func (cc *clientConn) Run(ctx context.Context) {
if err != nil {
if terror.ErrorNotEqual(err, io.EOF) {
if netErr, isNetErr := errors.Cause(err).(net.Error); isNetErr && netErr.Timeout() {
if atomic.LoadInt32(&cc.status) == connStatusWaitShutdown {
if cc.getStatus() == connStatusWaitShutdown {
logutil.Logger(ctx).Info("read packet timeout because of killed connection")
} else {
idleTime := time.Since(start)
Expand All @@ -1146,7 +1161,7 @@ func (cc *clientConn) Run(ctx context.Context) {
return
}

if !atomic.CompareAndSwapInt32(&cc.status, connStatusReading, connStatusDispatching) {
if !cc.CompareAndSwapStatus(connStatusReading, connStatusDispatching) {
return
}

Expand Down Expand Up @@ -2120,7 +2135,7 @@ func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns [
}

if rs != nil {
if connStatus := atomic.LoadInt32(&cc.status); connStatus == connStatusShutdown {
if cc.getStatus() == connStatusShutdown {
return false, exeerrors.ErrQueryInterrupted
}
if retryable, err := cc.writeResultSet(ctx, rs, false, status, 0); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ func (s *Server) Kill(connectionID uint64, query bool) {
if !query {
// Mark the client connection status as WaitShutdown, when clientConn.Run detect
// this, it will end the dispatch loop and exit.
atomic.StoreInt32(&conn.status, connStatusWaitShutdown)
conn.setStatus(connStatusWaitShutdown)
}
killQuery(conn)
}
Expand Down Expand Up @@ -875,7 +875,7 @@ func (s *Server) KillAllConnections() {
s.rwlock.RLock()
defer s.rwlock.RUnlock()
for _, conn := range s.clients {
atomic.StoreInt32(&conn.status, connStatusShutdown)
conn.setStatus(connStatusShutdown)
if err := conn.closeWithoutLock(); err != nil {
terror.Log(err)
}
Expand Down
7 changes: 7 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ var (
enableAdaptiveReplicaRead uint32 = 1
)

// ConnStatusShutdown indicates that the connection status is closed by server.
// This code is put here because of package imports, and this value is the original server.connStatusShutdown.
const ConnStatusShutdown int32 = 2
zimulala marked this conversation as resolved.
Show resolved Hide resolved

// SetEnableAdaptiveReplicaRead set `enableAdaptiveReplicaRead` with given value.
// return true if the value is changed.
func SetEnableAdaptiveReplicaRead(enabled bool) bool {
Expand Down Expand Up @@ -1052,6 +1056,9 @@ type SessionVars struct {
// Killed is a flag to indicate that this query is killed.
Killed uint32

// ConnectionStatus indicates current connection status.
ConnectionStatus int32

// ConnectionInfo indicates current connection info used by current session.
ConnectionInfo *ConnectionInfo

Expand Down