Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backend: add quit source to ConnContext #236

Merged
merged 6 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 41 additions & 12 deletions pkg/proxy/backend/backend_conn_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ type BackendConnManager struct {
handshakeHandler HandshakeHandler
ctxmap sync.Map
connectionID uint64
quitSource ErrorSource
}

// NewBackendConnManager creates a BackendConnManager.
Expand All @@ -151,6 +152,7 @@ func NewBackendConnManager(logger *zap.Logger, handshakeHandler HandshakeHandler
// There are 2 types of signals, which may be sent concurrently.
signalReceived: make(chan signalType, signalTypeNums),
redirectResCh: make(chan *redirectResult, 1),
quitSource: SrcClientConn,
}
return mgr
}
Expand All @@ -172,6 +174,13 @@ func (mgr *BackendConnManager) Connect(ctx context.Context, clientIO *pnet.Packe
err := mgr.authenticator.handshakeFirstTime(mgr.logger.Named("authenticator"), mgr, clientIO, mgr.handshakeHandler, mgr.getBackendIO, frontendTLSConfig, backendTLSConfig)
mgr.handshakeHandler.OnHandshake(mgr, mgr.ServerAddr(), err)
if err != nil {
if errors.Is(err, ErrBackendConn) {
mgr.quitSource = SrcBackendConn
} else if IsMySQLError(err) {
mgr.quitSource = SrcClientErr
} else if !errors.Is(err, ErrClientConn) {
mgr.quitSource = SrcProxyErr
}
WriteUserError(clientIO, err, mgr.logger)
return err
}
Expand Down Expand Up @@ -233,7 +242,7 @@ func (mgr *BackendConnManager) getBackendIO(cctx ConnContext, auth *Authenticato
// NOTE: should use DNS name as much as possible
// Usually certs are signed with domain instead of IP addrs
// And `RemoteAddr()` will return IP addr
backendIO := pnet.NewPacketIO(cn, pnet.WithRemoteAddr(addr, cn.RemoteAddr()))
backendIO := pnet.NewPacketIO(cn, pnet.WithRemoteAddr(addr, cn.RemoteAddr()), pnet.WithWrapError(ErrBackendConn))
mgr.backendIO.Store(backendIO)
mgr.setKeepAlive(mgr.config.HealthyKeepAlive)
return backendIO, nil
Expand Down Expand Up @@ -264,9 +273,19 @@ func (mgr *BackendConnManager) getBackendIO(cctx ConnContext, auth *Authenticato

// ExecuteCmd forwards messages between the client and the backend.
// If it finds that the session is ready for redirection, it migrates the session.
func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) error {
func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) (err error) {
defer func() {
if err != nil {
if errors.Is(err, ErrBackendConn) {
mgr.quitSource = SrcBackendConn
} else if !errors.Is(err, ErrClientConn) {
mgr.quitSource = SrcClientErr
}
djshow832 marked this conversation as resolved.
Show resolved Hide resolved
}
}()
if len(request) < 1 {
return mysql.ErrMalformPacket
err = mysql.ErrMalformPacket
return
}
cmd := request[0]
startTime := time.Now()
Expand All @@ -275,25 +294,26 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) e

switch mgr.closeStatus.Load() {
case statusClosing, statusClosed:
return nil
return
}
defer mgr.resetCheckBackendTicker()
waitingRedirect := atomic.LoadPointer(&mgr.signal) != nil
holdRequest, err := mgr.cmdProcessor.executeCmd(request, mgr.clientIO, mgr.backendIO.Load(), waitingRedirect)
var holdRequest bool
holdRequest, err = mgr.cmdProcessor.executeCmd(request, mgr.clientIO, mgr.backendIO.Load(), waitingRedirect)
if !holdRequest {
addCmdMetrics(cmd, mgr.ServerAddr(), startTime)
}
if err != nil {
if !IsMySQLError(err) {
return err
return
} else {
mgr.logger.Debug("got a mysql error", zap.Error(err))
}
}
if err == nil {
switch cmd {
case mysql.ComQuit:
return nil
return
case mysql.ComSetOption:
val := binary.LittleEndian.Uint16(request[1:])
switch val {
Expand All @@ -304,12 +324,13 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) e
mgr.authenticator.capability &^= mysql.ClientMultiStatements
mgr.cmdProcessor.capability &^= mysql.ClientMultiStatements
default:
return errors.Errorf("unrecognized set_option value:%d", val)
err = errors.Errorf("unrecognized set_option value:%d", val)
return
}
case mysql.ComChangeUser:
username, db := pnet.ParseChangeUser(request)
mgr.authenticator.changeUser(username, db)
return nil
return
}
}
// Even if it meets an MySQL error, it may have changed the status, such as when executing multi-statements.
Expand All @@ -320,7 +341,7 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) e
_, err = mgr.cmdProcessor.executeCmd(request, mgr.clientIO, mgr.backendIO.Load(), false)
addCmdMetrics(cmd, mgr.ServerAddr(), startTime)
if err != nil && !IsMySQLError(err) {
return err
return
}
} else if mgr.closeStatus.Load() == statusNotifyClose {
mgr.tryGracefulClose(ctx)
Expand All @@ -329,7 +350,8 @@ func (mgr *BackendConnManager) ExecuteCmd(ctx context.Context, request []byte) e
}
}
// Ignore MySQL errors, only return unexpected errors.
return nil
err = nil
return
}

