Skip to content

Commit

Permalink
Issue #191: Fix logs channel race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
F1bonacc1 committed Jul 25, 2024
1 parent f5d9f06 commit bd0da22
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 32 deletions.
4 changes: 4 additions & 0 deletions issues/issue_191/process-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
version: "0.5"
processes:
noisy:
command: while true; do date; done
34 changes: 23 additions & 11 deletions src/api/ws_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/rs/zerolog/log"
"net/http"
"strconv"
"sync"
)

var upgrader = websocket.Upgrader{}
Expand All @@ -27,23 +28,34 @@ func (api *PcApi) HandleLogsStream(c *gin.Context) {

done := make(chan struct{})
logChan := make(chan LogMessage, 256)
connector := pclog.NewConnector(func(messages []string) {
for _, message := range messages {
msg := LogMessage{
Message: message,
ProcessName: procName,
chanCloseMtx := &sync.Mutex{}
isChannelClosed := false
connector := pclog.NewConnector(
func(messages []string) {
for _, message := range messages {
msg := LogMessage{
Message: message,
ProcessName: procName,
}
logChan <- msg
}
logChan <- msg
}
if !follow {
close(logChan)
}
},
if !follow {
chanCloseMtx.Lock()
defer chanCloseMtx.Unlock()
close(logChan)
isChannelClosed = true
}
},
func(message string) (n int, err error) {
msg := LogMessage{
Message: message,
ProcessName: procName,
}
chanCloseMtx.Lock()
defer chanCloseMtx.Unlock()
if isChannelClosed {
return 0, nil
}
logChan <- msg
return len(message), nil
},
Expand Down
3 changes: 2 additions & 1 deletion src/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func (p *PcClient) GetLogLength() int {
}

func (p *PcClient) GetLogsAndSubscribe(name string, observer pclog.LogObserver) error {
return p.logger.ReadProcessLogs(name, p.logLength, true, observer)
_, err := p.logger.ReadProcessLogs(name, p.logLength, true, observer)
return err
}

func (p *PcClient) UnSubscribeLogger(name string, observer pclog.LogObserver) error {
Expand Down
22 changes: 4 additions & 18 deletions src/client/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewLogClient(address, socketPath string) *LogClient {
}
}

func (l *LogClient) ReadProcessLogs(name string, offset int, follow bool, out io.StringWriter) (err error) {
func (l *LogClient) ReadProcessLogs(name string, offset int, follow bool, out io.StringWriter) (done chan struct{}, err error) {

url := fmt.Sprintf("ws://%s/process/logs/ws?name=%s&offset=%d&follow=%v", l.address, name, offset, follow)
log.Info().Msgf("Connecting to %s", url)
Expand All @@ -42,28 +42,14 @@ func (l *LogClient) ReadProcessLogs(name string, offset int, follow bool, out io

if err != nil {
log.Error().Msgf("failed to dial to %s error: %v", url, err)
return err
return done, err
}
//defer l.ws.Close()
done := make(chan struct{})
done = make(chan struct{})

go l.readLogs(done, l.ws, follow, out)

/*for {
select {
case <-done:
return nil
case <-interrupt:
fmt.Println("interrupt")
select {
case <-done:
case <-time.After(time.Second):
}
return nil
}
}*/
return nil
return done, nil
}

// CloseChannel Cleanly close the connection by sending a close message and then
Expand Down
6 changes: 4 additions & 2 deletions src/cmd/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ var logsCmd = &cobra.Command{
name := args[0]

logger := getLogClient()
err := logger.ReadProcessLogs(name, *pcFlags.LogTailLength, *pcFlags.LogFollow, os.Stdout)
done, err := logger.ReadProcessLogs(name, *pcFlags.LogTailLength, *pcFlags.LogFollow, os.Stdout)
if err != nil {
log.Fatal().Err(err).Msgf("Failed to fetch logs for process %s", name)
}
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
select {
case <-interrupt:
fmt.Println("interrupt")
_ = logger.CloseChannel()
time.Sleep(time.Second)
case <-done:
_ = logger.CloseChannel()
time.Sleep(time.Second)
}
Expand Down

0 comments on commit bd0da22

Please sign in to comment.