Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: getty session auto close bug #130

Merged
merged 1 commit into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion pkg/common/log/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,15 @@ var (
log Logger
zapLogger *zap.Logger

zapLoggerConfig = zap.NewDevelopmentConfig()
zapLoggerConfig = zap.Config{
// todo read level from config
Level: zap.NewAtomicLevelAt(zap.InfoLevel),
Development: true,
Encoding: "console",
EncoderConfig: zap.NewDevelopmentEncoderConfig(),
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
luky116 marked this conversation as resolved.
Show resolved Hide resolved
}
zapLoggerEncoderConfig = zapcore.EncoderConfig{
TimeKey: "time",
LevelKey: "level",
Expand Down
6 changes: 3 additions & 3 deletions pkg/config/getty_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ type GettyConfig struct {
// GetDefaultGettyConfig ...
func GetDefaultGettyConfig() GettyConfig {
return GettyConfig{
ReconnectInterval: 1,
ConnectionNum: 20,
ReconnectInterval: 0,
ConnectionNum: 1,
HeartbeatPeriod: 10 * time.Second,
GettySessionParam: GettySessionParam{
CompressEncoding: false,
Expand All @@ -51,7 +51,7 @@ func GetDefaultGettyConfig() GettyConfig {
TCPReadTimeout: time.Second,
TCPWriteTimeout: 5 * time.Second,
WaitTimeout: time.Second,
CronPeriod: 5 * time.Second,
CronPeriod: time.Second,
MaxMsgLen: 4096,
SessionName: "rpc_client",
},
Expand Down
8 changes: 4 additions & 4 deletions pkg/protocol/codec/branch_commit_response_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (g *BranchCommitResponseCodec) Decode(in []byte) interface{} {

data.ResultCode = message.ResultCode(bytes.ReadByte(buf))
if data.ResultCode == message.ResultCodeFailed {
data.Msg = bytes.ReadString16Length(buf)
data.Msg = bytes.ReadString8Length(buf)
}
data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf))
data.Xid = bytes.ReadString16Length(buf)
Expand All @@ -56,10 +56,10 @@ func (g *BranchCommitResponseCodec) Encode(in interface{}) []byte {
buf.WriteByte(byte(data.ResultCode))
if data.ResultCode == message.ResultCodeFailed {
msg := data.Msg
if len(data.Msg) > math.MaxInt16 {
msg = data.Msg[:math.MaxInt16]
if len(data.Msg) > math.MaxInt8 {
msg = data.Msg[:math.MaxInt8]
}
bytes.WriteString16Length(msg, buf)
bytes.WriteString8Length(msg, buf)
}
buf.WriteByte(byte(data.TransactionExceptionCode))
bytes.WriteString16Length(data.Xid, buf)
Expand Down
4 changes: 2 additions & 2 deletions pkg/protocol/codec/branch_rollback_response_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (g *BranchRollbackResponseCodec) Decode(in []byte) interface{} {

data.ResultCode = message.ResultCode(bytes.ReadByte(buf))
if data.ResultCode == message.ResultCodeFailed {
data.Msg = bytes.ReadString16Length(buf)
data.Msg = bytes.ReadString8Length(buf)
}
data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf))
data.Xid = bytes.ReadString16Length(buf)
Expand All @@ -59,7 +59,7 @@ func (g *BranchRollbackResponseCodec) Encode(in interface{}) []byte {
if len(data.Msg) > math.MaxInt16 {
msg = data.Msg[:math.MaxInt16]
}
bytes.WriteString16Length(msg, buf)
bytes.WriteString8Length(msg, buf)
}
buf.WriteByte(byte(data.TransactionExceptionCode))
bytes.WriteString16Length(data.Xid, buf)
Expand Down
35 changes: 21 additions & 14 deletions pkg/remoting/getty/getty_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package getty

import (
"sync"
"time"

gxtime "github.com/dubbogo/gost/time"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/protocol/message"
"go.uber.org/atomic"
Expand Down Expand Up @@ -60,18 +62,18 @@ func (client *GettyRemotingClient) SendAsyncRequest(msg interface{}) error {
Compressor: 0,
Body: msg,
}
return GetGettyRemotingInstance().SendASync(rpcMessage)
return GetGettyRemotingInstance().SendASync(rpcMessage, nil, client.asyncCallback)
}

func (client *GettyRemotingClient) SendAsyncResponse(msg interface{}) error {
func (client *GettyRemotingClient) SendAsyncResponse(msgID int32, msg interface{}) error {
rpcMessage := message.RpcMessage{
ID: int32(client.idGenerator.Inc()),
ID: msgID,
Type: message.GettyRequestType_Response,
Codec: byte(codec.CodecTypeSeata),
Compressor: 0,
Body: msg,
}
return GetGettyRemotingInstance().SendASync(rpcMessage)
return GetGettyRemotingInstance().SendASync(rpcMessage, nil, nil)
}

func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) (interface{}, error) {
Expand All @@ -82,16 +84,21 @@ func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) (interface{}
Compressor: 0,
Body: msg,
}
return GetGettyRemotingInstance().SendSync(rpcMessage)
return GetGettyRemotingInstance().SendSync(rpcMessage, nil, client.syncCallback)
}

func (client *GettyRemotingClient) SendSyncRequestWithTimeout(msg interface{}, timeout time.Duration) (interface{}, error) {
rpcMessage := message.RpcMessage{
ID: int32(client.idGenerator.Inc()),
Type: message.GettyRequestType_RequestSync,
Codec: byte(codec.CodecTypeSeata),
Compressor: 0,
Body: msg,
func (g *GettyRemotingClient) asyncCallback(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error) {
go g.asyncCallback(reqMsg, respMsg)
return nil, nil
}

func (g *GettyRemotingClient) syncCallback(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error) {
select {
case <-gxtime.GetDefaultTimerWheel().After(RPC_REQUEST_TIMEOUT):
GetGettyRemotingInstance().RemoveMergedMessageFuture(reqMsg.ID)
log.Errorf("wait resp timeout: %#v", reqMsg)
return nil, errors.Errorf("wait response timeout, request: %#v", reqMsg)
case <-respMsg.Done:
return respMsg.Response, respMsg.Err
}
return GetGettyRemotingInstance().SendSyncWithTimeout(rpcMessage, timeout)
}
80 changes: 29 additions & 51 deletions pkg/remoting/getty/getty_remoting.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,26 @@ import (

getty "github.com/apache/dubbo-getty"

gxtime "github.com/dubbogo/gost/time"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"
)

const (
RPC_REQUEST_TIMEOUT = 5 * time.Second
RPC_REQUEST_TIMEOUT = 2 * time.Second
)

var (
gettyRemoting *GettyRemoting
onceGettyRemoting = &sync.Once{}
)

type GettyRemoting struct {
futures *sync.Map
mergeMsgMap *sync.Map
}
type (
callbackMethod func(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error)
GettyRemoting struct {
futures *sync.Map
mergeMsgMap *sync.Map
}
)

func GetGettyRemotingInstance() *GettyRemoting {
if gettyRemoting == nil {
Expand All @@ -55,67 +56,44 @@ func GetGettyRemotingInstance() *GettyRemoting {
return gettyRemoting
}

func (client *GettyRemoting) SendSync(msg message.RpcMessage) (interface{}, error) {
ss := sessionManager.AcquireGettySession()
return client.sendAsync(ss, msg, RPC_REQUEST_TIMEOUT)
}

func (client *GettyRemoting) SendSyncWithTimeout(msg message.RpcMessage, timeout time.Duration) (interface{}, error) {
ss := sessionManager.AcquireGettySession()
return client.sendAsync(ss, msg, timeout)
func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session, callback callbackMethod) (interface{}, error) {
if s == nil {
s = sessionManager.selectSession()
}
return g.sendAsync(s, msg, callback)
}

func (client *GettyRemoting) SendASync(msg message.RpcMessage) error {
ss := sessionManager.AcquireGettySession()
_, err := client.sendAsync(ss, msg, 0*time.Second)
func (g *GettyRemoting) SendASync(msg message.RpcMessage, s getty.Session, callback callbackMethod) error {
if s == nil {
s = sessionManager.selectSession()
}
_, err := g.sendAsync(s, msg, callback)
return err
}

func (client *GettyRemoting) sendAsync(session getty.Session, msg message.RpcMessage, timeout time.Duration) (interface{}, error) {
log.Infof("send async message: {%#v}", msg)
func (g *GettyRemoting) sendAsync(session getty.Session, msg message.RpcMessage, callback callbackMethod) (interface{}, error) {
if _, ok := msg.Body.(message.HeartBeatMessage); ok {
log.Debug("send async message: {%#v}", msg)
} else {
log.Infof("send async message: {%#v}", msg)
}
var err error
if session == nil || session.IsClosed() {
log.Warn("sendAsyncRequestWithResponse nothing, caused by null channel.")
return nil, err
}
resp := message.NewMessageFuture(msg)
client.futures.Store(msg.ID, resp)
g.futures.Store(msg.ID, resp)
_, _, err = session.WritePkg(msg, time.Duration(0))
if err != nil {
client.futures.Delete(msg.ID)
g.futures.Delete(msg.ID)
log.Errorf("send message: %#v, session: %s", msg, session.Stat())
return nil, err
}

log.Debugf("send message: %#v, session: %s", msg, session.Stat())

actualTimeOut := timeout
if timeout <= time.Duration(0) {
// todo timeoue use config
actualTimeOut = time.Duration(2000)
}

wait := func() (interface{}, error) {
select {
case <-gxtime.GetDefaultTimerWheel().After(actualTimeOut):
client.futures.Delete(msg.ID)
if session != nil {
return nil, errors.Errorf("wait response timeout, ip: %s, request: %#v", session.RemoteAddr(), msg)
} else {
return nil, errors.Errorf("wait response timeout and session is nil, request: %#v", msg)
}
case <-resp.Done:
err = resp.Err
return resp.Response, err
}
}

if timeout > time.Duration(0) {
return wait()
} else {
go wait()
if callback != nil {
return callback(msg, resp)
}
return nil, err
return nil, nil
}

func (client *GettyRemoting) GetMessageFuture(msgID int32) *message.MessageFuture {
Expand Down
66 changes: 41 additions & 25 deletions pkg/remoting/getty/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
getty "github.com/apache/dubbo-getty"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/config"
"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/remoting/processor"
"go.uber.org/atomic"
Expand All @@ -39,7 +40,8 @@ type gettyClientHandler struct {
idGenerator *atomic.Uint32
msgFutures *sync.Map
mergeMsgMap *sync.Map
processorTable map[message.MessageType]processor.RemotingProcessor
sessionManager *SessionManager
processorMap map[message.MessageType]processor.RemotingProcessor
}

func GetGettyClientHandlerInstance() *gettyClientHandler {
Expand All @@ -50,70 +52,84 @@ func GetGettyClientHandlerInstance() *gettyClientHandler {
idGenerator: &atomic.Uint32{},
msgFutures: &sync.Map{},
mergeMsgMap: &sync.Map{},
processorTable: make(map[message.MessageType]processor.RemotingProcessor, 0),
sessionManager: sessionManager,
processorMap: make(map[message.MessageType]processor.RemotingProcessor, 0),
}
})
}
return clientHandler
}

func (client *gettyClientHandler) OnOpen(session getty.Session) error {
sessionManager.RegisterGettySession(session)
func (g *gettyClientHandler) OnOpen(session getty.Session) error {
log.Infof("Open new getty session ")
g.sessionManager.registerSession(session)
go func() {
request := message.RegisterTMRequest{AbstractIdentifyRequest: message.AbstractIdentifyRequest{
Version: client.conf.SeataVersion,
ApplicationId: client.conf.ApplicationID,
TransactionServiceGroup: client.conf.TransactionServiceGroup,
Version: g.conf.SeataVersion,
ApplicationId: g.conf.ApplicationID,
TransactionServiceGroup: g.conf.TransactionServiceGroup,
}}
err := GetGettyRemotingClient().SendAsyncRequest(request)
//client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)
if err != nil {
log.Errorf("OnOpen error: {%#v}", err.Error())
sessionManager.ReleaseGettySession(session)
g.sessionManager.releaseSession(session)
return
}
//todo
//client.GettySessionOnOpenChannel <- session.RemoteAddr()
}()

return nil
}

func (client *gettyClientHandler) OnError(session getty.Session, err error) {
log.Infof("OnError session{%s} got error{%v}, will be closed.", session.Stat(), err)
sessionManager.ReleaseGettySession(session)
func (g *gettyClientHandler) OnError(session getty.Session, err error) {
log.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err)
g.sessionManager.releaseSession(session)
}

func (client *gettyClientHandler) OnClose(session getty.Session) {
log.Infof("OnClose session{%s} is closing......", session.Stat())
sessionManager.ReleaseGettySession(session)
func (g *gettyClientHandler) OnClose(session getty.Session) {
log.Infof("session{%s} is closing......", session.Stat())
g.sessionManager.releaseSession(session)
}

func (client *gettyClientHandler) OnMessage(session getty.Session, pkg interface{}) {
func (g *gettyClientHandler) OnMessage(session getty.Session, pkg interface{}) {
ctx := context.Background()
log.Debugf("received message: {%#v}", pkg)
log.Debug("received message: {%#v}", pkg)

rpcMessage, ok := pkg.(message.RpcMessage)
if !ok {
log.Errorf("received message is not protocol.RpcMessage. pkg: %#v", pkg)
return
}

if mm, ok := rpcMessage.Body.(message.MessageTypeAware); ok {
processor := client.processorTable[mm.GetTypeCode()]
processor := g.processorMap[mm.GetTypeCode()]
if processor != nil {
processor.Process(ctx, rpcMessage)
} else {
log.Errorf("This message type [%v] has no processor.", mm.GetTypeCode())
log.Errorf("This message type %v has no processor.", mm.GetTypeCode())
}
} else {
log.Errorf("This rpcMessage body %#v is not MessageTypeAware type.", rpcMessage.Body)
}
}

func (client *gettyClientHandler) OnCron(session getty.Session) {
//GetGettyRemotingClient().SendAsyncRequest(message.HeartBeatMessagePing)
func (g *gettyClientHandler) OnCron(session getty.Session) {
log.Debug("session{%s} Oncron executing", session.Stat())
g.transferBeatHeart(session, message.HeartBeatMessagePing)
}

func (g *gettyClientHandler) transferBeatHeart(session getty.Session, msg message.HeartBeatMessage) {
rpcMessage := message.RpcMessage{
ID: int32(g.idGenerator.Inc()),
Type: message.GettyRequestType_HeartbeatRequest,
Codec: byte(codec.CodecTypeSeata),
Compressor: 0,
Body: msg,
}
GetGettyRemotingInstance().SendASync(rpcMessage, session, nil)
}

func (client *gettyClientHandler) RegisterProcessor(msgType message.MessageType, processor processor.RemotingProcessor) {
func (g *gettyClientHandler) RegisterProcessor(msgType message.MessageType, processor processor.RemotingProcessor) {
if nil != processor {
client.processorTable[msgType] = processor
g.processorMap[msgType] = processor
}
}
Loading