Skip to content

Commit

Permalink
feat(agent): add timeout on write calls
Browse files Browse the repository at this point in the history
When sending a message via the agent, the write connection does not have
a default timeout, leading to blocking calls if the connection is
unhealthy or simply has a bad route to it. Thus, add a write timeout
to the writes so we fail but don't block. If the deadline is exceeded then
we log the error but don't close the connection, since it might fail due
to a bad route rather than a broken pipe per se. The new configuration can
be found under: `pitaya.buffer.agent.writetimeout`, with 10s by default.

The timeout is set using the `SetWriteDeadline` function from Go's net package:
https://pkg.go.dev/net#Conn
  • Loading branch information
hspedro committed Jan 17, 2025
1 parent be642be commit 74298ad
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 49 deletions.
44 changes: 29 additions & 15 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (
e "errors"
"fmt"
"net"
"os"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/topfreegames/pitaya/v2/config"
"github.com/topfreegames/pitaya/v2/conn/codec"
"github.com/topfreegames/pitaya/v2/conn/message"
"github.com/topfreegames/pitaya/v2/conn/packet"
Expand Down Expand Up @@ -76,6 +78,7 @@ type (
decoder codec.PacketDecoder // binary decoder
encoder codec.PacketEncoder // binary encoder
heartbeatTimeout time.Duration
writeTimeout time.Duration
lastAt int64 // last heartbeat unix time stamp
messageEncoder message.Encoder
messagesBufferSize int // size of the pending messages buffer
Expand Down Expand Up @@ -130,6 +133,7 @@ type (
decoder codec.PacketDecoder // binary decoder
encoder codec.PacketEncoder // binary encoder
heartbeatTimeout time.Duration
writeTimeout time.Duration
messageEncoder message.Encoder
messagesBufferSize int // size of the pending messages buffer
metricsReporters []metrics.Reporter
Expand All @@ -144,6 +148,7 @@ func NewAgentFactory(
encoder codec.PacketEncoder,
serializer serialize.Serializer,
heartbeatTimeout time.Duration,
writeTimeout time.Duration,
messageEncoder message.Encoder,
messagesBufferSize int,
sessionPool session.SessionPool,
Expand All @@ -154,6 +159,7 @@ func NewAgentFactory(
decoder: decoder,
encoder: encoder,
heartbeatTimeout: heartbeatTimeout,
writeTimeout: writeTimeout,
messageEncoder: messageEncoder,
messagesBufferSize: messagesBufferSize,
sessionPool: sessionPool,
Expand All @@ -164,7 +170,7 @@ func NewAgentFactory(

// CreateAgent returns a new agent for conn, built from the factory's codecs,
// serializer, heartbeat and write timeouts, buffer size, die channel, message
// encoder, metrics reporters and session pool.
func (f *agentFactoryImpl) CreateAgent(conn net.Conn) Agent {
	// f.writeTimeout must be forwarded here; otherwise the agent never
	// receives the configured write deadline.
	return newAgent(conn, f.decoder, f.encoder, f.serializer, f.heartbeatTimeout, f.writeTimeout, f.messagesBufferSize, f.appDieChan, f.messageEncoder, f.metricsReporters, f.sessionPool)
}

// NewAgent create new agent instance
Expand All @@ -174,6 +180,7 @@ func newAgent(
packetEncoder codec.PacketEncoder,
serializer serialize.Serializer,
heartbeatTime time.Duration,
writeTimeout time.Duration,
messagesBufferSize int,
dieChan chan bool,
messageEncoder message.Encoder,
Expand All @@ -188,6 +195,10 @@ func newAgent(
herdEncode(heartbeatTime, packetEncoder, messageEncoder.IsCompressionEnabled(), serializerName)
})

if writeTimeout <= 0 {
writeTimeout = config.DefaultWriteTimeout
}

a := &agentImpl{
appDieChan: dieChan,
chDie: make(chan struct{}),
Expand All @@ -199,6 +210,7 @@ func newAgent(
decoder: packetDecoder,
encoder: packetEncoder,
heartbeatTimeout: heartbeatTime,
writeTimeout: writeTimeout,
lastAt: time.Now().Unix(),
serializer: serializer,
state: constants.StatusStart,
Expand Down Expand Up @@ -503,19 +515,23 @@ func (a *agentImpl) write() {
ctx, err, data := pWrite.ctx, pWrite.err, pWrite.data

writeErr := a.writeToConnection(ctx, data)
if writeErr != nil {
err = errors.NewError(writeErr, errors.ErrClosedRequest)

logger.Log.Errorf("Failed to write in conn: %s (ctx=%v), agent will close", writeErr.Error(), ctx)
}

tracing.FinishSpan(ctx, nil)
metrics.ReportTimingFromCtx(ctx, a.metricsReporters, handlerType, err)

// close agent if low-level conn broke during write
if writeErr != nil {
return
if e.Is(writeErr, os.ErrDeadlineExceeded) {
// Log the timeout error but continue processing
logger.Log.Warnf("Context deadline exceeded for write in conn %s: %s (ctx=%v)", writeErr.Error(), ctx)
} else {
err = errors.NewError(writeErr, errors.ErrClosedRequest)
logger.Log.Errorf("Failed to write in conn: %s (ctx=%v), agent will close", writeErr.Error(), ctx)
metrics.ReportTimingFromCtx(ctx, a.metricsReporters, handlerType, err)
// close agent if low-level conn broke during write
return
}
}

metrics.ReportTimingFromCtx(ctx, a.metricsReporters, handlerType, err)
case <-a.chStopWrite:
return
}
Expand All @@ -524,17 +540,15 @@ func (a *agentImpl) write() {

// writeToConnection writes data to the agent's connection inside a
// "conn write" tracing span. The write is bounded by a.writeTimeout via
// SetWriteDeadline, so it fails instead of blocking indefinitely.
//
// It returns nil on success. If the deadline cannot be set or the write
// fails, the error is recorded on the span and returned unchanged, so the
// caller can still match it with errors.Is (e.g. os.ErrDeadlineExceeded).
func (a *agentImpl) writeToConnection(ctx context.Context, data []byte) error {
	span := createConnectionSpan(ctx, a.conn, "conn write")
	defer span.Finish()

	// Arm the deadline before the single write. Without a deadline the write
	// could block forever, so a failure to set it is reported, not ignored.
	if err := a.conn.SetWriteDeadline(time.Now().Add(a.writeTimeout)); err != nil {
		tracing.LogError(span, err.Error())
		return err
	}

	if _, err := a.conn.Write(data); err != nil {
		tracing.LogError(span, err.Error())
		return err
	}

	return nil
}

func createConnectionSpan(ctx context.Context, conn net.Conn, op string) opentracing.Span {
Expand All @@ -549,7 +563,7 @@ func createConnectionSpan(ctx context.Context, conn net.Conn, op string) opentra

tags := opentracing.Tags{
"span.kind": "connection",
"addr": remoteAddress,
"addr": remoteAddress,
}

var parent opentracing.SpanContext
Expand Down
Loading

0 comments on commit 74298ad

Please sign in to comment.