Skip to content

Commit

Permalink
Process workflow logs in batches (#4045) (#4356)
Browse files Browse the repository at this point in the history
Co-authored-by: hg <k@isakov.net>
  • Loading branch information
6543 and hg authored Nov 13, 2024
1 parent b68b038 commit cbe74ec
Show file tree
Hide file tree
Showing 21 changed files with 358 additions and 208 deletions.
92 changes: 79 additions & 13 deletions agent/rpc/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,44 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
grpcproto "google.golang.org/protobuf/proto"

backend "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc/proto"
)

// Set grpc version on compile time to compare against server version response.
const ClientGrpcVersion int32 = proto.Version
const (
// Set grpc version on compile time to compare against server version response.
ClientGrpcVersion int32 = proto.Version

// Maximum size of an outgoing log message.
// Picked to prevent it from going over GRPC size limit (4 MiB) with a large safety margin.
maxLogBatchSize int = 1 * 1024 * 1024

// Maximum amount of time between sending consecutive batched log messages.
// Controls the delay between the CI job generating a log record, and web users receiving it.
maxLogFlushPeriod time.Duration = time.Second
)

type client struct {
client proto.WoodpeckerClient
conn *grpc.ClientConn
logs chan *proto.LogEntry
}

// NewGrpcClient returns a new grpc Client.
func NewGrpcClient(conn *grpc.ClientConn) rpc.Peer {
func NewGrpcClient(ctx context.Context, conn *grpc.ClientConn) rpc.Peer {
client := new(client)
client.client = proto.NewWoodpeckerClient(conn)
client.conn = conn
client.logs = make(chan *proto.LogEntry, 10) // max memory use: 10 lines * 1 MiB
go client.processLogs(ctx)
return client
}

func (c *client) Close() error {
close(c.logs)
return c.conn.Close()
}

Expand Down Expand Up @@ -367,18 +382,69 @@ func (c *client) Update(ctx context.Context, id string, state rpc.StepState) (er
return nil
}

// Log writes the workflow log entry.
func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) {
// EnqueueLog queues the log entry to be written in a batch later.
func (c *client) EnqueueLog(logEntry *rpc.LogEntry) {
c.logs <- &proto.LogEntry{
StepUuid: logEntry.StepUUID,
Data: logEntry.Data,
Line: int32(logEntry.Line),
Time: logEntry.Time,
Type: int32(logEntry.Type),
}
}

func (c *client) processLogs(ctx context.Context) {
var entries []*proto.LogEntry
var bytes int

send := func() {
if len(entries) == 0 {
return
}

log.Debug().
Int("entries", len(entries)).
Int("bytes", bytes).
Msg("log drain: sending queued logs")

if err := c.sendLogs(ctx, entries); err != nil {
log.Error().Err(err).Msg("log drain: could not send logs to server")
}

// even if send failed, we don't have infinite memory; retry has already been used
entries = entries[:0]
bytes = 0
}

// ctx.Done() is covered by the log channel being closed
for {
select {
case entry, ok := <-c.logs:
if !ok {
log.Info().Msg("log drain: channel closed")
send()
return
}

entries = append(entries, entry)
bytes += grpcproto.Size(entry) // cspell:words grpcproto

if bytes >= maxLogBatchSize {
send()
}

case <-time.After(maxLogFlushPeriod):
send()
}
}
}

func (c *client) sendLogs(ctx context.Context, entries []*proto.LogEntry) error {
req := &proto.LogRequest{LogEntries: entries}
retry := c.newBackOff()
req := new(proto.LogRequest)
req.LogEntry = new(proto.LogEntry)
req.LogEntry.StepUuid = logEntry.StepUUID
req.LogEntry.Data = logEntry.Data
req.LogEntry.Line = int32(logEntry.Line)
req.LogEntry.Time = logEntry.Time
req.LogEntry.Type = int32(logEntry.Type)

for {
_, err = c.client.Log(ctx, req)
_, err := c.client.Log(ctx, req)
if err == nil {
break
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/core/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func run(ctx context.Context, c *cli.Command, backends []types.Backend) error {
}
defer conn.Close()

client := agent_rpc.NewGrpcClient(conn)
client := agent_rpc.NewGrpcClient(ctx, conn)
agentConfigPersisted := atomic.Bool{}

grpcCtx := metadata.NewOutgoingContext(grpcClientCtx, metadata.Pairs("hostname", hostname))
Expand Down
6 changes: 1 addition & 5 deletions pipeline/log/line_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package log

import (
"context"
"io"
"strings"
"sync"
Expand Down Expand Up @@ -67,9 +66,6 @@ func (w *LineWriter) Write(p []byte) (n int, err error) {

w.num++

if err := w.peer.Log(context.Background(), line); err != nil {
return 0, err
}

w.peer.EnqueueLog(line)
return len(data), nil
}
6 changes: 3 additions & 3 deletions pipeline/log/line_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

func TestLineWriter(t *testing.T) {
peer := mocks.NewPeer(t)
peer.On("Log", mock.Anything, mock.Anything).Return(nil)
peer.On("EnqueueLog", mock.Anything)

secrets := []string{"world"}
lw := log.NewLineWriter(peer, "e9ea76a5-44a1-4059-9c4a-6956c478b26d", secrets...)
Expand All @@ -37,15 +37,15 @@ func TestLineWriter(t *testing.T) {
_, err = lw.Write([]byte("the previous line had no newline at the end"))
assert.NoError(t, err)

peer.AssertCalled(t, "Log", mock.Anything, &rpc.LogEntry{
peer.AssertCalled(t, "EnqueueLog", &rpc.LogEntry{
StepUUID: "e9ea76a5-44a1-4059-9c4a-6956c478b26d",
Time: 0,
Type: rpc.LogEntryStdout,
Line: 0,
Data: []byte("hello ********"),
})

peer.AssertCalled(t, "Log", mock.Anything, &rpc.LogEntry{
peer.AssertCalled(t, "EnqueueLog", &rpc.LogEntry{
StepUUID: "e9ea76a5-44a1-4059-9c4a-6956c478b26d",
Time: 0,
Type: rpc.LogEntryStdout,
Expand Down
23 changes: 5 additions & 18 deletions pipeline/rpc/mocks/peer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pipeline/rpc/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ type Peer interface {
// Update updates the step state
Update(c context.Context, workflowID string, state StepState) error

// Log writes the step log entry
Log(c context.Context, logEntry *LogEntry) error
// EnqueueLog queues the step log entry for delayed sending
EnqueueLog(logEntry *LogEntry)

// RegisterAgent register our agent to the server
RegisterAgent(ctx context.Context, platform, backend, version string, capacity int) (int64, error)
Expand Down
2 changes: 1 addition & 1 deletion pipeline/rpc/proto/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ package proto

// Version is the version of the woodpecker.proto file,
// IMPORTANT: increased by 1 each time it get changed.
const Version int32 = 9
const Version int32 = 10
Loading

0 comments on commit cbe74ec

Please sign in to comment.