Skip to content

Commit

Permalink
feat(agent): add timeout on write calls
Browse files Browse the repository at this point in the history
When sending a message via the agent, add a timeout to write
calls so that they do not block on dead-end connections. By default,
the timeout is 10 seconds, but it can be configured via the
pitaya.agent.buffer.writetimeout configuration key.
  • Loading branch information
hspedro committed Dec 17, 2024
1 parent c37f8a0 commit 7d97542
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 53 deletions.
70 changes: 51 additions & 19 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"sync/atomic"
"time"

"github.com/topfreegames/pitaya/v2/config"
"github.com/topfreegames/pitaya/v2/conn/codec"
"github.com/topfreegames/pitaya/v2/conn/message"
"github.com/topfreegames/pitaya/v2/conn/packet"
Expand Down Expand Up @@ -76,6 +77,7 @@ type (
decoder codec.PacketDecoder // binary decoder
encoder codec.PacketEncoder // binary encoder
heartbeatTimeout time.Duration
writeTimeout time.Duration
lastAt int64 // last heartbeat unix time stamp
messageEncoder message.Encoder
messagesBufferSize int // size of the pending messages buffer
Expand Down Expand Up @@ -130,6 +132,7 @@ type (
decoder codec.PacketDecoder // binary decoder
encoder codec.PacketEncoder // binary encoder
heartbeatTimeout time.Duration
writeTimeout time.Duration
messageEncoder message.Encoder
messagesBufferSize int // size of the pending messages buffer
metricsReporters []metrics.Reporter
Expand All @@ -144,6 +147,7 @@ func NewAgentFactory(
encoder codec.PacketEncoder,
serializer serialize.Serializer,
heartbeatTimeout time.Duration,
writeTimeout time.Duration,
messageEncoder message.Encoder,
messagesBufferSize int,
sessionPool session.SessionPool,
Expand All @@ -154,6 +158,7 @@ func NewAgentFactory(
decoder: decoder,
encoder: encoder,
heartbeatTimeout: heartbeatTimeout,
writeTimeout: writeTimeout,
messageEncoder: messageEncoder,
messagesBufferSize: messagesBufferSize,
sessionPool: sessionPool,
Expand All @@ -164,7 +169,7 @@ func NewAgentFactory(

// CreateAgent returns a new agent for conn, built from the factory's
// decoder, encoder, serializer, heartbeat timeout, write timeout, pending
// message buffer size, die channel, message encoder, metrics reporters and
// session pool.
func (f *agentFactoryImpl) CreateAgent(conn net.Conn) Agent {
	// Only the call that forwards f.writeTimeout is kept: newAgent takes the
	// write timeout right after the heartbeat timeout, so the older argument
	// list without it no longer matches the signature.
	return newAgent(
		conn,
		f.decoder,
		f.encoder,
		f.serializer,
		f.heartbeatTimeout,
		f.writeTimeout,
		f.messagesBufferSize,
		f.appDieChan,
		f.messageEncoder,
		f.metricsReporters,
		f.sessionPool,
	)
}

// newAgent creates a new agent instance
Expand All @@ -174,6 +179,7 @@ func newAgent(
packetEncoder codec.PacketEncoder,
serializer serialize.Serializer,
heartbeatTime time.Duration,
writeTimeout time.Duration,
messagesBufferSize int,
dieChan chan bool,
messageEncoder message.Encoder,
Expand All @@ -199,6 +205,7 @@ func newAgent(
decoder: packetDecoder,
encoder: packetEncoder,
heartbeatTimeout: heartbeatTime,
writeTimeout: writeTimeout,
lastAt: time.Now().Unix(),
serializer: serializer,
state: constants.StatusStart,
Expand Down Expand Up @@ -500,21 +507,34 @@ func (a *agentImpl) write() {
for {
select {
case pWrite := <-a.chSend:
ctx, err, data := pWrite.ctx, pWrite.err, pWrite.data

writeErr := a.writeToConnection(ctx, data)
if writeErr != nil {
err = errors.NewError(writeErr, errors.ErrClosedRequest)

logger.Log.Errorf("Failed to write in conn: %s (ctx=%v), agent will close", writeErr.Error(), ctx)
ctxTimeout := a.writeTimeout
if ctxTimeout <= 0 {
ctxTimeout = config.DefaultWriteTimeout
}
writeCtx := pWrite.ctx
if writeCtx == nil {
writeCtx = context.Background()
}
ctx, cancel := context.WithTimeout(writeCtx, ctxTimeout)
defer cancel()

err, data := pWrite.err, pWrite.data

writeErr := a.writeToConnection(ctx, data)
tracing.FinishSpan(ctx, nil)
metrics.ReportTimingFromCtx(ctx, a.metricsReporters, handlerType, err)

// close agent if low-level conn broke during write
if writeErr != nil {
return
if e.Is(writeErr, context.DeadlineExceeded) {
// Log the timeout error but continue processing
logger.Log.Warnf("Context deadline exceeded for write in conn: %s (ctx=%v)", writeErr.Error(), ctx)
metrics.ReportTimingFromCtx(ctx, a.metricsReporters, handlerType, err)
} else {
err = errors.NewError(writeErr, errors.ErrClosedRequest)
logger.Log.Errorf("Failed to write in conn: %s (ctx=%v), agent will close", writeErr.Error(), ctx)
metrics.ReportTimingFromCtx(ctx, a.metricsReporters, handlerType, err)
// close agent if low-level conn broke during write
return
}
}
case <-a.chStopWrite:
return
Expand All @@ -524,17 +544,29 @@ func (a *agentImpl) write() {

// writeToConnection writes data to the agent's connection inside a
// "conn write" tracing span and returns the resulting error, if any.
//
// When ctx carries a deadline, that deadline is applied to the connection as
// its write deadline, so a stalled peer makes Write itself return. This avoids
// parking a helper goroutine on the socket that would never be released after
// a timeout. A write that times out under such a deadline is reported as
// context.DeadlineExceeded, the value callers compare against with errors.Is.
//
// If ctx is already done when the function is entered, ctx.Err() is returned
// and nothing is written. Cancellation of a ctx without a deadline is only
// observed at that point; it does not interrupt a write already in flight.
func (a *agentImpl) writeToConnection(ctx context.Context, data []byte) error {
	span := createConnectionSpan(ctx, a.conn, "conn write")
	// Finish the span exactly once, whatever path returns.
	defer span.Finish()

	// Do not start a write for a context that has already expired or been
	// cancelled.
	if err := ctx.Err(); err != nil {
		tracing.LogError(span, err.Error())
		return err
	}

	bounded := false
	if deadline, ok := ctx.Deadline(); ok {
		// If the deadline cannot be set, fall through to a plain write: a
		// broken connection will surface its real error from Write below.
		if err := a.conn.SetWriteDeadline(deadline); err == nil {
			bounded = true
			// Clear the deadline afterwards so that it does not apply to
			// later writes issued on the same connection.
			defer func() { _ = a.conn.SetWriteDeadline(time.Time{}) }()
		}
	}

	if _, writeErr := a.conn.Write(data); writeErr != nil {
		// Translate a socket-level timeout caused by our own deadline into
		// the context error so callers keep a single sentinel to check.
		var netErr net.Error
		if bounded && e.As(writeErr, &netErr) && netErr.Timeout() {
			writeErr = context.DeadlineExceeded
		}
		tracing.LogError(span, writeErr.Error())
		return writeErr
	}

	return nil
}

func createConnectionSpan(ctx context.Context, conn net.Conn, op string) opentracing.Span {
Expand All @@ -549,7 +581,7 @@ func createConnectionSpan(ctx context.Context, conn net.Conn, op string) opentra

tags := opentracing.Tags{
"span.kind": "connection",
"addr": remoteAddress,
"addr": remoteAddress,
}

var parent opentracing.SpanContext
Expand Down
Loading

0 comments on commit 7d97542

Please sign in to comment.