diff --git a/ddl/ddl.go b/ddl/ddl.go index f6c0a20d97fa2..1301c8166a067 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -1091,6 +1091,10 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { // If the connection being killed, we need to CANCEL the DDL job. if atomic.LoadUint32(&sessVars.Killed) == 1 { + if atomic.LoadInt32(&sessVars.ConnectionStatus) == variable.ConnStatusShutdown { + logutil.BgLogger().Info("[ddl] DoDDLJob will quit because context done") + return context.Canceled + } if sessVars.StmtCtx.DDLJobID != 0 { se, err := d.sessPool.Get() if err != nil { diff --git a/server/conn.go b/server/conn.go index f6e93057dbeb4..62115649fe642 100644 --- a/server/conn.go +++ b/server/conn.go @@ -102,8 +102,8 @@ import ( const ( connStatusDispatching int32 = iota connStatusReading - connStatusShutdown // Closed by server. - connStatusWaitShutdown // Notified by server to close. + connStatusShutdown = variable.ConnStatusShutdown // Closed by server. + connStatusWaitShutdown = 3 // Notified by server to close. ) // newClientConn creates a *clientConn object. @@ -188,6 +188,21 @@ func (cc *clientConn) String() string { ) } +func (cc *clientConn) setStatus(status int32) { + atomic.StoreInt32(&cc.status, status) + if ctx := cc.getCtx(); ctx != nil { + atomic.StoreInt32(&ctx.GetSessionVars().ConnectionStatus, status) + } +} + +func (cc *clientConn) getStatus() int32 { + return atomic.LoadInt32(&cc.status) +} + +func (cc *clientConn) CompareAndSwapStatus(oldStatus, newStatus int32) bool { + return atomic.CompareAndSwapInt32(&cc.status, oldStatus, newStatus) +} + // authSwitchRequest is used by the server to ask the client to switch to a different authentication // plugin. MySQL 8.0 libmysqlclient based clients by default always try `caching_sha2_password`, even // when the server advertises the its default to be `mysql_native_password`. In addition to this switching @@ -1079,7 +1094,7 @@ func (cc *clientConn) Run(ctx context.Context) { terror.Log(err) metrics.PanicCounter.WithLabelValues(metrics.LabelSession).Inc() } - if atomic.LoadInt32(&cc.status) != connStatusShutdown { + if cc.getStatus() != connStatusShutdown { err := cc.Close() terror.Log(err) } @@ -1102,10 +1117,10 @@ func (cc *clientConn) Run(ctx context.Context) { } } - if !atomic.CompareAndSwapInt32(&cc.status, connStatusDispatching, connStatusReading) || + if !cc.CompareAndSwapStatus(connStatusDispatching, connStatusReading) || // The judge below will not be hit by all means, // But keep it stayed as a reminder and for the code reference for connStatusWaitShutdown. - atomic.LoadInt32(&cc.status) == connStatusWaitShutdown { + cc.getStatus() == connStatusWaitShutdown { return } @@ -1119,7 +1134,7 @@ func (cc *clientConn) Run(ctx context.Context) { if err != nil { if terror.ErrorNotEqual(err, io.EOF) { if netErr, isNetErr := errors.Cause(err).(net.Error); isNetErr && netErr.Timeout() { - if atomic.LoadInt32(&cc.status) == connStatusWaitShutdown { + if cc.getStatus() == connStatusWaitShutdown { logutil.Logger(ctx).Info("read packet timeout because of killed connection") } else { idleTime := time.Since(start) @@ -1146,7 +1161,7 @@ func (cc *clientConn) Run(ctx context.Context) { return } - if !atomic.CompareAndSwapInt32(&cc.status, connStatusReading, connStatusDispatching) { + if !cc.CompareAndSwapStatus(connStatusReading, connStatusDispatching) { return } @@ -2120,7 +2135,7 @@ func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns [ } if rs != nil { - if connStatus := atomic.LoadInt32(&cc.status); connStatus == connStatusShutdown { + if cc.getStatus() == connStatusShutdown { return false, exeerrors.ErrQueryInterrupted } if retryable, err := cc.writeResultSet(ctx, rs, false, status, 0); err != nil { diff --git a/server/server.go b/server/server.go index 4ad743335bc4a..c5ec178d92862 100644 --- a/server/server.go +++ b/server/server.go @@ -825,7 +825,7 @@ func (s *Server) Kill(connectionID uint64, query bool) { if !query { // Mark the client connection status as WaitShutdown, when clientConn.Run detect // this, it will end the dispatch loop and exit. - atomic.StoreInt32(&conn.status, connStatusWaitShutdown) + conn.setStatus(connStatusWaitShutdown) } killQuery(conn) } @@ -875,7 +875,7 @@ func (s *Server) KillAllConnections() { s.rwlock.RLock() defer s.rwlock.RUnlock() for _, conn := range s.clients { - atomic.StoreInt32(&conn.status, connStatusShutdown) + conn.setStatus(connStatusShutdown) if err := conn.closeWithoutLock(); err != nil { terror.Log(err) } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 92df02193667e..7dd080adc8524 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -75,6 +75,10 @@ var ( enableAdaptiveReplicaRead uint32 = 1 ) +// ConnStatusShutdown indicates that the connection status is closed by server. +// This code is put here because of package imports, and this value is the original server.connStatusShutdown. +const ConnStatusShutdown int32 = 2 + // SetEnableAdaptiveReplicaRead set `enableAdaptiveReplicaRead` with given value. // return true if the value is changed. func SetEnableAdaptiveReplicaRead(enabled bool) bool { @@ -1052,6 +1056,9 @@ type SessionVars struct { // Killed is a flag to indicate that this query is killed. Killed uint32 + // ConnectionStatus indicates current connection status. + ConnectionStatus int32 + // ConnectionInfo indicates current connection info used by current session. ConnectionInfo *ConnectionInfo