Skip to content

Commit

Permalink
[bug] proxy: fix bug that may cause mo hang
Browse files Browse the repository at this point in the history
  • Loading branch information
volgariver6 committed Nov 13, 2024
1 parent 7fd55f2 commit 4bbfa2b
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 25 deletions.
4 changes: 0 additions & 4 deletions pkg/proxy/client_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,10 +511,6 @@ func (c *clientConn) connectToBackend(prevAdd string) (ServerConn, error) {
return nil, err
}
v2.ProxyConnectSuccessCounter.Inc()

// manage this connection in the manager.
c.tun.rebalancer.connManager.connect(sc.GetCNServer(), c.tun)

return sc, nil
}
}
Expand Down
14 changes: 10 additions & 4 deletions pkg/proxy/conn_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ const (
// EntryOperation contains the operations of the entry in the store.
// Locked is required before the operations are called.
type EntryOperation interface {
// doPush is the operator to push the connection entry into the store.
// push is the operator to push the connection entry into the store.
push(store *cacheStore, conn *serverConnAuth, postPush func())
// doPop is the operator to pop the connection entry from the store.
// pop is the operator to pop the connection entry from the store.
pop(store *cacheStore, postPop func()) *serverConnAuth
// doPeek is the operator to peek the connection entry in the store.
// peek is the operator to peek the connection entry in the store.
peek(store *cacheStore) *serverConnAuth
}

Expand Down Expand Up @@ -383,14 +383,20 @@ func (c *connCache) Pop(key cacheKey, connID uint32, salt []byte, authResp []byt
s: fmt.Sprintf(setConnectionIDSQL, connID),
}, nil)
if err != nil || !ok {
c.logger.Error("failed to set conn id",
zap.Uint32("conn ID", sc.ConnID()),
zap.Error(err),
)
// Failed to set connection ID, try to send quit command to the server.
if err := sc.Quit(); err != nil {
c.logger.Error("failed to send quit cmd to server",
zap.Uint32("conn ID", sc.ConnID()),
zap.Error(err),
)
// If send quit failed, pop the connection from the store.
connOperator[c.opStrategy].pop(c.mu.cache[key], postPop)
} else {
// If send quit successfully, pop the connection from the store.
// If send quit successfully, pop the connection from the store either.
connOperator[c.opStrategy].pop(c.mu.cache[key], postPop)
}
continue
Expand Down
12 changes: 6 additions & 6 deletions pkg/proxy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,12 @@ func (h *handler) handle(c goetty.IOSession) error {
}
h.logger.Debug("server conn created")
defer func() {
// This Close() function just disconnect from connManager,
// but do not close the real raw connection. The raw connection
// is closed if the server connection could not be pushed into
// the connection cache, which is in (*clientConn).handleQuitEvent()
// function.
_ = sc.Close()
// If the connCache is nil, means the connection cached feature is not
// enabled. Do close the server connection if the connection cache
// feature is not enabled.
if h.connCache == nil {
_ = sc.Close()
}
}()

h.logger.Info("build connection",
Expand Down
4 changes: 1 addition & 3 deletions pkg/proxy/server_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,7 @@ func (s *serverConn) CreateTime() time.Time {

func (s *serverConn) Quit() error {
defer func() {
// Disconnect from the connection manager, also, close the
// raw TCP connection.
_ = s.RawConn().Close()
_ = s.Close()
}()
_, err := s.ExecStmt(internalStmt{
cmdType: cmdQuit,
Expand Down
16 changes: 8 additions & 8 deletions pkg/proxy/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,14 +498,14 @@ func (t *tunnel) Close() error {
if cc != nil && !t.realConn {
_ = cc.Close()
}
if !t.connCacheEnabled && sc != nil {
_ = sc.Close()
}

// close the server connection
serverC := t.getServerConn()
if serverC != nil {
_ = serverC.Close()
if !t.connCacheEnabled {
// close the server connection
serverC := t.getServerConn()
if serverC != nil {
_ = serverC.Close()
} else if sc != nil {
_ = sc.Close()
}
}
})
return nil
Expand Down

0 comments on commit 4bbfa2b

Please sign in to comment.