Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First set of log improvements #349

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/topfreegames/pitaya/v2/constants"
"github.com/topfreegames/pitaya/v2/errors"
"github.com/topfreegames/pitaya/v2/logger"
"github.com/topfreegames/pitaya/v2/logger/interfaces"
"github.com/topfreegames/pitaya/v2/metrics"
"github.com/topfreegames/pitaya/v2/protos"
"github.com/topfreegames/pitaya/v2/serialize"
Expand Down Expand Up @@ -80,6 +81,7 @@ type (
metricsReporters []metrics.Reporter
serializer serialize.Serializer // message serializer
state int32 // current agent state
logger interfaces.Logger
}

pendingMessage struct {
Expand Down Expand Up @@ -203,10 +205,11 @@ func newAgent(
messageEncoder: messageEncoder,
metricsReporters: metricsReporters,
sessionPool: sessionPool,
logger: logger.Log,
}

// binding session
s := sessionPool.NewSession(a, true)
a.logger = a.logger.WithField("session_id", s.ID())
metrics.ReportNumberOfConnectedClients(metricsReporters, sessionPool.GetSessionCount())
a.Session = s
return a
Expand Down Expand Up @@ -296,10 +299,10 @@ func (a *agentImpl) Push(route string, v interface{}) error {

switch d := v.(type) {
case []byte:
logger.Log.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%dbytes",
a.logger.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%dbytes",
a.Session.ID(), a.Session.UID(), route, len(d))
default:
logger.Log.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%+v",
a.logger.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%+v",
a.Session.ID(), a.Session.UID(), route, v)
}
return a.send(pendingMessage{typ: message.Push, route: route, payload: v})
Expand All @@ -322,10 +325,10 @@ func (a *agentImpl) ResponseMID(ctx context.Context, mid uint, v interface{}, is

switch d := v.(type) {
case []byte:
logger.Log.Debugf("Type=Response, ID=%d, UID=%s, MID=%d, Data=%dbytes",
a.logger.Debugf("Type=Response, ID=%d, UID=%s, MID=%d, Data=%dbytes",
a.Session.ID(), a.Session.UID(), mid, len(d))
default:
logger.Log.Infof("Type=Response, ID=%d, UID=%s, MID=%d, Data=%+v",
a.logger.Infof("Type=Response, ID=%d, UID=%s, MID=%d, Data=%+v",
a.Session.ID(), a.Session.UID(), mid, v)
}

Expand All @@ -342,7 +345,7 @@ func (a *agentImpl) Close() error {
}
a.SetStatus(constants.StatusClosed)

logger.Log.Debugf("Session closed, ID=%d, UID=%s, IP=%s",
a.logger.Debugf("Session closed, ID=%d, UID=%s, IP=%s",
a.Session.ID(), a.Session.UID(), a.conn.RemoteAddr())

// prevent closing closed channel
Expand Down Expand Up @@ -402,7 +405,7 @@ func (a *agentImpl) SetStatus(state int32) {
func (a *agentImpl) Handle() {
defer func() {
a.Close()
logger.Log.Debugf("Session handle goroutine exit, SessionID=%d, UID=%s", a.Session.ID(), a.Session.UID())
a.logger.Debugf("Session handle goroutine exit, SessionID=%d, UID=%s", a.Session.ID(), a.Session.UID())
}()

go a.write()
Expand Down Expand Up @@ -440,7 +443,7 @@ func (a *agentImpl) heartbeat() {
case <-ticker.C:
deadline := time.Now().Add(-2 * a.heartbeatTimeout).Unix()
if atomic.LoadInt64(&a.lastAt) < deadline {
logger.Log.Debugf("Session heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&a.lastAt), deadline)
a.logger.Debugf("Session heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&a.lastAt), deadline)
return
}

Expand All @@ -463,7 +466,7 @@ func (a *agentImpl) heartbeat() {
func (a *agentImpl) onSessionClosed(s session.Session) {
defer func() {
if err := recover(); err != nil {
logger.Log.Errorf("pitaya/onSessionClosed: %v", err)
a.logger.Errorf("pitaya/onSessionClosed: %v", err)
}
}()

Expand Down Expand Up @@ -502,7 +505,7 @@ func (a *agentImpl) write() {
if _, err := a.conn.Write(pWrite.data); err != nil {
tracing.FinishSpan(pWrite.ctx, err)
metrics.ReportTimingFromCtx(pWrite.ctx, a.metricsReporters, handlerType, err)
logger.Log.Errorf("Failed to write in conn: %s", err.Error())
a.logger.Errorf("Failed to write in conn: %s", err.Error())
return
}
var e error
Expand Down Expand Up @@ -536,12 +539,12 @@ func (a *agentImpl) AnswerWithError(ctx context.Context, mid uint, err error) {
}
p, e := util.GetErrorPayload(a.serializer, err)
if e != nil {
logger.Log.Errorf("error answering the user with an error: %s", e.Error())
a.logger.Errorf("error answering the user with an error: %s", e.Error())
return
}
e = a.Session.ResponseMID(ctx, mid, p, true)
if e != nil {
logger.Log.Errorf("error answering the user with an error: %s", e.Error())
a.logger.Errorf("error answering the user with an error: %s", e.Error())
}
}

Expand Down Expand Up @@ -614,11 +617,11 @@ func encodeAndCompress(data interface{}, dataCompression bool) ([]byte, error) {
func (a *agentImpl) reportChannelSize() {
chSendCapacity := a.messagesBufferSize - len(a.chSend)
if chSendCapacity == 0 {
logger.Log.Warnf("chSend is at maximum capacity")
a.logger.Warnf("chSend is at maximum capacity")
}
for _, mr := range a.metricsReporters {
if err := mr.ReportGauge(metrics.ChannelCapacity, map[string]string{"channel": "agent_chsend"}, float64(chSendCapacity)); err != nil {
logger.Log.Warnf("failed to report chSend channel capaacity: %s", err.Error())
a.logger.Warnf("failed to report chSend channel capacity: %s", err.Error())
}
}
}
5 changes: 4 additions & 1 deletion agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
e "github.com/topfreegames/pitaya/v2/errors"
"github.com/topfreegames/pitaya/v2/helpers"
"github.com/topfreegames/pitaya/v2/metrics"
"github.com/topfreegames/pitaya/v2/logger"
metricsmocks "github.com/topfreegames/pitaya/v2/metrics/mocks"
"github.com/topfreegames/pitaya/v2/mocks"
"github.com/topfreegames/pitaya/v2/protos"
Expand Down Expand Up @@ -223,6 +224,7 @@ func TestAgentSendSerializeErr(t *testing.T) {
messageEncoder: messageEncoder,
metricsReporters: mockMetricsReporters,
Session: sessionPool.NewSession(nil, true),
logger: logger.Log,
}

ctx := getCtxWithRequestKeys()
Expand Down Expand Up @@ -989,6 +991,7 @@ func TestAgentWriteChSend(t *testing.T) {
serializer: mockSerializer,
messageEncoder: messageEncoder,
metricsReporters: mockMetricsReporters,
logger: logger.Log,
}
ctx := getCtxWithRequestKeys()
mockMetricsReporters[0].(*metricsmocks.MockReporter).EXPECT().ReportSummary(metrics.ResponseTime, gomock.Any(), gomock.Any())
Expand Down Expand Up @@ -1104,7 +1107,7 @@ func TestIPVersion(t *testing.T) {
mockAddr := &customMockAddr{str: table.addr}

mockConn.EXPECT().RemoteAddr().Return(mockAddr)
a := &agentImpl{conn: mockConn}
a := &agentImpl{conn: mockConn, logger: logger.Log}

assert.Equal(t, table.ipVersion, a.IPVersion())
})
Expand Down
10 changes: 7 additions & 3 deletions service/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,21 @@ func (r *RemoteService) remoteProcess(
route *route.Route,
msg *message.Message,
) {
log := logger.Log.WithFields(map[string]interface{}{
"route": route.String(),
"uid": a.GetSession().UID(),
})
res, err := r.remoteCall(ctx, server, protos.RPCType_Sys, route, a.GetSession(), msg)
switch msg.Type {
case message.Request:
if err != nil {
logger.Log.Errorf("Failed to process remote server: %s", err.Error())
log.Errorf("Failed to process remote server: %s", err.Error())
a.AnswerWithError(ctx, msg.ID, err)
return
}
err := a.GetSession().ResponseMID(ctx, msg.ID, res.Data)
if err != nil {
logger.Log.Errorf("Failed to respond to remote server: %s", err.Error())
log.Errorf("Failed to respond to remote server: %s", err.Error())
a.AnswerWithError(ctx, msg.ID, err)
}
case message.Notify:
Expand All @@ -128,7 +132,7 @@ func (r *RemoteService) remoteProcess(
err = errors.New(res.Error.GetMsg())
}
if err != nil {
logger.Log.Errorf("error while sending a notify to server: %s", err.Error())
log.Errorf("error while sending a notify to server: %s", err.Error())
}
}
}
Expand Down
1 change: 1 addition & 0 deletions service/remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ func TestRemoteServiceRemoteProcess(t *testing.T) {
} else if expectedMsg.Type != message.Notify {
mockSession.EXPECT().ResponseMID(ctx, expectedMsg.ID, gomock.Any()).Return(table.responseMIDErr)
}
mockSession.EXPECT().UID()

if table.responseMIDErr != nil {
mockAgent.EXPECT().AnswerWithError(ctx, expectedMsg.ID, table.responseMIDErr)
Expand Down
18 changes: 14 additions & 4 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
nats "github.com/nats-io/nats.go"
"github.com/topfreegames/pitaya/v2/constants"
"github.com/topfreegames/pitaya/v2/logger"
"github.com/topfreegames/pitaya/v2/logger/interfaces"
"github.com/topfreegames/pitaya/v2/networkentity"
"github.com/topfreegames/pitaya/v2/protos"
)
Expand Down Expand Up @@ -99,6 +100,7 @@ type sessionImpl struct {
Subscriptions []*nats.Subscription // subscription created on bind when using nats rpc server
requestsInFlight ReqInFlight // whether the session is waiting from a response from a remote
pool *sessionPoolImpl
logger interfaces.Logger // logger instance for this session
}

type ReqInFlight struct {
Expand Down Expand Up @@ -190,13 +192,16 @@ func (pool *sessionPoolImpl) NewSession(entity networkentity.NetworkEntity, fron
IsFrontend: frontend,
pool: pool,
requestsInFlight: ReqInFlight{m: make(map[string]string)},
logger: logger.Log,
}
if frontend {
pool.sessionsByID.Store(s.id, s)
atomic.AddInt64(&pool.SessionCount, 1)
}
s.logger = s.logger.WithField("session_id",s.id)
if len(UID) > 0 {
s.uid = UID[0]
s.logger = s.logger.WithField("uid",s.uid)
}
return s
}
Expand Down Expand Up @@ -419,13 +424,15 @@ func (s *sessionImpl) Bind(ctx context.Context, uid string) error {
}

if s.UID() != "" {
s.logger.Debugf("Error trying to bind UID %s. A UID is already bound in this session", uid)
return constants.ErrSessionAlreadyBound
}

s.uid = uid
for _, cb := range s.pool.sessionBindCallbacks {
err := cb(ctx, s)
if err != nil {
s.logger.Error("Error running session bind callback. Removing uid from session")
s.uid = ""
return err
}
Expand All @@ -435,6 +442,7 @@ func (s *sessionImpl) Bind(ctx context.Context, uid string) error {
if s.IsFrontend {
// If a session with the same UID already exists in this frontend server, close it
if val, ok := s.pool.sessionsByUID.Load(uid); ok {
s.logger.Warn("A session for this UID %s already existed in this frontend, on session ID %v closing it", val.(Session).ID())
val.(Session).Close()
}
s.pool.sessionsByUID.Store(uid, s)
Expand All @@ -443,7 +451,7 @@ func (s *sessionImpl) Bind(ctx context.Context, uid string) error {
// is not the frontend server that received the user request
err := s.bindInFront(ctx)
if err != nil {
logger.Log.Error("error while trying to push session to front: ", err)
s.logger.Error("error while trying to push session to front: ", err)
s.uid = ""
return err
}
Expand Down Expand Up @@ -485,6 +493,7 @@ func (s *sessionImpl) OnClose(c func()) error {
func (s *sessionImpl) Close() {
atomic.AddInt64(&s.pool.SessionCount, -1)
s.pool.sessionsByID.Delete(s.ID())
s.logger.Debug("Closing session")
// Only remove session by UID if the session ID matches the one being closed. This avoids problems with removing a valid session after the user has already reconnected before this session's heartbeat times out
if val, ok := s.pool.sessionsByUID.Load(s.UID()); ok {
if (val.(Session)).ID() == s.ID() {
Expand All @@ -497,9 +506,9 @@ func (s *sessionImpl) Close() {
for _, sub := range s.Subscriptions {
err := sub.Drain()
if err != nil {
logger.Log.Errorf("error unsubscribing to user's messages channel: %s, this can cause performance and leak issues", err.Error())
s.logger.Errorf("error unsubscribing to user's messages channel: %s, this can cause performance and leak issues", err.Error())
} else {
logger.Log.Debugf("successfully unsubscribed to user's %s messages channel", s.UID())
s.logger.Debug("successfully unsubscribed to user's messages channel")
}
}
}
Expand Down Expand Up @@ -830,6 +839,7 @@ func (s *sessionImpl) ValidateHandshake(data *HandshakeData) error {
}

func (s *sessionImpl) sendRequestToFront(ctx context.Context, route string, includeData bool) error {
log := s.logger.WithField("route", route)
sessionData := &protos.Session{
Id: s.frontendSessionID,
Uid: s.uid,
Expand All @@ -845,7 +855,7 @@ func (s *sessionImpl) sendRequestToFront(ctx context.Context, route string, incl
if err != nil {
return err
}
logger.Log.Debugf("%s Got response: %+v", route, res)
log.Debugf("Got response: %+v", res)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,10 @@ func CtxWithDefaultLogger(ctx context.Context, route, userID string) context.Con
requestID := pcontext.GetFromPropagateCtx(ctx, constants.RequestIDKey)
if rID, ok := requestID.(string); ok {
if rID == "" {
requestID = nuid.New()
requestID = nuid.New().Next()
}
} else {
requestID = nuid.New()
requestID = nuid.New().Next()
}
defaultLogger := logger.Log.WithFields(
map[string]interface{}{
Expand Down