Skip to content

Commit

Permalink
fix getty auto close bug (apache#130)
Browse files Browse the repository at this point in the history
  • Loading branch information
luky116 authored Jul 19, 2022
1 parent 5e2319e commit ed33bad
Show file tree
Hide file tree
Showing 12 changed files with 153 additions and 149 deletions.
10 changes: 9 additions & 1 deletion pkg/common/log/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,15 @@ var (
log Logger
zapLogger *zap.Logger

zapLoggerConfig = zap.NewDevelopmentConfig()
zapLoggerConfig = zap.Config{
// todo read level from config
Level: zap.NewAtomicLevelAt(zap.InfoLevel),
Development: true,
Encoding: "console",
EncoderConfig: zap.NewDevelopmentEncoderConfig(),
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
}
zapLoggerEncoderConfig = zapcore.EncoderConfig{
TimeKey: "time",
LevelKey: "level",
Expand Down
6 changes: 3 additions & 3 deletions pkg/config/getty_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ type GettyConfig struct {
// GetDefaultGettyConfig ...
func GetDefaultGettyConfig() GettyConfig {
return GettyConfig{
ReconnectInterval: 1,
ConnectionNum: 20,
ReconnectInterval: 0,
ConnectionNum: 1,
HeartbeatPeriod: 10 * time.Second,
GettySessionParam: GettySessionParam{
CompressEncoding: false,
Expand All @@ -51,7 +51,7 @@ func GetDefaultGettyConfig() GettyConfig {
TCPReadTimeout: time.Second,
TCPWriteTimeout: 5 * time.Second,
WaitTimeout: time.Second,
CronPeriod: 5 * time.Second,
CronPeriod: time.Second,
MaxMsgLen: 4096,
SessionName: "rpc_client",
},
Expand Down
8 changes: 4 additions & 4 deletions pkg/protocol/codec/branch_commit_response_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (g *BranchCommitResponseCodec) Decode(in []byte) interface{} {

data.ResultCode = message.ResultCode(bytes.ReadByte(buf))
if data.ResultCode == message.ResultCodeFailed {
data.Msg = bytes.ReadString16Length(buf)
data.Msg = bytes.ReadString8Length(buf)
}
data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf))
data.Xid = bytes.ReadString16Length(buf)
Expand All @@ -56,10 +56,10 @@ func (g *BranchCommitResponseCodec) Encode(in interface{}) []byte {
buf.WriteByte(byte(data.ResultCode))
if data.ResultCode == message.ResultCodeFailed {
msg := data.Msg
if len(data.Msg) > math.MaxInt16 {
msg = data.Msg[:math.MaxInt16]
if len(data.Msg) > math.MaxInt8 {
msg = data.Msg[:math.MaxInt8]
}
bytes.WriteString16Length(msg, buf)
bytes.WriteString8Length(msg, buf)
}
buf.WriteByte(byte(data.TransactionExceptionCode))
bytes.WriteString16Length(data.Xid, buf)
Expand Down
4 changes: 2 additions & 2 deletions pkg/protocol/codec/branch_rollback_response_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (g *BranchRollbackResponseCodec) Decode(in []byte) interface{} {

data.ResultCode = message.ResultCode(bytes.ReadByte(buf))
if data.ResultCode == message.ResultCodeFailed {
data.Msg = bytes.ReadString16Length(buf)
data.Msg = bytes.ReadString8Length(buf)
}
data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf))
data.Xid = bytes.ReadString16Length(buf)
Expand All @@ -59,7 +59,7 @@ func (g *BranchRollbackResponseCodec) Encode(in interface{}) []byte {
if len(data.Msg) > math.MaxInt16 {
msg = data.Msg[:math.MaxInt16]
}
bytes.WriteString16Length(msg, buf)
bytes.WriteString8Length(msg, buf)
}
buf.WriteByte(byte(data.TransactionExceptionCode))
bytes.WriteString16Length(data.Xid, buf)
Expand Down
35 changes: 21 additions & 14 deletions pkg/remoting/getty/getty_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package getty

import (
"sync"
"time"

gxtime "github.com/dubbogo/gost/time"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/protocol/message"
"go.uber.org/atomic"
Expand Down Expand Up @@ -60,18 +62,18 @@ func (client *GettyRemotingClient) SendAsyncRequest(msg interface{}) error {
Compressor: 0,
Body: msg,
}
return GetGettyRemotingInstance().SendASync(rpcMessage)
return GetGettyRemotingInstance().SendASync(rpcMessage, nil, client.asyncCallback)
}

func (client *GettyRemotingClient) SendAsyncResponse(msg interface{}) error {
func (client *GettyRemotingClient) SendAsyncResponse(msgID int32, msg interface{}) error {
rpcMessage := message.RpcMessage{
ID: int32(client.idGenerator.Inc()),
ID: msgID,
Type: message.GettyRequestType_Response,
Codec: byte(codec.CodecTypeSeata),
Compressor: 0,
Body: msg,
}
return GetGettyRemotingInstance().SendASync(rpcMessage)
return GetGettyRemotingInstance().SendASync(rpcMessage, nil, nil)
}

func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) (interface{}, error) {
Expand All @@ -82,16 +84,21 @@ func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) (interface{}
Compressor: 0,
Body: msg,
}
return GetGettyRemotingInstance().SendSync(rpcMessage)
return GetGettyRemotingInstance().SendSync(rpcMessage, nil, client.syncCallback)
}

func (client *GettyRemotingClient) SendSyncRequestWithTimeout(msg interface{}, timeout time.Duration) (interface{}, error) {
rpcMessage := message.RpcMessage{
ID: int32(client.idGenerator.Inc()),
Type: message.GettyRequestType_RequestSync,
Codec: byte(codec.CodecTypeSeata),
Compressor: 0,
Body: msg,
func (g *GettyRemotingClient) asyncCallback(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error) {
go g.asyncCallback(reqMsg, respMsg)
return nil, nil
}

func (g *GettyRemotingClient) syncCallback(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error) {
select {
case <-gxtime.GetDefaultTimerWheel().After(RPC_REQUEST_TIMEOUT):
GetGettyRemotingInstance().RemoveMergedMessageFuture(reqMsg.ID)
log.Errorf("wait resp timeout: %#v", reqMsg)
return nil, errors.Errorf("wait response timeout, request: %#v", reqMsg)
case <-respMsg.Done:
return respMsg.Response, respMsg.Err
}
return GetGettyRemotingInstance().SendSyncWithTimeout(rpcMessage, timeout)
}
80 changes: 29 additions & 51 deletions pkg/remoting/getty/getty_remoting.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,26 @@ import (

getty "github.com/apache/dubbo-getty"

gxtime "github.com/dubbogo/gost/time"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"
)

const (
RPC_REQUEST_TIMEOUT = 5 * time.Second
RPC_REQUEST_TIMEOUT = 2 * time.Second
)

var (
gettyRemoting *GettyRemoting
onceGettyRemoting = &sync.Once{}
)

type GettyRemoting struct {
futures *sync.Map
mergeMsgMap *sync.Map
}
type (
callbackMethod func(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error)
GettyRemoting struct {
futures *sync.Map
mergeMsgMap *sync.Map
}
)

func GetGettyRemotingInstance() *GettyRemoting {
if gettyRemoting == nil {
Expand All @@ -55,67 +56,44 @@ func GetGettyRemotingInstance() *GettyRemoting {
return gettyRemoting
}

func (client *GettyRemoting) SendSync(msg message.RpcMessage) (interface{}, error) {
ss := sessionManager.AcquireGettySession()
return client.sendAsync(ss, msg, RPC_REQUEST_TIMEOUT)
}

func (client *GettyRemoting) SendSyncWithTimeout(msg message.RpcMessage, timeout time.Duration) (interface{}, error) {
ss := sessionManager.AcquireGettySession()
return client.sendAsync(ss, msg, timeout)
func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session, callback callbackMethod) (interface{}, error) {
if s == nil {
s = sessionManager.selectSession()
}
return g.sendAsync(s, msg, callback)
}

func (client *GettyRemoting) SendASync(msg message.RpcMessage) error {
ss := sessionManager.AcquireGettySession()
_, err := client.sendAsync(ss, msg, 0*time.Second)
func (g *GettyRemoting) SendASync(msg message.RpcMessage, s getty.Session, callback callbackMethod) error {
if s == nil {
s = sessionManager.selectSession()
}
_, err := g.sendAsync(s, msg, callback)
return err
}

func (client *GettyRemoting) sendAsync(session getty.Session, msg message.RpcMessage, timeout time.Duration) (interface{}, error) {
log.Infof("send async message: {%#v}", msg)
func (g *GettyRemoting) sendAsync(session getty.Session, msg message.RpcMessage, callback callbackMethod) (interface{}, error) {
if _, ok := msg.Body.(message.HeartBeatMessage); ok {
log.Debug("send async message: {%#v}", msg)
} else {
log.Infof("send async message: {%#v}", msg)
}
var err error
if session == nil || session.IsClosed() {
log.Warn("sendAsyncRequestWithResponse nothing, caused by null channel.")
return nil, err
}
resp := message.NewMessageFuture(msg)
client.futures.Store(msg.ID, resp)
g.futures.Store(msg.ID, resp)
_, _, err = session.WritePkg(msg, time.Duration(0))
if err != nil {
client.futures.Delete(msg.ID)
g.futures.Delete(msg.ID)
log.Errorf("send message: %#v, session: %s", msg, session.Stat())
return nil, err
}

log.Debugf("send message: %#v, session: %s", msg, session.Stat())

actualTimeOut := timeout
if timeout <= time.Duration(0) {
// todo timeoue use config
actualTimeOut = time.Duration(2000)
}

wait := func() (interface{}, error) {
select {
case <-gxtime.GetDefaultTimerWheel().After(actualTimeOut):
client.futures.Delete(msg.ID)
if session != nil {
return nil, errors.Errorf("wait response timeout, ip: %s, request: %#v", session.RemoteAddr(), msg)
} else {
return nil, errors.Errorf("wait response timeout and session is nil, request: %#v", msg)
}
case <-resp.Done:
err = resp.Err
return resp.Response, err
}
}

if timeout > time.Duration(0) {
return wait()
} else {
go wait()
if callback != nil {
return callback(msg, resp)
}
return nil, err
return nil, nil
}

func (client *GettyRemoting) GetMessageFuture(msgID int32) *message.MessageFuture {
Expand Down
66 changes: 41 additions & 25 deletions pkg/remoting/getty/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
getty "github.com/apache/dubbo-getty"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/config"
"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/remoting/processor"
"go.uber.org/atomic"
Expand All @@ -39,7 +40,8 @@ type gettyClientHandler struct {
idGenerator *atomic.Uint32
msgFutures *sync.Map
mergeMsgMap *sync.Map
processorTable map[message.MessageType]processor.RemotingProcessor
sessionManager *SessionManager
processorMap map[message.MessageType]processor.RemotingProcessor
}

func GetGettyClientHandlerInstance() *gettyClientHandler {
Expand All @@ -50,70 +52,84 @@ func GetGettyClientHandlerInstance() *gettyClientHandler {
idGenerator: &atomic.Uint32{},
msgFutures: &sync.Map{},
mergeMsgMap: &sync.Map{},
processorTable: make(map[message.MessageType]processor.RemotingProcessor, 0),
sessionManager: sessionManager,
processorMap: make(map[message.MessageType]processor.RemotingProcessor, 0),
}
})
}
return clientHandler
}

func (client *gettyClientHandler) OnOpen(session getty.Session) error {
sessionManager.RegisterGettySession(session)
func (g *gettyClientHandler) OnOpen(session getty.Session) error {
log.Infof("Open new getty session ")
g.sessionManager.registerSession(session)
go func() {
request := message.RegisterTMRequest{AbstractIdentifyRequest: message.AbstractIdentifyRequest{
Version: client.conf.SeataVersion,
ApplicationId: client.conf.ApplicationID,
TransactionServiceGroup: client.conf.TransactionServiceGroup,
Version: g.conf.SeataVersion,
ApplicationId: g.conf.ApplicationID,
TransactionServiceGroup: g.conf.TransactionServiceGroup,
}}
err := GetGettyRemotingClient().SendAsyncRequest(request)
//client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)
if err != nil {
log.Errorf("OnOpen error: {%#v}", err.Error())
sessionManager.ReleaseGettySession(session)
g.sessionManager.releaseSession(session)
return
}
//todo
//client.GettySessionOnOpenChannel <- session.RemoteAddr()
}()

return nil
}

func (client *gettyClientHandler) OnError(session getty.Session, err error) {
log.Infof("OnError session{%s} got error{%v}, will be closed.", session.Stat(), err)
sessionManager.ReleaseGettySession(session)
func (g *gettyClientHandler) OnError(session getty.Session, err error) {
log.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err)
g.sessionManager.releaseSession(session)
}

func (client *gettyClientHandler) OnClose(session getty.Session) {
log.Infof("OnClose session{%s} is closing......", session.Stat())
sessionManager.ReleaseGettySession(session)
func (g *gettyClientHandler) OnClose(session getty.Session) {
log.Infof("session{%s} is closing......", session.Stat())
g.sessionManager.releaseSession(session)
}

func (client *gettyClientHandler) OnMessage(session getty.Session, pkg interface{}) {
func (g *gettyClientHandler) OnMessage(session getty.Session, pkg interface{}) {
ctx := context.Background()
log.Debugf("received message: {%#v}", pkg)
log.Debug("received message: {%#v}", pkg)

rpcMessage, ok := pkg.(message.RpcMessage)
if !ok {
log.Errorf("received message is not protocol.RpcMessage. pkg: %#v", pkg)
return
}

if mm, ok := rpcMessage.Body.(message.MessageTypeAware); ok {
processor := client.processorTable[mm.GetTypeCode()]
processor := g.processorMap[mm.GetTypeCode()]
if processor != nil {
processor.Process(ctx, rpcMessage)
} else {
log.Errorf("This message type [%v] has no processor.", mm.GetTypeCode())
log.Errorf("This message type %v has no processor.", mm.GetTypeCode())
}
} else {
log.Errorf("This rpcMessage body %#v is not MessageTypeAware type.", rpcMessage.Body)
}
}

func (client *gettyClientHandler) OnCron(session getty.Session) {
//GetGettyRemotingClient().SendAsyncRequest(message.HeartBeatMessagePing)
func (g *gettyClientHandler) OnCron(session getty.Session) {
log.Debug("session{%s} Oncron executing", session.Stat())
g.transferBeatHeart(session, message.HeartBeatMessagePing)
}

func (g *gettyClientHandler) transferBeatHeart(session getty.Session, msg message.HeartBeatMessage) {
rpcMessage := message.RpcMessage{
ID: int32(g.idGenerator.Inc()),
Type: message.GettyRequestType_HeartbeatRequest,
Codec: byte(codec.CodecTypeSeata),
Compressor: 0,
Body: msg,
}
GetGettyRemotingInstance().SendASync(rpcMessage, session, nil)
}

func (client *gettyClientHandler) RegisterProcessor(msgType message.MessageType, processor processor.RemotingProcessor) {
func (g *gettyClientHandler) RegisterProcessor(msgType message.MessageType, processor processor.RemotingProcessor) {
if nil != processor {
client.processorTable[msgType] = processor
g.processorMap[msgType] = processor
}
}
Loading

0 comments on commit ed33bad

Please sign in to comment.