feat: compress job logs above a threshold before uploading it as an artifact #253

Merged (4 commits) on Oct 3, 2024
1 change: 1 addition & 0 deletions .semaphore/semaphore.yml
@@ -190,6 +190,7 @@ blocks:
- ssh_jump_points
- job_logs_as_artifact_default
- job_logs_as_artifact_always
- job_logs_as_artifact_compressed
- job_logs_as_artifact_never
- job_logs_as_artifact_not_trimmed
- job_logs_as_artifact_trimmed
40 changes: 40 additions & 0 deletions pkg/compression/gzip.go
@@ -0,0 +1,40 @@
package compression

import (
    "compress/gzip"
    "fmt"
    "io"
    "os"
)

func Compress(rawFileName string) (string, error) {
    // #nosec
    rawFile, err := os.Open(rawFileName)
    if err != nil {
        return "", fmt.Errorf("error opening raw file %s: %v", rawFileName, err)
    }

    gzippedFileName := rawFileName + ".gz"

    // #nosec
    gzippedFile, err := os.Create(gzippedFileName)
    if err != nil {
        return "", fmt.Errorf("error creating file %s for compression: %v", gzippedFileName, err)
    }

    defer gzippedFile.Close()
    gzipWriter := gzip.NewWriter(gzippedFile)
    defer gzipWriter.Close()

    _, err = io.Copy(gzipWriter, rawFile)
    if err != nil {
        return "", fmt.Errorf("error writing data into %s: %v", gzippedFileName, err)
    }

    err = gzipWriter.Flush()
    if err != nil {
        return "", fmt.Errorf("error flushing compressed data into %s: %v", gzippedFileName, err)
    }

    return gzippedFileName, nil
}
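For orientation, a minimal sketch of how a caller could use Compress (not part of this diff; the log path below is hypothetical, and the real caller is prepareArtifactForUpload in pkg/jobs/job.go further down):

package main

import (
    "fmt"
    "log"

    "github.com/semaphoreci/agent/pkg/compression"
)

func main() {
    // Hypothetical input: a plain-text job log file that already exists on disk.
    gzipped, err := compression.Compress("/tmp/job_logs.txt")
    if err != nil {
        // The agent falls back to uploading the raw file when compression fails.
        log.Fatalf("compression failed: %v", err)
    }

    // The compressed copy is written next to the original, with a ".gz" suffix.
    fmt.Println(gzipped) // prints /tmp/job_logs.txt.gz
}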
43 changes: 43 additions & 0 deletions pkg/compression/gzip_test.go
@@ -0,0 +1,43 @@
package compression

import (
    "compress/gzip"
    "fmt"
    "io"
    "os"
    "testing"

    "github.com/stretchr/testify/require"
)

func Test__Compress(t *testing.T) {
    // create text file for compression
    raw, err := os.CreateTemp(os.TempDir(), "*.txt")
    require.NoError(t, err)

    defer os.Remove(raw.Name())

    content := ""
    for i := 0; i < 100; i++ {
        l := fmt.Sprintf("[%d] abcdefghijklmnopqrstuvwxyz\n", i)
        raw.WriteString(l)
        content += l
    }

    require.NoError(t, raw.Close())

    // compress file
    compressed, err := Compress(raw.Name())
    require.NoError(t, err)

    defer os.Remove(compressed)

    // decompress and assert its contents are correct
    f, err := os.Open(compressed)
    require.NoError(t, err)
    gzipReader, err := gzip.NewReader(f)
    require.NoError(t, err)
    text, err := io.ReadAll(gzipReader)
    require.NoError(t, err)
    require.Equal(t, content, string(text))
}
118 changes: 109 additions & 9 deletions pkg/jobs/job.go
@@ -8,10 +8,12 @@ import (
    "os"
    "os/exec"
    "runtime"
    "strconv"
    "strings"
    "time"

    api "github.com/semaphoreci/agent/pkg/api"
    "github.com/semaphoreci/agent/pkg/compression"
    "github.com/semaphoreci/agent/pkg/config"
    eventlogger "github.com/semaphoreci/agent/pkg/eventlogger"
    executors "github.com/semaphoreci/agent/pkg/executors"
@@ -26,6 +28,13 @@ const JobPassed = "passed"
const JobFailed = "failed"
const JobStopped = "stopped"

// By default, we will only compress the job logs before uploading them as an artifact
// if their size goes above 100MB. However, the SEMAPHORE_AGENT_LOGS_COMPRESSION_SIZE environment
// variable can be used to configure that value, with anything between 1MB and 1GB being possible.
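// For example, SEMAPHORE_AGENT_LOGS_COMPRESSION_SIZE=10485760 (a hypothetical value, 10MB)
// would make the agent compress any job log larger than 10MB before uploading it.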
const MinSizeForCompression = 1024 * 1024
const DefaultSizeForCompression = 1024 * 1024 * 100
const MaxSizeForCompression = 1024 * 1024 * 1024

type Job struct {
Client *http.Client
Request *api.JobRequest
@@ -545,6 +554,100 @@ func (job *Job) teardownWithNoCallbacks(result string) error {
    return nil
}

func (job *Job) minSizeForCompression() int64 {
    fromEnv := os.Getenv("SEMAPHORE_AGENT_LOGS_COMPRESSION_SIZE")
    if fromEnv == "" {
        return DefaultSizeForCompression
    }

    n, err := strconv.ParseInt(fromEnv, 10, 64)
    if err != nil {
        log.Errorf(
            "Error parsing SEMAPHORE_AGENT_LOGS_COMPRESSION_SIZE: %v - using default of %d",
            err,
            DefaultSizeForCompression,
        )

        return DefaultSizeForCompression
    }

    if n < MinSizeForCompression || n > MaxSizeForCompression {
        log.Errorf(
            "Invalid SEMAPHORE_AGENT_LOGS_COMPRESSION_SIZE %d, not in range %d-%d, using default %d",
            n,
            MinSizeForCompression,
            MaxSizeForCompression,
            DefaultSizeForCompression,
        )

        return DefaultSizeForCompression
    }

    return n
}

// #nosec
func (job *Job) findFileSize(fileName string) (int64, error) {
    file, err := os.Open(fileName)
    if err != nil {
        return 0, fmt.Errorf("error opening %s: %v", fileName, err)
    }

    defer file.Close()
    fileInfo, err := file.Stat()
    if err != nil {
        return 0, fmt.Errorf("error determining size for file %s: %v", fileName, err)
    }

    return fileInfo.Size(), nil
}

func (job *Job) prepareArtifactForUpload() (string, error) {
    log.Info("Converting job logs to plain-text format...")
    rawFileName, err := job.Logger.GeneratePlainTextFile()
    if err != nil {
        return "", fmt.Errorf("error converting '%s' to plain text: %v", rawFileName, err)
    }

    //
    // If an error happens determining the size of the raw logs file,
    // we should still try to upload the raw logs,
    // so we don't return an error here.
    //
    rawFileSize, err := job.findFileSize(rawFileName)
    if err != nil {
        log.Errorf("Error determining size for %s: %v", rawFileName, err)
        return rawFileName, nil
    }

    // If the size of the file is below our threshold for compression, we upload the raw file.
    minSizeForCompression := job.minSizeForCompression()
    if rawFileSize < minSizeForCompression {
        log.Infof("Logs are below the minimum size for compression - size=%d, minimum=%d", rawFileSize, minSizeForCompression)
        return rawFileName, nil
    }

    log.Info("Compressing job logs")

    //
    // If an error happens compressing the logs,
    // we should still try to upload the raw logs,
    // so we don't return an error here either.
    //
    compressedFile, err := compression.Compress(rawFileName)
    if err != nil {
        log.Errorf("Error compressing job logs %s: %v - using raw file", rawFileName, err)
        return rawFileName, nil
    }

    // Remove the raw file since we are using the compressed one now.
    if err := os.Remove(rawFileName); err != nil {
        log.Errorf("Error removing file %s: %v", rawFileName, err)
    }

    return compressedFile, nil
}

func (job *Job) uploadLogsAsArtifact(trimmed bool) {
    if job.UploadJobLogs == config.UploadJobLogsConditionNever {
        log.Info("upload-job-logs=never - not uploading job logs as job artifact.")
@@ -556,15 +659,6 @@ func (job *Job) uploadLogsAsArtifact(trimmed bool) {
        return
    }

    log.Info("Converting job logs to plain-text format...")
    file, err := job.Logger.GeneratePlainTextFile()
    if err != nil {
        log.Errorf("Error converting '%s' to plain text: %v", file, err)
        return
    }

    defer os.Remove(file)

    token, err := job.Request.FindEnvVar("SEMAPHORE_ARTIFACT_TOKEN")
    if err != nil {
        log.Error("Error uploading job logs as artifact - no SEMAPHORE_ARTIFACT_TOKEN available")
@@ -583,6 +677,12 @@ func (job *Job) uploadLogsAsArtifact(trimmed bool) {
        return
    }

    file, err := job.prepareArtifactForUpload()
    if err != nil {
        log.Errorf("Error preparing artifact for upload: %v", err)
        return
    }

    args := []string{"push", "job", file, "-d", "agent/job_logs.txt"}

    // #nosec
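As a side note, the range clamping done by minSizeForCompression could be exercised with a small unit test along these lines (a sketch, not part of this PR; it assumes it lives in the jobs package and that a zero-value Job is enough, since the method only reads the environment):

package jobs

import (
    "os"
    "testing"

    "github.com/stretchr/testify/require"
)

func Test__MinSizeForCompression(t *testing.T) {
    job := &Job{}

    // Unset: the 100MB default applies.
    os.Unsetenv("SEMAPHORE_AGENT_LOGS_COMPRESSION_SIZE")
    require.Equal(t, int64(DefaultSizeForCompression), job.minSizeForCompression())

    // Outside the 1MB-1GB range: falls back to the default.
    os.Setenv("SEMAPHORE_AGENT_LOGS_COMPRESSION_SIZE", "1024")
    require.Equal(t, int64(DefaultSizeForCompression), job.minSizeForCompression())

    // Valid value: used as-is.
    os.Setenv("SEMAPHORE_AGENT_LOGS_COMPRESSION_SIZE", "10485760")
    require.Equal(t, int64(10485760), job.minSizeForCompression())
}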
19 changes: 19 additions & 0 deletions test/e2e.rb
@@ -85,6 +85,25 @@ def wait_for_agent_to_shutdown
  $strategy.wait_for_agent_to_shutdown
end

def assert_artifact_is_compressed
  puts "Checking if artifact is available and compressed"

  # We give the artifact up to 20s to appear here, so the agent has enough time
  # to notice that the "archivator" has fetched the logs and can close the logger.
  Timeout.timeout(20) do
    loop do
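      # Pull the uploaded artifact, decompress it, and check that its last line is the
      # "Exporting SEMAPHORE_JOB_RESULT" output produced at the end of every job.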
      `artifact pull job agent/job_logs.txt -f -d job_logs.gz && (gunzip -c job_logs.gz | tail -n1 | grep -q "Exporting SEMAPHORE_JOB_RESULT")`
      if $?.exitstatus == 0
        puts "success: agent/job_logs.txt exists and is compressed!"
        break
      else
        print "."
        sleep 2
      end
    end
  end
end

def assert_artifact_is_available
puts "Checking if artifact is available"

62 changes: 62 additions & 0 deletions test/e2e/hosted/job_logs_as_artifact_compressed.rb
@@ -0,0 +1,62 @@
#!/bin/ruby
# rubocop:disable all

require_relative '../../e2e'

# Here, we use the SEMAPHORE_JOB_ID as the job ID for this test.
$JOB_ID = ENV["SEMAPHORE_JOB_ID"]

# Additionally, we pass the artifact-related environment variables
# to the job, so that it can upload the job logs as an artifact after the job is done.
start_job <<-JSON
  {
    "job_id": "#{$JOB_ID}",
    "executor": "shell",
    "env_vars": [
      { "name": "SEMAPHORE_JOB_ID", "value": "#{Base64.strict_encode64(ENV["SEMAPHORE_JOB_ID"])}" },
      { "name": "SEMAPHORE_ORGANIZATION_URL", "value": "#{Base64.strict_encode64(ENV["SEMAPHORE_ORGANIZATION_URL"])}" },
      { "name": "SEMAPHORE_ARTIFACT_TOKEN", "value": "#{Base64.strict_encode64(ENV["SEMAPHORE_ARTIFACT_TOKEN"])}" },
      { "name": "SEMAPHORE_AGENT_UPLOAD_JOB_LOGS", "value": "#{Base64.strict_encode64("always")}" }
    ],
    "files": [],
    "commands": [
      { "directive": "for i in $(seq 1 5000); do echo \\\"${i} $(LC_ALL=C tr -dc 'A-Za-z0-9!#$%&()*+,-./:;<=>?@^_{|}~' </dev/urandom | head -c 256)\\\"; done" }
    ],
    "epilogue_always_commands": [],
    "callbacks": {
      "finished": "#{finished_callback_url}",
      "teardown_finished": "#{teardown_callback_url}"
    },
    "logger": {
      "method": "pull"
    }
  }
JSON

wait_for_job_to_finish

assert_job_log <<-LOG
{"event":"job_started", "timestamp":"*"}

{"event":"cmd_started", "timestamp":"*", "directive":"Exporting environment variables"}
{"event":"cmd_output", "timestamp":"*", "output":"Exporting SEMAPHORE_AGENT_UPLOAD_JOB_LOGS\\n"}
{"event":"cmd_output", "timestamp":"*", "output":"Exporting SEMAPHORE_ARTIFACT_TOKEN\\n"}
{"event":"cmd_output", "timestamp":"*", "output":"Exporting SEMAPHORE_JOB_ID\\n"}
{"event":"cmd_output", "timestamp":"*", "output":"Exporting SEMAPHORE_ORGANIZATION_URL\\n"}
{"event":"cmd_finished", "timestamp":"*", "directive":"Exporting environment variables","exit_code":0,"finished_at":"*","started_at":"*"}

{"event":"cmd_started", "timestamp":"*", "directive":"Injecting Files"}
{"event":"cmd_finished", "timestamp":"*", "directive":"Injecting Files","exit_code":0,"finished_at":"*","started_at":"*"}

{"event":"cmd_started", "timestamp":"*", "directive":"for i in $(seq 1 5000); do echo \\\"${i} $(LC_ALL=C tr -dc 'A-Za-z0-9!#$%&()*+,-./:;<=>?@^_{|}~' </dev/urandom | head -c 256)\\\"; done"}
*** LONG_OUTPUT ***
{"event":"cmd_finished", "timestamp":"*", "directive":"for i in $(seq 1 5000); do echo \\\"${i} $(LC_ALL=C tr -dc 'A-Za-z0-9!#$%&()*+,-./:;<=>?@^_{|}~' </dev/urandom | head -c 256)\\\"; done","exit_code":0,"finished_at":"*","started_at":"*"}

{"event":"cmd_started", "timestamp":"*", "directive":"Exporting environment variables"}
{"event":"cmd_output", "timestamp":"*", "output":"Exporting SEMAPHORE_JOB_RESULT\\n"}
{"event":"cmd_finished", "timestamp":"*", "directive":"Exporting environment variables","exit_code":0,"started_at":"*","finished_at":"*"}

{"event":"job_finished", "timestamp":"*", "result":"passed"}
LOG

assert_artifact_is_compressed
2 changes: 1 addition & 1 deletion test/e2e_support/api_mode.rb
@@ -11,7 +11,7 @@ def boot_up_agent
  system "docker stop $(docker ps -q)"
  system "docker rm $(docker ps -qa)"
  system "docker build -t agent -f Dockerfile.test ."
  system "docker run --privileged --device /dev/ptmx --network=host -v /tmp/agent-temp-directory/:/tmp/agent-temp-directory -v /var/run/docker.sock:/var/run/docker.sock --name agent -tdi agent bash -c \"service ssh restart && export SEMAPHORE_AGENT_LOG_LEVEL=debug && nohup ./agent serve --port 30000 --auth-token-secret 'TzRVcspTmxhM9fUkdi1T/0kVXNETCi8UdZ8dLM8va4E' & sleep infinity\""
  system "docker run --privileged --device /dev/ptmx --network=host -v /tmp/agent-temp-directory/:/tmp/agent-temp-directory -v /var/run/docker.sock:/var/run/docker.sock --name agent -tdi agent bash -c \"service ssh restart && export SEMAPHORE_AGENT_LOG_LEVEL=debug SEMAPHORE_AGENT_LOGS_COMPRESSION_SIZE=1048576 && nohup ./agent serve --port 30000 --auth-token-secret 'TzRVcspTmxhM9fUkdi1T/0kVXNETCi8UdZ8dLM8va4E' & sleep infinity\""

  pingable = nil
  until pingable