Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(agent): add timeout on write calls #426

Open
wants to merge 3 commits into
base: v2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 49 additions & 17 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (
e "errors"
"fmt"
"net"
"os"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/topfreegames/pitaya/v2/config"
"github.com/topfreegames/pitaya/v2/conn/codec"
"github.com/topfreegames/pitaya/v2/conn/message"
"github.com/topfreegames/pitaya/v2/conn/packet"
Expand Down Expand Up @@ -76,6 +78,7 @@ type (
decoder codec.PacketDecoder // binary decoder
encoder codec.PacketEncoder // binary encoder
heartbeatTimeout time.Duration
writeTimeout time.Duration
lastAt int64 // last heartbeat unix time stamp
messageEncoder message.Encoder
messagesBufferSize int // size of the pending messages buffer
Expand Down Expand Up @@ -130,6 +133,7 @@ type (
decoder codec.PacketDecoder // binary decoder
encoder codec.PacketEncoder // binary encoder
heartbeatTimeout time.Duration
writeTimeout time.Duration
messageEncoder message.Encoder
messagesBufferSize int // size of the pending messages buffer
metricsReporters []metrics.Reporter
Expand All @@ -144,6 +148,7 @@ func NewAgentFactory(
encoder codec.PacketEncoder,
serializer serialize.Serializer,
heartbeatTimeout time.Duration,
writeTimeout time.Duration,
messageEncoder message.Encoder,
messagesBufferSize int,
sessionPool session.SessionPool,
Expand All @@ -154,6 +159,7 @@ func NewAgentFactory(
decoder: decoder,
encoder: encoder,
heartbeatTimeout: heartbeatTimeout,
writeTimeout: writeTimeout,
messageEncoder: messageEncoder,
messagesBufferSize: messagesBufferSize,
sessionPool: sessionPool,
Expand All @@ -164,7 +170,7 @@ func NewAgentFactory(

// CreateAgent returns a new agent
func (f *agentFactoryImpl) CreateAgent(conn net.Conn) Agent {
return newAgent(conn, f.decoder, f.encoder, f.serializer, f.heartbeatTimeout, f.messagesBufferSize, f.appDieChan, f.messageEncoder, f.metricsReporters, f.sessionPool)
return newAgent(conn, f.decoder, f.encoder, f.serializer, f.heartbeatTimeout, f.writeTimeout, f.messagesBufferSize, f.appDieChan, f.messageEncoder, f.metricsReporters, f.sessionPool)
}

// NewAgent create new agent instance
Expand All @@ -174,6 +180,7 @@ func newAgent(
packetEncoder codec.PacketEncoder,
serializer serialize.Serializer,
heartbeatTime time.Duration,
writeTimeout time.Duration,
messagesBufferSize int,
dieChan chan bool,
messageEncoder message.Encoder,
Expand All @@ -188,6 +195,10 @@ func newAgent(
herdEncode(heartbeatTime, packetEncoder, messageEncoder.IsCompressionEnabled(), serializerName)
})

if writeTimeout <= 0 {
writeTimeout = config.DefaultWriteTimeout
}

a := &agentImpl{
appDieChan: dieChan,
chDie: make(chan struct{}),
Expand All @@ -199,6 +210,7 @@ func newAgent(
decoder: packetDecoder,
encoder: packetEncoder,
heartbeatTimeout: heartbeatTime,
writeTimeout: writeTimeout,
lastAt: time.Now().Unix(),
serializer: serializer,
state: constants.StatusStart,
Expand Down Expand Up @@ -251,20 +263,32 @@ func (a *agentImpl) packetEncodeMessage(m *message.Message) ([]byte, error) {

func (a *agentImpl) send(pendingMsg pendingMessage) (err error) {
defer func() {
if e := recover(); e != nil {
err = errors.NewError(constants.ErrBrokenPipe, errors.ErrClientClosedRequest)
if panicErr := recover(); panicErr != nil {
err = errors.NewError(
fmt.Errorf("%s: %s", constants.ErrBrokenPipe.Error(), panicErr),
errors.ErrClientClosedRequest,
)
logger.Log.Error("agent send panicked: ", err)
}
}()
a.reportChannelSize()

m, err := a.getMessageFromPendingMessage(pendingMsg)
if err != nil {
logger.Log.Errorf(
"agent send failed when getting pending msg. route: %s, type: %s, err: %s",
pendingMsg.route, &pendingMsg.typ, err,
)
return err
}

// packet encode
p, err := a.packetEncodeMessage(m)
if err != nil {
logger.Log.Errorf(
"agent send failed when encoding the msg. route: %s, type: %s, err: %s",
pendingMsg.route, &pendingMsg.typ, err,
)
return err
}

Expand Down Expand Up @@ -503,19 +527,29 @@ func (a *agentImpl) write() {
ctx, err, data := pWrite.ctx, pWrite.err, pWrite.data

writeErr := a.writeToConnection(ctx, data)
if writeErr != nil {
err = errors.NewError(writeErr, errors.ErrClosedRequest)

logger.Log.Errorf("Failed to write in conn: %s (ctx=%v), agent will close", writeErr.Error(), ctx)
}

tracing.FinishSpan(ctx, nil)
metrics.ReportTimingFromCtx(ctx, a.metricsReporters, handlerType, err)

// close agent if low-level conn broke during write
if writeErr != nil {
return
if e.Is(writeErr, os.ErrDeadlineExceeded) {
// Log the timeout error but continue processing
logger.Log.Warnf(
"Context deadline exceeded for write in conn (%s) | session (%s): %s",
a.conn.RemoteAddr(), a.Session.UID(), writeErr.Error(),
)
} else {
err = errors.NewError(writeErr, errors.ErrClosedRequest)
logger.Log.Errorf(
"Failed to write in conn (%s) | session (%s): %s, agent will close",
a.conn.RemoteAddr(), a.Session.UID(), writeErr.Error(),
)
metrics.ReportTimingFromCtx(ctx, a.metricsReporters, handlerType, err)
// close agent if low-level conn broke during write
return
}
}

metrics.ReportTimingFromCtx(ctx, a.metricsReporters, handlerType, err)
case <-a.chStopWrite:
return
}
Expand All @@ -524,17 +558,15 @@ func (a *agentImpl) write() {

func (a *agentImpl) writeToConnection(ctx context.Context, data []byte) error {
span := createConnectionSpan(ctx, a.conn, "conn write")

_, writeErr := a.conn.Write(data)

defer span.Finish()

a.conn.SetWriteDeadline(time.Now().Add(a.writeTimeout))
_, writeErr := a.conn.Write(data)
if writeErr != nil {
tracing.LogError(span, writeErr.Error())
return writeErr
}

return nil
return writeErr
}

func createConnectionSpan(ctx context.Context, conn net.Conn, op string) opentracing.Span {
Expand All @@ -549,7 +581,7 @@ func createConnectionSpan(ctx context.Context, conn net.Conn, op string) opentra

tags := opentracing.Tags{
"span.kind": "connection",
"addr": remoteAddress,
"addr": remoteAddress,
}

var parent opentracing.SpanContext
Expand Down
Loading