Skip to content

Commit 95e9c97

Browse files
committed
feat(logging): add support for log message notifications
- Introduced `LoggingLevel.ShouldSendTo` for log level comparison - Added `SendLogMessageToSpecificClient` to send log notifications to a specific client - Added `SendLogMessageToClient` to send log notifications to the current session client - Implemented `SessionWithLogging` interface for `streamableHttpSession` - Added `sessionLogLevelsStore` to manage per-session log levels - Updated `StreamableHTTPServer` to support setting and retrieving log levels - Added tests covering log notification sending logic and format validation
1 parent 2d479bb commit 95e9c97

File tree

5 files changed

+649
-68
lines changed

5 files changed

+649
-68
lines changed

mcp/types.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -761,6 +761,26 @@ const (
761761
LoggingLevelEmergency LoggingLevel = "emergency"
762762
)
763763

764+
var levelToInt = map[LoggingLevel]int{
765+
LoggingLevelDebug: 0,
766+
LoggingLevelInfo: 1,
767+
LoggingLevelNotice: 2,
768+
LoggingLevelWarning: 3,
769+
LoggingLevelError: 4,
770+
LoggingLevelCritical: 5,
771+
LoggingLevelAlert: 6,
772+
LoggingLevelEmergency: 7,
773+
}
774+
775+
func (l LoggingLevel) ShouldSendTo(minLevel LoggingLevel) bool {
776+
ia, oka := levelToInt[l]
777+
ib, okb := levelToInt[minLevel]
778+
if !oka || !okb {
779+
return false
780+
}
781+
return ia >= ib
782+
}
783+
764784
/* Sampling */
765785

