fix: improve eventlogger.GeneratePlainTextFile() performance for big files (#248)
lucaspin authored Aug 8, 2024
1 parent d0c0c6c commit 58209f7
Showing 8 changed files with 140 additions and 64 deletions.
2 changes: 2 additions & 0 deletions pkg/eventlogger/backend.go
@@ -6,6 +6,7 @@ type Backend interface {
Open() error
Write(interface{}) error
Read(startFrom, maxLines int, writer io.Writer) (int, error)
Iterate(fn func(event []byte) error) error
Close() error
CloseWithOptions(CloseOptions) error
}
@@ -15,4 +16,5 @@ type CloseOptions struct {
}

var _ Backend = (*FileBackend)(nil)
var _ Backend = (*HTTPBackend)(nil)
var _ Backend = (*InMemoryBackend)(nil)
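
A sketch of how a consumer can use the new Iterate method: it streams every raw event to a callback, without the offset bookkeeping that Read requires. The countEvents helper below is illustrative only and not part of this commit; it assumes it lives in the same package.

// countEvents streams every event from a backend and counts them,
// without ever holding the whole log file in memory.
func countEvents(b Backend) (int, error) {
	count := 0
	err := b.Iterate(func(event []byte) error {
		count++
		return nil
	})

	return count, err
}
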
22 changes: 22 additions & 0 deletions pkg/eventlogger/filebackend.go
@@ -89,6 +89,28 @@ func (l *FileBackend) CloseWithOptions(options CloseOptions) error {
return nil
}

func (l *FileBackend) Iterate(fn func([]byte) error) error {
fd, err := os.OpenFile(l.path, os.O_RDONLY, os.ModePerm)
if err != nil {
return fmt.Errorf("error opening file '%s': %v", l.path, err)
}

defer fd.Close()

scanner := bufio.NewScanner(fd)
for scanner.Scan() {
if len(scanner.Bytes()) == 0 {
continue
}

if err := fn(scanner.Bytes()); err != nil {
return fmt.Errorf("error processing event: %v", err)
}
}

return scanner.Err()
}

func (l *FileBackend) Read(startingLineNumber, maxLines int, writer io.Writer) (int, error) {
fd, err := os.OpenFile(l.path, os.O_RDONLY, os.ModePerm)
if err != nil {
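
One caveat about FileBackend.Iterate above: bufio.Scanner caps a single token at bufio.MaxScanTokenSize (64 KiB) by default, so one log event longer than that would stop the scan with bufio.ErrTooLong. The slice handed to fn is also only valid until the next Scan call, so callbacks must use or copy it before returning. If oversized events ever become a concern, the limit can be raised with Scanner.Buffer; a sketch, not part of this commit:

scanner := bufio.NewScanner(fd)
// Allow individual events of up to 1 MiB instead of the default 64 KiB.
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
for scanner.Scan() {
	// ... same per-event handling as in Iterate above ...
}
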
4 changes: 4 additions & 0 deletions pkg/eventlogger/httpbackend.go
@@ -85,6 +85,10 @@ func (l *HTTPBackend) Read(startFrom, maxLines int, writer io.Writer) (int, error) {
return l.fileBackend.Read(startFrom, maxLines, writer)
}

func (l *HTTPBackend) Iterate(fn func([]byte) error) error {
return l.fileBackend.Iterate(fn)
}

func (l *HTTPBackend) push() {
log.Infof("Logs will be pushed to %s", l.config.URL)

10 changes: 8 additions & 2 deletions pkg/eventlogger/httpbackend_test.go
@@ -278,15 +278,15 @@ func Test__TokenIsRefreshed(t *testing.T) {
mockServer.Close()
}

func generateLogEvents(t *testing.T, outputEventsCount int, backend Backend) {
func generateLogEventsWithOutputGenerator(t testing.TB, outputEventsCount int, backend Backend, outputGenerator func() string) {
timestamp := int(time.Now().Unix())

assert.Nil(t, backend.Write(&JobStartedEvent{Timestamp: timestamp, Event: "job_started"}))
assert.Nil(t, backend.Write(&CommandStartedEvent{Timestamp: timestamp, Event: "cmd_started", Directive: "echo hello"}))

count := outputEventsCount
for count > 0 {
assert.Nil(t, backend.Write(&CommandOutputEvent{Timestamp: timestamp, Event: "cmd_output", Output: "hello\n"}))
assert.Nil(t, backend.Write(&CommandOutputEvent{Timestamp: timestamp, Event: "cmd_output", Output: outputGenerator() + "\n"}))
count--
}

@@ -301,3 +301,9 @@ func generateLogEvents(t *testing.T, outputEventsCount int, backend Backend) {

assert.Nil(t, backend.Write(&JobFinishedEvent{Timestamp: timestamp, Event: "job_finished", Result: "passed"}))
}

func generateLogEvents(t testing.TB, outputEventsCount int, backend Backend) {
generateLogEventsWithOutputGenerator(t, outputEventsCount, backend, func() string {
return "hello"
})
}
4 changes: 4 additions & 0 deletions pkg/eventlogger/inmemorybackend.go
@@ -17,6 +17,10 @@ func (l *InMemoryBackend) Open() error {
return nil
}

func (l *InMemoryBackend) Iterate(fn func([]byte) error) error {
return nil
}

func (l *InMemoryBackend) Read(startFrom, maxLines int, writer io.Writer) (int, error) {
return 0, nil
}
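
Iterate is deliberately a no-op for the in-memory backend, mirroring its no-op Read. If it ever needs to feed plain-text generation, an implementation could look roughly like the sketch below; the events field is hypothetical, since the real struct layout is not part of this diff.

// Hypothetical sketch: assumes the backend keeps serialized events
// in an events [][]byte field.
func (l *InMemoryBackend) Iterate(fn func([]byte) error) error {
	for _, event := range l.events {
		if err := fn(event); err != nil {
			return err
		}
	}

	return nil
}
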
94 changes: 37 additions & 57 deletions pkg/eventlogger/logger.go
@@ -1,10 +1,10 @@
package eventlogger

import (
"bytes"
"bufio"
"encoding/json"
"io/ioutil"
"strings"
"fmt"
"os"
"time"

log "github.com/sirupsen/logrus"
@@ -30,76 +30,56 @@ func (l *Logger) CloseWithOptions(options CloseOptions) error {
return l.Backend.CloseWithOptions(options)
}

/*
* Convert the JSON logs file into a plain text one.
* Note: the caller must delete the generated plain text file after it's done with it.
*/
func (l *Logger) GeneratePlainTextFile() (string, error) {
tmpFile, err := ioutil.TempFile("", "*.txt")
func (l *Logger) GeneratePlainTextFileIn(directory string) (string, error) {
tmpFile, err := os.CreateTemp(directory, "*.txt")
if err != nil {
return "", err
return "", fmt.Errorf("error creating plain text file: %v", err)
}

defer tmpFile.Close()

/*
* Since we are only doing this for possibly very big files,
* we read/write things in chunks to avoid keeping a lot of things in memory.
*/
startFrom := 0
var buf bytes.Buffer
for {
nextStartFrom, err := l.Backend.Read(startFrom, 20000, &buf)
if err != nil {
return "", err
}

if nextStartFrom == startFrom {
break
}

startFrom = nextStartFrom
logEvents := strings.Split(buf.String(), "\n")
logs, err := l.eventsToPlainLogLines(logEvents)
bufferedWriter := bufio.NewWriterSize(tmpFile, 64*1024)
err = l.Backend.Iterate(func(event []byte) error {
var object map[string]interface{}
err := json.Unmarshal(event, &object)
if err != nil {
return "", err
}

newLines := []byte(strings.Join(logs, ""))
err = ioutil.WriteFile(tmpFile.Name(), newLines, 0600)
if err != nil {
return "", err
}
}

return tmpFile.Name(), nil
}

func (l *Logger) eventsToPlainLogLines(logEvents []string) ([]string, error) {
lines := []string{}
var object map[string]interface{}

for _, logEvent := range logEvents {
if logEvent == "" {
continue
}

err := json.Unmarshal([]byte(logEvent), &object)
if err != nil {
return []string{}, err
return fmt.Errorf("error unmarshaling log event '%s': %v", string(event), err)
}

switch eventType := object["event"].(string); {
case eventType == "cmd_started":
lines = append(lines, object["directive"].(string)+"\n")
if _, err := bufferedWriter.WriteString(object["directive"].(string) + "\n"); err != nil {
return fmt.Errorf("error writing to output: %v", err)
}
case eventType == "cmd_output":
lines = append(lines, object["output"].(string))
if _, err := bufferedWriter.WriteString(object["output"].(string)); err != nil {
return fmt.Errorf("error writing to output: %v", err)
}
default:
// We can ignore all the other event types here
}

return nil
})

if err != nil {
return "", fmt.Errorf("error iterating on log backend: %v", err)
}

err = bufferedWriter.Flush()
if err != nil {
return "", fmt.Errorf("error flushing buffered writer: %v", err)
}

return lines, nil
return tmpFile.Name(), nil
}

/*
* Convert the JSON logs file into a plain text one.
* Note: the caller must delete the generated plain text file after it's done with it.
*/
func (l *Logger) GeneratePlainTextFile() (string, error) {
return l.GeneratePlainTextFileIn(os.TempDir())
}

func (l *Logger) LogJobStarted() {
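
The caller contract is unchanged: whoever calls GeneratePlainTextFile, or the new GeneratePlainTextFileIn, owns the generated file and must delete it when done. A minimal usage sketch with a hypothetical target directory:

// Generate the plain-text log in a custom directory instead of os.TempDir().
file, err := logger.GeneratePlainTextFileIn("/tmp/job-logs")
if err != nil {
	return fmt.Errorf("could not generate plain text logs: %v", err)
}

// The caller is responsible for cleaning up the generated file.
defer os.Remove(file)
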
60 changes: 58 additions & 2 deletions pkg/eventlogger/logger_test.go
@@ -1,15 +1,17 @@
package eventlogger

import (
"crypto/rand"
"encoding/base64"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test__GeneratePlainLogs(t *testing.T) {
@@ -23,7 +25,7 @@ func Test__GeneratePlainLogs(t *testing.T) {
assert.NoError(t, err)
assert.FileExists(t, file)

bytes, err := ioutil.ReadFile(file)
bytes, err := os.ReadFile(file)
assert.NoError(t, err)

lines := strings.Split(string(bytes), "\n")
@@ -45,3 +47,57 @@ func Test__GeneratePlainLogs(t *testing.T) {
assert.NoError(t, logger.Close())
os.Remove(file)
}

func Benchmark__GeneratePlainLogs(b *testing.B) {
//
// We do not want to account for this setup time in our benchmark
// so we stop the timer here, while we are creating the file backend
// and generating and writing the log events to it.
//
b.StopTimer()
tmpFileName := filepath.Join(os.TempDir(), fmt.Sprintf("logs_%d.json", time.Now().UnixNano()))
backend, _ := NewFileBackend(tmpFileName, DefaultMaxSizeInBytes)
require.Nil(b, backend.Open())
logger, _ := NewLogger(backend)

//
// Write a lot of log events into our file backend.
// In this case, 1M `cmd_output` log events with a random string in it.
//
buf := make([]byte, 45)
expected := []string{}
expected = append(expected, "echo hello")
generateLogEventsWithOutputGenerator(b, 1000000, backend, func() string {
// #nosec
_, err := rand.Read(buf)
require.NoError(b, err)
o := base64.URLEncoding.EncodeToString(buf)
expected = append(expected, o)
return o
})

expected = append(expected, "")

//
// Actually run the benchmark.
// We start the timer at the beginning of the iteration,
// and stop it right after logger.GeneratePlainTextFile() returns,
// because we only want to account for the amount of time it takes
// for that function to run, but we also want to assert the output is correct.
//
for i := 0; i < b.N; i++ {
b.StartTimer()
file, err := logger.GeneratePlainTextFile()

b.StopTimer()
require.NoError(b, err)
require.FileExists(b, file)
bytes, err := os.ReadFile(file)
require.NoError(b, err)
assert.Equal(b, expected, strings.Split(string(bytes), "\n"))

os.Remove(file)
}

require.NoError(b, logger.Close())
}
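
The new benchmark can be run on its own with the standard Go tooling; the -run '^$' filter skips the regular tests so only the benchmark executes:

go test ./pkg/eventlogger -run '^$' -bench 'Benchmark__GeneratePlainLogs' -benchmem
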
8 changes: 5 additions & 3 deletions pkg/jobs/job.go
@@ -540,16 +540,16 @@ func (job *Job) teardownWithNoCallbacks(result string) error {

func (job *Job) uploadLogsAsArtifact(trimmed bool) {
if job.UploadJobLogs == config.UploadJobLogsConditionNever {
log.Infof("upload-job-logs=never - not uploading job logs as job artifact.")
log.Info("upload-job-logs=never - not uploading job logs as job artifact.")
return
}

if job.UploadJobLogs == config.UploadJobLogsConditionWhenTrimmed && !trimmed {
log.Infof("upload-job-logs=when-trimmed - logs were not trimmed, not uploading job logs as job artifact.")
log.Info("upload-job-logs=when-trimmed - logs were not trimmed, not uploading job logs as job artifact.")
return
}

log.Infof("Uploading job logs as job artifact...")
log.Info("Converting job logs to plain-text format...")
file, err := job.Logger.GeneratePlainTextFile()
if err != nil {
log.Errorf("Error converting '%s' to plain text: %v", file, err)
@@ -584,6 +584,8 @@ func (job *Job) uploadLogsAsArtifact(trimmed bool) {
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", "SEMAPHORE_ARTIFACT_TOKEN", token))
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", "SEMAPHORE_JOB_ID", job.Request.JobID))
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", "SEMAPHORE_ORGANIZATION_URL", orgURL))

log.Info("Uploading job logs as artifact...")
output, err := cmd.CombinedOutput()
if err != nil {
log.Errorf("Error uploading job logs as artifact: %v, %s", err, output)
