Skip to content

Commit

Permalink
Prevent log line reordering when streaming to web clients
Browse files Browse the repository at this point in the history
  • Loading branch information
hg committed Aug 17, 2024
1 parent 5afdc25 commit d4f589d
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 25 deletions.
25 changes: 16 additions & 9 deletions server/api/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/rs/zerolog/log"

"go.woodpecker-ci.org/woodpecker/v2/server"
"go.woodpecker-ci.org/woodpecker/v2/server/logging"
"go.woodpecker-ci.org/woodpecker/v2/server/model"
"go.woodpecker-ci.org/woodpecker/v2/server/pubsub"
"go.woodpecker-ci.org/woodpecker/v2/server/router/middleware/session"
Expand Down Expand Up @@ -213,17 +214,23 @@ func LogStreamSSE(c *gin.Context) {
}

go func() {
err := server.Config.Services.Logs.Tail(ctx, step.ID, func(entries ...*model.LogEntry) {
for _, entry := range entries {
select {
case <-ctx.Done():
return
default:
ee, _ := json.Marshal(entry)
logChan <- ee
batches := make(logging.LogChan, 100)

go func() {
for entries := range batches {
for _, entry := range entries {
select {
case <-ctx.Done():
return
default:
ee, _ := json.Marshal(entry)
logChan <- ee
}
}
}
})
}()

err := server.Config.Services.Logs.Tail(ctx, step.ID, batches)
if err != nil {
log.Error().Err(err).Msg("tail of logs failed")
}
Expand Down
2 changes: 1 addition & 1 deletion server/grpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func (s *RPC) Log(c context.Context, stepUUID string, rpcLogEntries []*rpc.LogEn
// make sure writes to pubsub are non blocking (https://github.com/woodpecker-ci/woodpecker/blob/c919f32e0b6432a95e1a6d3d0ad662f591adf73f/server/logging/log.go#L9)
go func() {
// write line to listening web clients
if err := s.logger.Write(c, step.ID, logEntries...); err != nil {
if err := s.logger.Write(c, step.ID, logEntries); err != nil {
log.Error().Err(err).Msgf("rpc server could not write to logger")
}
}()
Expand Down
18 changes: 12 additions & 6 deletions server/logging/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"sync"

logger "github.com/rs/zerolog/log"
"go.woodpecker-ci.org/woodpecker/v2/server/model"
)

Expand All @@ -38,7 +39,7 @@ import (
// sub.start()... event loop

type subscriber struct {
handler Handler
receiver LogChan
}

type stream struct {
Expand Down Expand Up @@ -77,7 +78,7 @@ func (l *log) Open(_ context.Context, stepID int64) error {
return nil
}

func (l *log) Write(ctx context.Context, stepID int64, entries ...*model.LogEntry) error {
func (l *log) Write(ctx context.Context, stepID int64, entries []*model.LogEntry) error {
l.Lock()
s, ok := l.streams[stepID]
l.Unlock()
Expand All @@ -94,13 +95,18 @@ func (l *log) Write(ctx context.Context, stepID int64, entries ...*model.LogEntr
s.Lock()
s.list = append(s.list, entries...)
for sub := range s.subs {
go sub.handler(entries...)
select {
case sub.receiver <- entries:
default:
logger.Info().Msgf("subscriber channel is full -- dropping logs for step %d", stepID)
}
}
s.Unlock()

return nil
}

func (l *log) Tail(c context.Context, stepID int64, handler Handler) error {
func (l *log) Tail(c context.Context, stepID int64, receiver LogChan) error {
l.Lock()
s, ok := l.streams[stepID]
l.Unlock()
Expand All @@ -109,11 +115,11 @@ func (l *log) Tail(c context.Context, stepID int64, handler Handler) error {
}

sub := &subscriber{
handler: handler,
receiver: receiver,
}
s.Lock()
if len(s.list) != 0 {
sub.handler(s.list...)
sub.receiver <- s.list
}
s.subs[sub] = struct{}{}
s.Unlock()
Expand Down
19 changes: 14 additions & 5 deletions server/logging/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,37 @@ func TestLogging(t *testing.T) {
context.Background(),
)

receiver := make(LogChan, 10)

go func() {
for {
<-receiver
wg.Done()
}
}()

logger := New()
assert.NoError(t, logger.Open(ctx, testStepID))
go func() {
assert.NoError(t, logger.Tail(ctx, testStepID, func(_ ...*model.LogEntry) { wg.Done() }))
assert.NoError(t, logger.Tail(ctx, testStepID, receiver))
}()
go func() {
assert.NoError(t, logger.Tail(ctx, testStepID, func(_ ...*model.LogEntry) { wg.Done() }))
assert.NoError(t, logger.Tail(ctx, testStepID, receiver))
}()

<-time.After(500 * time.Millisecond)

wg.Add(4)
go func() {
assert.NoError(t, logger.Write(ctx, testStepID, testEntry))
assert.NoError(t, logger.Write(ctx, testStepID, testEntry))
assert.NoError(t, logger.Write(ctx, testStepID, []*model.LogEntry{testEntry}))
assert.NoError(t, logger.Write(ctx, testStepID, []*model.LogEntry{testEntry}))
}()

wg.Wait()

wg.Add(1)
go func() {
assert.NoError(t, logger.Tail(ctx, testStepID, func(_ ...*model.LogEntry) { wg.Done() }))
assert.NoError(t, logger.Tail(ctx, testStepID, receiver))
}()

<-time.After(500 * time.Millisecond)
Expand Down
8 changes: 4 additions & 4 deletions server/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ import (
// ErrNotFound is returned when the log does not exist.
var ErrNotFound = errors.New("stream: not found")

// Handler defines a callback function for handling log entries.
type Handler func(...*model.LogEntry)
// LogChan defines a channel type for receiving ordered batches of log entries.
type LogChan chan []*model.LogEntry

// Log defines a log multiplexer.
type Log interface {
// Open opens the log.
Open(c context.Context, stepID int64) error

// Write writes the entry to the log.
Write(c context.Context, stepID int64, entries ...*model.LogEntry) error
Write(c context.Context, stepID int64, entries []*model.LogEntry) error

// Tail tails the log.
Tail(c context.Context, stepID int64, handler Handler) error
Tail(c context.Context, stepID int64, handler LogChan) error

// Close closes the log.
Close(c context.Context, stepID int64) error
Expand Down

0 comments on commit d4f589d

Please sign in to comment.