766786
const (

server/session.go

Lines changed: 122 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -96,35 +96,38 @@ func (s *MCPServer) RegisterSession(
9696
return nil
9797
}
9898

99-
// UnregisterSession removes from storage session that is shut down.
100-
func (s *MCPServer) UnregisterSession(
101-
ctx context.Context,
102-
sessionID string,
103-
) {
104-
sessionValue, ok := s.sessions.LoadAndDelete(sessionID)
105-
if !ok {
106-
return
107-
}
108-
if session, ok := sessionValue.(ClientSession); ok {
109-
s.hooks.UnregisterSession(ctx, session)
110-
}
111-
}
112-
113-
// SendNotificationToAllClients sends a notification to all the currently active clients.
114-
func (s *MCPServer) SendNotificationToAllClients(
115-
method string,
116-
params map[string]any,
117-
) {
118-
notification := mcp.JSONRPCNotification{
99+
func (s *MCPServer) buildLogNotification(notification mcp.LoggingMessageNotification) mcp.JSONRPCNotification {
100+
return mcp.JSONRPCNotification{
119101
JSONRPC: mcp.JSONRPC_VERSION,
120102
Notification: mcp.Notification{
121-
Method: method,
103+
Method: notification.Method,
122104
Params: mcp.NotificationParams{
123-
AdditionalFields: params,
105+
AdditionalFields: map[string]any{
106+
"level": notification.Params.Level,
107+
"logger": notification.Params.Logger,
108+
"data": notification.Params.Data,
109+
},
124110
},
125111
},
126112
}
113+
}
127114

115+
func (s *MCPServer) SendLogMessageToClient(ctx context.Context, notification mcp.LoggingMessageNotification) error {
116+
session := ClientSessionFromContext(ctx)
117+
if session == nil || !session.Initialized() {
118+
return ErrNotificationNotInitialized
119+
}
120+
sessionLogging, ok := session.(SessionWithLogging)
121+
if !ok {
122+
return ErrSessionDoesNotSupportLogging
123+
}
124+
if !notification.Params.Level.ShouldSendTo(sessionLogging.GetLogLevel()) {
125+
return nil
126+
}
127+
return s.sendNotificationCore(ctx, session, s.buildLogNotification(notification))
128+
}
129+
130+
func (s *MCPServer) sendNotificationToAllClients(notification mcp.JSONRPCNotification) {
128131
s.sessions.Range(func(k, v any) bool {
129132
if session, ok := v.(ClientSession); ok && session.Initialized() {
130133
select {
@@ -140,7 +143,7 @@ func (s *MCPServer) SendNotificationToAllClients(
140143
ctx := context.Background()
141144
// Use the error hook to report the blocked channel
142145
hooks.onError(ctx, nil, "notification", map[string]any{
143-
"method": method,
146+
"method": notification.Method,
144147
"sessionID": sessionID,
145148
}, fmt.Errorf("notification channel blocked for session %s: %w", sessionID, err))
146149
}(session.SessionID(), hooks)
@@ -151,22 +154,71 @@ func (s *MCPServer) SendNotificationToAllClients(
151154
})
152155
}
153156

154-
// SendNotificationToClient sends a notification to the current client
155-
func (s *MCPServer) SendNotificationToClient(
156-
ctx context.Context,
157-
method string,
158-
params map[string]any,
159-
) error {
160-
session := ClientSessionFromContext(ctx)
161-
if session == nil || !session.Initialized() {
162-
return ErrNotificationNotInitialized
163-
}
164-
157+
func (s *MCPServer) sendNotificationToSpecificClient(session ClientSession, notification mcp.JSONRPCNotification) error {
165158
// upgrades the client-server communication to SSE stream when the server sends notifications to the client
166159
if sessionWithStreamableHTTPConfig, ok := session.(SessionWithStreamableHTTPConfig); ok {
167160
sessionWithStreamableHTTPConfig.UpgradeToSSEWhenReceiveNotification()
168161
}
162+
select {
163+
case session.NotificationChannel() <- notification:
164+
return nil
165+
default:
166+
// Channel is blocked, if there's an error hook, use it
167+
if s.hooks != nil && len(s.hooks.OnError) > 0 {
168+
err := ErrNotificationChannelBlocked
169+
ctx := context.Background()
170+
// Copy hooks pointer to local variable to avoid race condition
171+
hooks := s.hooks
172+
go func(sID string, hooks *Hooks) {
173+
// Use the error hook to report the blocked channel
174+
hooks.onError(ctx, nil, "notification", map[string]any{
175+
"method": notification.Method,
176+
"sessionID": sID,
177+
}, fmt.Errorf("notification channel blocked for session %s: %w", sID, err))
178+
}(session.SessionID(), hooks)
179+
}
180+
return ErrNotificationChannelBlocked
181+
}
182+
}
183+
184+
func (s *MCPServer) SendLogMessageToSpecificClient(sessionID string, notification mcp.LoggingMessageNotification) error {
185+
sessionValue, ok := s.sessions.Load(sessionID)
186+
if !ok {
187+
return ErrSessionNotFound
188+
}
189+
session, ok := sessionValue.(ClientSession)
190+
if !ok || !session.Initialized() {
191+
return ErrSessionNotInitialized
192+
}
193+
sessionLogging, ok := session.(SessionWithLogging)
194+
if !ok {
195+
return ErrSessionDoesNotSupportLogging
196+
}
197+
if !notification.Params.Level.ShouldSendTo(sessionLogging.GetLogLevel()) {
198+
return nil
199+
}
200+
return s.sendNotificationToSpecificClient(session, s.buildLogNotification(notification))
201+
}
202+
203+
// UnregisterSession removes from storage session that is shut down.
204+
func (s *MCPServer) UnregisterSession(
205+
ctx context.Context,
206+
sessionID string,
207+
) {
208+
sessionValue, ok := s.sessions.LoadAndDelete(sessionID)
209+
if !ok {
210+
return
211+
}
212+
if session, ok := sessionValue.(ClientSession); ok {
213+
s.hooks.UnregisterSession(ctx, session)
214+
}
215+
}
169216

217+
// SendNotificationToAllClients sends a notification to all the currently active clients.
218+
func (s *MCPServer) SendNotificationToAllClients(
219+
method string,
220+
params map[string]any,
221+
) {
170222
notification := mcp.JSONRPCNotification{
171223
JSONRPC: mcp.JSONRPC_VERSION,
172224
Notification: mcp.Notification{
@@ -176,13 +228,26 @@ func (s *MCPServer) SendNotificationToClient(
176228
},
177229
},
178230
}
231+
s.sendNotificationToAllClients(notification)
232+
}
179233

234+
// SendNotificationToClient sends a notification to the current client
235+
func (s *MCPServer) sendNotificationCore(
236+
ctx context.Context,
237+
session ClientSession,
238+
notification mcp.JSONRPCNotification,
239+
) error {
240+
// upgrades the client-server communication to SSE stream when the server sends notifications to the client
241+
if sessionWithStreamableHTTPConfig, ok := session.(SessionWithStreamableHTTPConfig); ok {
242+
sessionWithStreamableHTTPConfig.UpgradeToSSEWhenReceiveNotification()
243+
}
180244
select {
181245
case session.NotificationChannel() <- notification:
182246
return nil
183247
default:
184248
// Channel is blocked, if there's an error hook, use it
185249
if s.hooks != nil && len(s.hooks.OnError) > 0 {
250+
method := notification.Notification.Method
186251
err := ErrNotificationChannelBlocked
187252
// Copy hooks pointer to local variable to avoid race condition
188253
hooks := s.hooks
@@ -198,6 +263,28 @@ func (s *MCPServer) SendNotificationToClient(
198263
}
199264
}
200265

266+
// SendNotificationToClient sends a notification to the current client
267+
func (s *MCPServer) SendNotificationToClient(
268+
ctx context.Context,
269+
method string,
270+
params map[string]any,
271+
) error {
272+
session := ClientSessionFromContext(ctx)
273+
if session == nil || !session.Initialized() {
274+
return ErrNotificationNotInitialized
275+
}
276+
notification := mcp.JSONRPCNotification{
277+
JSONRPC: mcp.JSONRPC_VERSION,
278+
Notification: mcp.Notification{
279+
Method: method,
280+
Params: mcp.NotificationParams{
281+
AdditionalFields: params,
282+
},
283+
},
284+
}
285+
return s.sendNotificationCore(ctx, session, notification)
286+
}
287+
201288
// SendNotificationToSpecificClient sends a notification to a specific client by session ID
202289
func (s *MCPServer) SendNotificationToSpecificClient(
203290
sessionID string,
@@ -208,17 +295,10 @@ func (s *MCPServer) SendNotificationToSpecificClient(
208295
if !ok {
209296
return ErrSessionNotFound
210297
}
211-
212298
session, ok := sessionValue.(ClientSession)
213299
if !ok || !session.Initialized() {
214300
return ErrSessionNotInitialized
215301
}
216-
217-
// upgrades the client-server communication to SSE stream when the server sends notifications to the client
218-
if sessionWithStreamableHTTPConfig, ok := session.(SessionWithStreamableHTTPConfig); ok {
219-
sessionWithStreamableHTTPConfig.UpgradeToSSEWhenReceiveNotification()
220-
}
221-
222302
notification := mcp.JSONRPCNotification{
223303
JSONRPC: mcp.JSONRPC_VERSION,
224304
Notification: mcp.Notification{
@@ -228,27 +308,7 @@ func (s *MCPServer) SendNotificationToSpecificClient(
228308
},
229309
},
230310
}
231-
232-
select {
233-
case session.NotificationChannel() <- notification:
234-
return nil
235-
default:
236-
// Channel is blocked, if there's an error hook, use it
237-
if s.hooks != nil && len(s.hooks.OnError) > 0 {
238-
err := ErrNotificationChannelBlocked
239-
ctx := context.Background()
240-
// Copy hooks pointer to local variable to avoid race condition
241-
hooks := s.hooks
242-
go func(sID string, hooks *Hooks) {
243-
// Use the error hook to report the blocked channel
244-
hooks.onError(ctx, nil, "notification", map[string]any{
245-
"method": method,
246-
"sessionID": sID,
247-
}, fmt.Errorf("notification channel blocked for session %s: %w", sID, err))
248-
}(sessionID, hooks)
249-
}
250-
return ErrNotificationChannelBlocked
251-
}
311+
return s.sendNotificationToSpecificClient(session, notification)
252312
}
253313

254314
// AddSessionTool adds a tool for a specific session

0 commit comments

Comments
 (0)