Skip to content

Commit 2ea0c97

Browse files
refactor(stdio): improve stdio server message handling (#73)
- Add context-aware message reading with cancellation support - Implement graceful EOF handling in input stream processing - Add comprehensive error handling and logging - Improve notification handling with dedicated goroutine Co-authored-by: winter_wang <winter_wang@trendmicro.com>
1 parent ec9e8a2 commit 2ea0c97

File tree

1 file changed

+81
-58
lines changed

1 file changed

+81
-58
lines changed

server/stdio.go

Lines changed: 81 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,85 @@ func (s *StdioServer) SetContextFunc(fn StdioContextFunc) {
103103
s.contextFunc = fn
104104
}
105105

106+
// handleNotifications continuously processes notifications from the session's notification channel
107+
// and writes them to the provided output. It runs until the context is cancelled.
108+
// Any errors encountered while writing notifications are logged but do not stop the handler.
109+
func (s *StdioServer) handleNotifications(ctx context.Context, stdout io.Writer) {
110+
for {
111+
select {
112+
case notification := <-stdioSessionInstance.notifications:
113+
if err := s.writeResponse(notification, stdout); err != nil {
114+
s.errLogger.Printf("Error writing notification: %v", err)
115+
}
116+
case <-ctx.Done():
117+
return
118+
}
119+
}
120+
}
121+
122+
// processInputStream continuously reads and processes messages from the input stream.
123+
// It handles EOF gracefully as a normal termination condition.
124+
// The function returns when either:
125+
// - The context is cancelled (returns context.Err())
126+
// - EOF is encountered (returns nil)
127+
// - An error occurs while reading or processing messages (returns the error)
128+
func (s *StdioServer) processInputStream(ctx context.Context, reader *bufio.Reader, stdout io.Writer) error {
129+
for {
130+
if err := ctx.Err(); err != nil {
131+
return err
132+
}
133+
134+
line, err := s.readNextLine(ctx, reader)
135+
if err != nil {
136+
if err == io.EOF {
137+
return nil
138+
}
139+
s.errLogger.Printf("Error reading input: %v", err)
140+
return err
141+
}
142+
143+
if err := s.processMessage(ctx, line, stdout); err != nil {
144+
if err == io.EOF {
145+
return nil
146+
}
147+
s.errLogger.Printf("Error handling message: %v", err)
148+
return err
149+
}
150+
}
151+
}
152+
153+
// readNextLine reads a single line from the input reader in a context-aware manner.
154+
// It uses channels to make the read operation cancellable via context.
155+
// Returns the read line and any error encountered. If the context is cancelled,
156+
// returns an empty string and the context's error. EOF is returned when the input
157+
// stream is closed.
158+
func (s *StdioServer) readNextLine(ctx context.Context, reader *bufio.Reader) (string, error) {
159+
readChan := make(chan string, 1)
160+
errChan := make(chan error, 1)
161+
defer func() {
162+
close(readChan)
163+
close(errChan)
164+
}()
165+
166+
go func() {
167+
line, err := reader.ReadString('\n')
168+
if err != nil {
169+
errChan <- err
170+
return
171+
}
172+
readChan <- line
173+
}()
174+
175+
select {
176+
case <-ctx.Done():
177+
return "", ctx.Err()
178+
case err := <-errChan:
179+
return "", err
180+
case line := <-readChan:
181+
return line, nil
182+
}
183+
}
184+
106185
// Listen starts listening for JSON-RPC messages on the provided input and writes responses to the provided output.
107186
// It runs until the context is cancelled or an error occurs.
108187
// Returns an error if there are issues with reading input or writing output.
@@ -126,64 +205,8 @@ func (s *StdioServer) Listen(
126205
reader := bufio.NewReader(stdin)
127206

128207
// Start notification handler
129-
go func() {
130-
for {
131-
select {
132-
case notification := <-stdioSessionInstance.notifications:
133-
err := s.writeResponse(
134-
notification,
135-
stdout,
136-
)
137-
if err != nil {
138-
s.errLogger.Printf(
139-
"Error writing notification: %v",
140-
err,
141-
)
142-
}
143-
case <-ctx.Done():
144-
return
145-
}
146-
}
147-
}()
148-
149-
for {
150-
select {
151-
case <-ctx.Done():
152-
return ctx.Err()
153-
default:
154-
// Use a goroutine to make the read cancellable
155-
readChan := make(chan string, 1)
156-
errChan := make(chan error, 1)
157-
158-
go func() {
159-
line, err := reader.ReadString('\n')
160-
if err != nil {
161-
errChan <- err
162-
return
163-
}
164-
readChan <- line
165-
}()
166-
167-
select {
168-
case <-ctx.Done():
169-
return ctx.Err()
170-
case err := <-errChan:
171-
if err == io.EOF {
172-
return nil
173-
}
174-
s.errLogger.Printf("Error reading input: %v", err)
175-
return err
176-
case line := <-readChan:
177-
if err := s.processMessage(ctx, line, stdout); err != nil {
178-
if err == io.EOF {
179-
return nil
180-
}
181-
s.errLogger.Printf("Error handling message: %v", err)
182-
return err
183-
}
184-
}
185-
}
186-
}
208+
go s.handleNotifications(ctx, stdout)
209+
return s.processInputStream(ctx, reader, stdout)
187210
}
188211

189212
// processMessage handles a single JSON-RPC message and writes the response.

0 commit comments

Comments
 (0)