Send agent → server logs in batches (#3999)
hg committed Aug 16, 2024
1 parent 405d089 commit 7f36b3a
Showing 11 changed files with 243 additions and 177 deletions.
77 changes: 66 additions & 11 deletions agent/rpc/client_grpc.go
@@ -37,17 +37,21 @@ const ClientGrpcVersion int32 = proto.Version
 
 type client struct {
 	client proto.WoodpeckerClient
 	conn   *grpc.ClientConn
+	logs   chan *proto.LogEntry
 }
 
 // NewGrpcClient returns a new grpc Client.
-func NewGrpcClient(conn *grpc.ClientConn) rpc.Peer {
+func NewGrpcClient(ctx context.Context, conn *grpc.ClientConn) rpc.Peer {
 	client := new(client)
 	client.client = proto.NewWoodpeckerClient(conn)
 	client.conn = conn
+	client.logs = make(chan *proto.LogEntry, 10) // max memory use: 10 lines * 1 MiB
+	go client.processLogs(ctx)
 	return client
 }
 
 func (c *client) Close() error {
+	close(c.logs)
 	return c.conn.Close()
 }

@@ -367,18 +371,69 @@ func (c *client) Update(ctx context.Context, id string, state rpc.StepState) (er
 	return nil
 }
 
-// Log writes the workflow log entry.
-func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) {
+// EnqueueLog queues the log entry to be written in a batch later.
+func (c *client) EnqueueLog(logEntry *rpc.LogEntry) {
+	c.logs <- &proto.LogEntry{
+		StepUuid: logEntry.StepUUID,
+		Data:     logEntry.Data,
+		Line:     int32(logEntry.Line),
+		Time:     logEntry.Time,
+		Type:     int32(logEntry.Type),
+	}
+}
+
+func (c *client) processLogs(ctx context.Context) {
+	var entries []*proto.LogEntry
+	var bytes int
+
+	send := func() {
+		if len(entries) == 0 {
+			return
+		}
+
+		log.Debug().
+			Int("entries", len(entries)).
+			Int("bytes", bytes).
+			Msg("log drain: sending queued logs")
+
+		if err := c.sendLogs(ctx, entries); err != nil {
+			log.Error().Err(err).Msg("log drain: could not send logs to server")
+		}
+
+		// even if send failed, we don't have infinite memory; retry has already been used
+		entries = entries[:0]
+		bytes = 0
+	}
+
+	// ctx.Done() is covered by the log channel being closed
+	for {
+		select {
+		case entry, ok := <-c.logs:
+			if !ok {
+				log.Info().Msg("log drain: channel closed")
+				send()
+				return
+			}
+
+			entries = append(entries, entry)
+			bytes += len(entry.Data)
+
+			if bytes >= 256*1024 { // 256 KiB; picked to prevent GRPC message size overflow
+				send()
+			}
+
+		case <-time.After(time.Second):
+			send()
+		}
+	}
+}
+
+func (c *client) sendLogs(ctx context.Context, entries []*proto.LogEntry) error {
+	req := &proto.LogRequest{LogEntries: entries}
 	retry := c.newBackOff()
-	req := new(proto.LogRequest)
-	req.LogEntry = new(proto.LogEntry)
-	req.LogEntry.StepUuid = logEntry.StepUUID
-	req.LogEntry.Data = logEntry.Data
-	req.LogEntry.Line = int32(logEntry.Line)
-	req.LogEntry.Time = logEntry.Time
-	req.LogEntry.Type = int32(logEntry.Type)
+
 	for {
-		_, err = c.client.Log(ctx, req)
+		_, err := c.client.Log(ctx, req)
 		if err == nil {
 			break
 		}
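
The drain loop above flushes on three triggers: a byte threshold (256 KiB), a one-second timeout, and the log channel being closed. As a rough illustration of that pattern outside the gRPC client — the entry type, the maxBytes parameter, and the fmt.Printf placeholder below are assumptions standing in for proto.LogEntry and the real Log RPC, not code from this commit — a self-contained sketch could look like this:

package main

import (
	"fmt"
	"time"
)

// entry stands in for proto.LogEntry; only the payload size matters here.
type entry struct{ data []byte }

// drain batches entries from ch and flushes them when the byte threshold is
// reached, when no entry has arrived for a second, or when ch is closed.
func drain(ch <-chan entry, maxBytes int) {
	var batch []entry
	var size int

	flush := func() {
		if len(batch) == 0 {
			return
		}
		// the real client calls sendLogs (the batched Log RPC) here
		fmt.Printf("flushing %d entries (%d bytes)\n", len(batch), size)
		batch = batch[:0]
		size = 0
	}

	for {
		select {
		case e, ok := <-ch:
			if !ok { // producer closed the channel: final flush, then stop
				flush()
				return
			}
			batch = append(batch, e)
			size += len(e.data)
			if size >= maxBytes {
				flush()
			}
		case <-time.After(time.Second):
			flush()
		}
	}
}

func main() {
	ch := make(chan entry, 10)
	done := make(chan struct{})
	go func() { drain(ch, 256*1024); close(done) }()

	for i := 0; i < 5; i++ {
		ch <- entry{data: []byte("a log line")}
	}
	close(ch) // like client.Close(): triggers the final flush and stops the drain
	<-done
}

One consequence of this design, visible in the real code as well: a failed send only logs an error and drops the batch, so queued entries cannot pile up without bound.
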
2 changes: 1 addition & 1 deletion cmd/agent/core/agent.go
@@ -156,7 +156,7 @@ func run(ctx context.Context, c *cli.Command, backends []types.Backend) error {
 	}
 	defer conn.Close()
 
-	client := agent_rpc.NewGrpcClient(conn)
+	client := agent_rpc.NewGrpcClient(ctx, conn)
 	agentConfigPersisted := atomic.Bool{}
 
 	grpcCtx := metadata.NewOutgoingContext(grpcClientCtx, metadata.Pairs("hostname", hostname))
6 changes: 1 addition & 5 deletions pipeline/log/line_writer.go
@@ -16,7 +16,6 @@
 package log
 
 import (
-	"context"
 	"io"
 	"strings"
 	"sync"
@@ -67,9 +66,6 @@ func (w *LineWriter) Write(p []byte) (n int, err error) {
 
 	w.num++
 
-	if err := w.peer.Log(context.Background(), line); err != nil {
-		return 0, err
-	}
-
+	w.peer.EnqueueLog(line)
 	return len(data), nil
 }
6 changes: 3 additions & 3 deletions pipeline/log/line_writer_test.go
@@ -27,7 +27,7 @@ import (
 
 func TestLineWriter(t *testing.T) {
 	peer := mocks.NewPeer(t)
-	peer.On("Log", mock.Anything, mock.Anything).Return(nil)
+	peer.On("EnqueueLog", mock.Anything)
 
 	secrets := []string{"world"}
 	lw := log.NewLineWriter(peer, "e9ea76a5-44a1-4059-9c4a-6956c478b26d", secrets...)
@@ -37,15 +37,15 @@ func TestLineWriter(t *testing.T) {
 	_, err = lw.Write([]byte("the previous line had no newline at the end"))
 	assert.NoError(t, err)
 
-	peer.AssertCalled(t, "Log", mock.Anything, &rpc.LogEntry{
+	peer.AssertCalled(t, "EnqueueLog", &rpc.LogEntry{
 		StepUUID: "e9ea76a5-44a1-4059-9c4a-6956c478b26d",
 		Time:     0,
 		Type:     rpc.LogEntryStdout,
 		Line:     0,
 		Data:     []byte("hello ********"),
 	})
 
-	peer.AssertCalled(t, "Log", mock.Anything, &rpc.LogEntry{
+	peer.AssertCalled(t, "EnqueueLog", &rpc.LogEntry{
 		StepUUID: "e9ea76a5-44a1-4059-9c4a-6956c478b26d",
 		Time:     0,
 		Type:     rpc.LogEntryStdout,
23 changes: 5 additions & 18 deletions pipeline/rpc/mocks/peer.go

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions pipeline/rpc/peer.go
@@ -82,8 +82,8 @@ type Peer interface {
 	// Update updates the step state
 	Update(c context.Context, workflowID string, state StepState) error
 
-	// Log writes the step log entry
-	Log(c context.Context, logEntry *LogEntry) error
+	// EnqueueLog queues the step log entry for delayed sending
+	EnqueueLog(logEntry *LogEntry)
 
 	// RegisterAgent register our agent to the server
 	RegisterAgent(ctx context.Context, platform, backend, version string, capacity int) (int64, error)
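
With this interface change, callers no longer get a per-entry error back: EnqueueLog is fire-and-forget, and buffering, batching and retries live inside the gRPC client. A hypothetical, trimmed-down sketch of the calling side — logSink, printSink and writeLine are illustrative names, not part of the Woodpecker API, and LogEntry keeps only two of the real rpc.LogEntry fields — shows the shape that LineWriter.Write now has:

package main

import "fmt"

// LogEntry is a cut-down stand-in for rpc.LogEntry.
type LogEntry struct {
	StepUUID string
	Data     []byte
}

// logSink is a cut-down stand-in for rpc.Peer, keeping only the changed method.
type logSink interface {
	EnqueueLog(entry *LogEntry) // fire-and-forget: no error to propagate
}

// printSink is a toy implementation used only for this example.
type printSink struct{}

func (printSink) EnqueueLog(entry *LogEntry) {
	fmt.Printf("queued %d bytes for step %s\n", len(entry.Data), entry.StepUUID)
}

// writeLine mimics the post-change LineWriter.Write: queue the entry and
// report success immediately instead of blocking on the Log RPC.
func writeLine(sink logSink, stepUUID string, data []byte) (int, error) {
	sink.EnqueueLog(&LogEntry{StepUUID: stepUUID, Data: data})
	return len(data), nil
}

func main() {
	n, err := writeLine(printSink{}, "e9ea76a5-44a1-4059-9c4a-6956c478b26d", []byte("hello world\n"))
	fmt.Println(n, err)
}

The trade-off, also visible in the line_writer.go diff above, is that transport failures now surface only in the agent's own logs rather than through Write's error return.
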
2 changes: 1 addition & 1 deletion pipeline/rpc/proto/version.go
@@ -16,4 +16,4 @@ package proto
 
 // Version is the version of the woodpecker.proto file,
 // IMPORTANT: increased by 1 each time it get changed.
-const Version int32 = 9
+const Version int32 = 10