// SetEventReceiver implements RedirectableConn.SetEventReceiver interface.
Expand Down Expand Up @@ -428,6 +450,7 @@ func (mgr *BackendConnManager) tryRedirect(ctx context.Context) {
// If the backend connection is closed, also close the client connection.
// Otherwise, if the client is idle, the mgr will keep retrying.
if errors.Is(rs.err, net.ErrClosed) || pnet.IsDisconnectError(rs.err) || errors.Is(rs.err, os.ErrDeadlineExceeded) {
mgr.quitSource = SrcBackendConn
if ignoredErr := mgr.clientIO.GracefulClose(); ignoredErr != nil {
mgr.logger.Warn("graceful close client IO error", zap.Stringer("addr", mgr.clientIO.RemoteAddr()), zap.Error(ignoredErr))
}
Expand All @@ -444,7 +467,7 @@ func (mgr *BackendConnManager) tryRedirect(ctx context.Context) {
mgr.handshakeHandler.OnHandshake(mgr, rs.to, rs.err)
return
}
newBackendIO := pnet.NewPacketIO(cn, pnet.WithRemoteAddr(rs.to, cn.RemoteAddr()))
newBackendIO := pnet.NewPacketIO(cn, pnet.WithRemoteAddr(rs.to, cn.RemoteAddr()), pnet.WithWrapError(ErrBackendConn))

if rs.err = mgr.authenticator.handshakeSecondTime(mgr.logger, mgr.clientIO, newBackendIO, mgr.backendTLS, sessionToken); rs.err == nil {
rs.err = mgr.initSessionStates(newBackendIO, sessionStates)
Expand Down Expand Up @@ -538,6 +561,7 @@ func (mgr *BackendConnManager) tryGracefulClose(ctx context.Context) {
if !mgr.cmdProcessor.finishedTxn() {
return
}
mgr.quitSource = SrcProxyQuit
// Closing clientIO will cause the whole connection to be closed.
if err := mgr.clientIO.GracefulClose(); err != nil {
mgr.logger.Warn("graceful close client IO error", zap.Stringer("addr", mgr.clientIO.RemoteAddr()), zap.Error(err))
Expand All @@ -557,6 +581,7 @@ func (mgr *BackendConnManager) checkBackendActive() {
if !backendIO.IsPeerActive() {
mgr.logger.Info("backend connection is closed, close client connection", zap.Stringer("client", mgr.clientIO.RemoteAddr()),
zap.Stringer("backend", backendIO.RemoteAddr()))
mgr.quitSource = SrcBackendConn
if err := mgr.clientIO.GracefulClose(); err != nil {
mgr.logger.Warn("graceful close client IO error", zap.Stringer("addr", mgr.clientIO.RemoteAddr()), zap.Error(err))
}
Expand Down Expand Up @@ -602,6 +627,10 @@ func (mgr *BackendConnManager) ClientOutBytes() uint64 {
return mgr.clientIO.OutBytes()
}

func (mgr *BackendConnManager) QuitSource() ErrorSource {
return mgr.quitSource
}

func (mgr *BackendConnManager) SetValue(key, val any) {
mgr.ctxmap.Store(key, val)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/proxy/backend/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ const (
capabilityErrMsg = "Verify TiDB capability failed, please upgrade TiDB"
)

var (
ErrClientConn = errors.New("this is an error from client")
ErrBackendConn = errors.New("this is an error from backend")
)

// UserError is returned to the client.
// err is used to log and userMsg is used to report to the user.
type UserError struct {
Expand Down
36 changes: 36 additions & 0 deletions pkg/proxy/backend/handshake_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,49 @@ const (
ConnContextKeyTLSState ConnContextKey = "tls-state"
)

type ErrorSource int

const (
// SrcClientConn includes: client quit; bad client conn
SrcClientConn ErrorSource = iota
djshow832 marked this conversation as resolved.
Show resolved Hide resolved
// SrcClientErr includes: wrong password; mal format packet
SrcClientErr
// SrcProxyQuit includes: proxy graceful shutdown
SrcProxyQuit
// SrcProxyErr includes: cannot get backend list; capability negotiation
SrcProxyErr
// SrcBackendConn includes: backend quit
SrcBackendConn
// SrcBackendErr is reserved
SrcBackendErr
)

func (es ErrorSource) String() string {
switch es {
case SrcClientConn:
return "client disconnect"
case SrcClientErr:
return "client error"
case SrcProxyQuit:
return "proxy shutdown"
case SrcProxyErr:
return "proxy error"
case SrcBackendConn:
return "backend disconnect"
case SrcBackendErr:
return "backend error"
}
return "unknown"
}

var _ HandshakeHandler = (*DefaultHandshakeHandler)(nil)

type ConnContext interface {
ClientAddr() string
ServerAddr() string
ClientInBytes() uint64
ClientOutBytes() uint64
QuitSource() ErrorSource
SetValue(key, val any)
Value(key any) any
}
Expand Down
18 changes: 6 additions & 12 deletions pkg/proxy/client/client_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ package client
import (
"context"
"crypto/tls"
"io"
"net"
"os"

"github.com/pingcap/TiProxy/lib/util/errors"
"github.com/pingcap/TiProxy/pkg/proxy/backend"
Expand All @@ -28,10 +26,6 @@ import (
"go.uber.org/zap"
)

var (
ErrClientConn = errors.New("this is an error from client")
)

type ClientConnection struct {
logger *zap.Logger
frontendTLSConfig *tls.Config // the TLS config to connect to clients.
Expand All @@ -44,7 +38,7 @@ func NewClientConnection(logger *zap.Logger, conn net.Conn, frontendTLSConfig *t
hsHandler backend.HandshakeHandler, connID uint64, bcConfig *backend.BCConfig) *ClientConnection {
bemgr := backend.NewBackendConnManager(logger.Named("be"), hsHandler, connID, bcConfig)
opts := make([]pnet.PacketIOption, 0, 2)
opts = append(opts, pnet.WithWrapError(ErrClientConn))
opts = append(opts, pnet.WithWrapError(backend.ErrClientConn))
if bcConfig.ProxyProtocol {
opts = append(opts, pnet.WithProxy)
}
Expand Down Expand Up @@ -72,12 +66,12 @@ func (cc *ClientConnection) Run(ctx context.Context) {
}

clean:
clientErr := errors.Is(err, ErrClientConn)
// EOF: client closes; DeadlineExceeded: graceful shutdown; Closed: shut down.
if clientErr && (errors.Is(err, io.EOF) || errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, net.ErrClosed)) {
return
src := cc.connMgr.QuitSource()
switch src {
case backend.SrcClientConn, backend.SrcClientErr, backend.SrcProxyQuit:
default:
cc.logger.Info(msg, zap.Error(err), zap.Stringer("quit source", src))
}
cc.logger.Info(msg, zap.Error(err), zap.Bool("clientErr", clientErr), zap.Bool("serverErr", !clientErr))
}

func (cc *ClientConnection) processMsg(ctx context.Context) error {
Expand Down