Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/observability gaps #406

Merged
merged 22 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 54 additions & 8 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ var (
// herd contains the handshake error response data
herd []byte
once sync.Once

noOpTracer = opentracing.NoopTracer{}
)

const handlerType = "handler"
Expand Down Expand Up @@ -498,22 +500,66 @@ func (a *agentImpl) write() {
for {
select {
case pWrite := <-a.chSend:
// close agent if low-level Conn broken
if _, err := a.conn.Write(pWrite.data); err != nil {
tracing.FinishSpan(pWrite.ctx, err)
metrics.ReportTimingFromCtx(pWrite.ctx, a.metricsReporters, handlerType, err)
logger.Log.Errorf("Failed to write in conn: %s", err.Error())
ctx, err, data := pWrite.ctx, pWrite.err, pWrite.data

writeErr := a.writeToConnection(ctx, data)
if writeErr != nil {
err = errors.NewError(writeErr, errors.ErrClosedRequest)

logger.Log.Errorf("Failed to write in conn: %s (ctx=%v), agent will close", writeErr.Error(), ctx)
}

tracing.FinishSpan(ctx, nil)
metrics.ReportTimingFromCtx(ctx, a.metricsReporters, handlerType, err)

// close agent if low-level conn broke during write
if writeErr != nil {
return
}
var e error
tracing.FinishSpan(pWrite.ctx, e)
metrics.ReportTimingFromCtx(pWrite.ctx, a.metricsReporters, handlerType, pWrite.err)
case <-a.chStopWrite:
return
}
}
}

// writeToConnection writes data to the agent's underlying connection inside a
// "conn write" tracing span created from ctx.
//
// The span is finished when the function returns. When the write fails, the
// error message is logged on the span and the write error is returned
// unchanged; otherwise nil is returned.
func (a *agentImpl) writeToConnection(ctx context.Context, data []byte) error {
	span := createConnectionSpan(ctx, a.conn, "conn write")
	// Register Finish before touching the connection so the span is always
	// closed, even if the write panics.
	defer span.Finish()

	if _, err := a.conn.Write(data); err != nil {
		tracing.LogError(span, err.Error())
		return err
	}

	return nil
}

// createConnectionSpan starts a tracing span named op for an operation on conn.
//
// With a nil ctx a span from the package-level no-op tracer is returned and
// conn is not consulted. Otherwise the span is started through
// opentracing.StartSpan, tagged with "span.kind" = "connection" and "addr" =
// the connection's remote address (empty when RemoteAddr returns nil). The
// context of the span stored in ctx is passed via ChildOf; it is nil when ctx
// carries no span.
func createConnectionSpan(ctx context.Context, conn net.Conn, op string) opentracing.Span {
	if ctx == nil {
		return noOpTracer.StartSpan(op)
	}

	// RemoteAddr is intentionally queried twice (check, then use), matching the
	// call count the mock-based tests expect.
	var addr string
	if conn.RemoteAddr() != nil {
		addr = conn.RemoteAddr().String()
	}

	var parentCtx opentracing.SpanContext
	if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {
		parentCtx = parentSpan.Context()
	}

	return opentracing.StartSpan(
		op,
		opentracing.ChildOf(parentCtx),
		opentracing.Tags{
			"span.kind": "connection",
			"addr":      addr,
		},
	)
}

// SendRequest sends a request to a server
func (a *agentImpl) SendRequest(ctx context.Context, serverID, route string, v interface{}) (*protos.Response, error) {
return nil, e.New("not implemented")
Expand Down
51 changes: 51 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ func TestAgentSendSerializeErr(t *testing.T) {

var wg sync.WaitGroup
wg.Add(1)
mockConn.EXPECT().RemoteAddr().Times(2).Return(&mockAddr{})
mockConn.EXPECT().Write(expectedPacket).Do(func(b []byte) {
wg.Done()
})
Expand Down Expand Up @@ -997,6 +998,7 @@ func TestAgentWriteChSend(t *testing.T) {

var wg sync.WaitGroup
wg.Add(1)
mockConn.EXPECT().RemoteAddr().Times(2).Return(&mockAddr{})
mockConn.EXPECT().Write(expectedPacket).Do(func(b []byte) {
time.Sleep(10 * time.Millisecond)
wg.Done()
Expand Down Expand Up @@ -1110,3 +1112,52 @@ func TestIPVersion(t *testing.T) {
})
}
}

// TestAgentWriteChSendWriteError drives the agent write loop with a connection
// whose Write fails and verifies that:
//   - the response-time summary is reported with status "failed" and the
//     ErrClosedRequest code, and
//   - the connection is closed afterwards.
func TestAgentWriteChSendWriteError(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	serializer := serializemocks.NewMockSerializer(ctrl)
	serializer.EXPECT().GetName()

	packetEncoder := codecmocks.NewMockPacketEncoder(ctrl)
	conn := mocks.NewMockPlayerConn(ctrl)
	msgEncoder := message.NewMessagesEncoder(false)
	reporter := metricsmocks.NewMockReporter(ctrl)
	reporter.EXPECT().ReportGauge(metrics.ConnectedClients, gomock.Any(), gomock.Any())

	reporters := []metrics.Reporter{reporter}
	pool := session.NewSessionPool()

	ag := newAgent(conn, nil, packetEncoder, serializer, time.Second, 0, nil, msgEncoder, reporters, pool).(*agentImpl)

	ctx := getCtxWithRequestKeys()
	payload := []byte("final")
	writeErr := errors.New("write error")

	// Two asynchronous events must happen before the test may finish: the
	// failing Write and the subsequent Close of the connection.
	var pending sync.WaitGroup
	pending.Add(2)

	// Tags the failed write is expected to be reported with.
	wantTags := map[string]string{
		"route":  "route",
		"status": "failed",
		"type":   "handler",
		"code":   e.ErrClosedRequest,
	}

	reporter.EXPECT().ReportGauge(metrics.ConnectedClients, gomock.Any(), gomock.Any())
	reporter.EXPECT().ReportSummary(metrics.ResponseTime, wantTags, gomock.Any())

	conn.EXPECT().RemoteAddr().Return(&mockAddr{}).Times(3)
	conn.EXPECT().Close().Do(func() {
		pending.Done()
	})
	conn.EXPECT().Write(payload).Do(func(b []byte) {
		pending.Done()
	}).Return(0, writeErr)

	go ag.write()
	ag.chSend <- pendingWrite{ctx: ctx, data: payload, err: nil}
	pending.Wait()
}
4 changes: 4 additions & 0 deletions cluster/nats_rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ func (ns *NatsRPCServer) getUserKickChannel() chan *protos.KickMsg {
func (ns *NatsRPCServer) marshalResponse(res *protos.Response) ([]byte, error) {
p, err := proto.Marshal(res)
if err != nil {
logger.Log.Errorf("error marshaling response: %s", err.Error())

res := &protos.Response{
Error: &protos.Error{
Code: e.ErrUnknownCode,
Expand All @@ -271,6 +273,8 @@ func (ns *NatsRPCServer) processMessages(threadID int) {
Msg: err.Error(),
},
}

logger.Log.Errorf("error getting context from request: %s", err)
} else {
ns.responses[threadID], err = ns.pitayaServer.Call(ctx, ns.requests[threadID])
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ const ErrBadRequestCode = "PIT-400"
// ErrClientClosedRequest is a string code representing the client closed request error
const ErrClientClosedRequest = "PIT-499"

// ErrClosedRequest is a string code representing the closed request error.
// The agent write loop wraps a failed connection write with this code before
// reporting the request timing to the metrics reporters.
const ErrClosedRequest = "PIT-498"

// Error is an error with a code, message and metadata
type Error struct {
Code string
Expand Down
6 changes: 5 additions & 1 deletion service/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,13 +344,17 @@ func (h *HandlerService) localProcess(ctx context.Context, a agent.Agent, route
} else {
err := a.GetSession().ResponseMID(ctx, mid, ret)
if err != nil {
logger.Log.Errorf("Failed to process handler message: %s", err.Error())
tracing.FinishSpan(ctx, err)
metrics.ReportTimingFromCtx(ctx, h.metricsReporters, handlerType, err)
}
}
} else {
metrics.ReportTimingFromCtx(ctx, h.metricsReporters, handlerType, nil)
metrics.ReportTimingFromCtx(ctx, h.metricsReporters, handlerType, err)
tracing.FinishSpan(ctx, err)
if err != nil {
logger.Log.Errorf("Failed to process notify message: %s", err.Error())
}
}
}

Expand Down
1 change: 1 addition & 0 deletions service/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ func (r *RemoteService) remoteCall(
if target == nil {
	// No explicit target was given: ask the router to pick one for this route.
	target, err = r.router.Route(ctx, rpcType, svType, route, msg)
	if err != nil {
		// %w is only interpreted by fmt.Errorf; in a log format string it
		// renders as "%!w(...)", so use %v to log the error text.
		logger.Log.Errorf("error making call for route %s: %v", route.String(), err)
		return nil, e.NewError(err, e.ErrInternalCode)
	}
}
Expand Down
Loading