Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ephemeral CI run API #80

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 179 additions & 76 deletions cmd/omegaup-grader/ci.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package main

import (
"archive/zip"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"math/big"
"net/http"
"os"
"path"
"regexp"
"time"

"github.com/google/uuid"
git "github.com/libgit2/git2go/v33"

base "github.com/omegaup/go-base/v3"
Expand All @@ -20,20 +24,22 @@ import (
)

var (
ciURLRegexp = regexp.MustCompile(`^/ci/problem/([a-zA-Z0-9-_]+)/([0-9a-f]{40})/$`)
ciGitURLRegexp = regexp.MustCompile(`^/ci/problem/([a-zA-Z0-9-_]+)/([0-9a-f]{40})/$`)
ciEphemeralURLRegexp = regexp.MustCompile(`^/ci/ephemeral/(([0-9a-f-]{36})/)?$`)
)

type reportWithPath struct {
report *ci.Report
path string
type startedCIRequest struct {
report *ci.Report
path string
problemFiles *common.ProblemFiles
}

type ciHandler struct {
ephemeralRunManager *grader.EphemeralRunManager
ctx *grader.Context
lruCache *ci.LRUCache
stopChan chan struct{}
reportChan chan *reportWithPath
requestChan chan *startedCIRequest
doneChan chan struct{}
}

Expand All @@ -47,32 +53,52 @@ func (h *ciHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
},
)

if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
report := &ci.Report{
StartTime: time.Now(),
State: ci.StateWaiting,
}

match := ciURLRegexp.FindStringSubmatch(r.URL.Path)
if match == nil {
var expectedMethod string

if match := ciGitURLRegexp.FindStringSubmatch(r.URL.Path); match != nil {

report.IsEphemeral = false
report.Problem = match[1]
report.Hash = match[2]
expectedMethod = http.MethodGet

} else if match := ciEphemeralURLRegexp.FindStringSubmatch(r.URL.Path); match != nil {

report.IsEphemeral = true
report.Problem = "+ephemeral+"

if len(match) > 3 {
expectedMethod = http.MethodGet
report.Hash = match[3]
} else {
expectedMethod = http.MethodPost
report.Hash = uuid.New().String()
}

} else {
w.WriteHeader(http.StatusNotFound)
return
}

report := &ci.Report{
Problem: match[1],
CommitHash: match[2],
StartTime: time.Now(),
State: ci.StateWaiting,
if r.Method != expectedMethod {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

reportPath := path.Join(
ctx.Config.Grader.RuntimePath,
"ci",
report.Problem,
report.CommitHash[:2],
report.CommitHash[2:],
report.Hash[:2],
report.Hash[2:],
"report.json.gz",
)

if fd, err := os.Open(reportPath); err == nil {
defer fd.Close()

Expand All @@ -89,6 +115,11 @@ func (h *ciHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

if r.Method == http.MethodPost {
w.WriteHeader(http.StatusBadRequest)
return
}

w.Header().Add("Content-Type", "application/json")
w.Header().Add("Content-Encoding", "gzip")
http.ServeContent(w, r, reportPath, st.ModTime(), fd)
Expand All @@ -97,49 +128,104 @@ func (h *ciHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

// Do the barest minimum checks before fully committing to making this CI
// run.
repository, err := git.OpenRepository(grader.GetRepositoryPath(
ctx.Config.Grader.RuntimePath,
report.Problem,
))
if err != nil {
ctx.Log.Error(
"failed to open repository",
map[string]interface{}{
"filename": reportPath,
"err": err,
},
)
w.WriteHeader(http.StatusNotFound)
return
}
defer repository.Free()
commitID, err := git.NewOid(report.CommitHash)
if err != nil {
ctx.Log.Error(
"failed to parse commit",
map[string]interface{}{
"filename": reportPath,
"commit": report.CommitHash,
"err": err,
},
)
w.WriteHeader(http.StatusNotFound)
return
}
commit, err := repository.LookupCommit(commitID)
if err != nil {
ctx.Log.Error(
"failed to lookup commit",
map[string]interface{}{
"filename": reportPath,
"commit": report.CommitHash,
"err": err,
},
)
w.WriteHeader(http.StatusNotFound)
return

// todo(frcepeda): check with frontend for rate limiting

var problemFiles *common.ProblemFiles

if report.IsEphemeral {
r.ParseMultipartForm((base.Byte(150) * base.Mebibyte).Bytes())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo poner estos en un lugar más mono, pero no sé donde 🙃

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

esto va a timeoutear bien recio :/ tenemos problemas para subir zips grandes, así que esto va a estar súper flaky.

además que andar mandando zips de más de 100 MiB nos va a salir bastante caro ^^;; así que realísticamente sólo podríamos permitir como uno de estos por minuto, lo cual suena medio inútil para fines de CI.

problemZip, _, err := r.FormFile("problem.zip")

if err != nil {
ctx.Log.Error(
"failed to load problem package",
map[string]interface{}{
"filename": reportPath,
"err": err,
},
)
w.WriteHeader(http.StatusInternalServerError)
return
}
defer problemZip.Close()

limit := (base.Byte(100) * base.Mebibyte).Bytes()
buf := make([]byte, limit)
bufSize, err := io.ReadFull(io.LimitReader(problemZip, limit), buf)

if err != io.ErrUnexpectedEOF {
ctx.Log.Error(
"problem package too large",
map[string]interface{}{
"filename": reportPath,
"err": err,
},
)
w.WriteHeader(http.StatusRequestEntityTooLarge)
return
}

zipReader, err := zip.NewReader(bytes.NewReader(buf), int64(bufSize))
if err != nil {
ctx.Log.Error(
"failed to load problem package",
map[string]interface{}{
"filename": reportPath,
"err": err,
},
)
w.WriteHeader(http.StatusInternalServerError)
return
}

p := common.NewProblemFilesFromZip(zipReader, "<memory>")
problemFiles = &p
} else {
repository, err := git.OpenRepository(grader.GetRepositoryPath(
ctx.Config.Grader.RuntimePath,
report.Problem,
))
if err != nil {
ctx.Log.Error(
"failed to open repository",
map[string]interface{}{
"filename": reportPath,
"err": err,
},
)
w.WriteHeader(http.StatusNotFound)
return
}
defer repository.Free()
commitID, err := git.NewOid(report.Hash)
if err != nil {
ctx.Log.Error(
"failed to parse commit",
map[string]interface{}{
"filename": reportPath,
"commit": report.Hash,
"err": err,
},
)
w.WriteHeader(http.StatusNotFound)
return
}
commit, err := repository.LookupCommit(commitID)
if err != nil {
ctx.Log.Error(
"failed to lookup commit",
map[string]interface{}{
"filename": reportPath,
"commit": report.Hash,
"err": err,
},
)
w.WriteHeader(http.StatusNotFound)
return
}
defer commit.Free()
}
defer commit.Free()

ctx.Metrics.CounterAdd("grader_ci_jobs_total", 1)

Expand Down Expand Up @@ -200,9 +286,10 @@ func (h *ciHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// Transfer the run to processCIRequest.
h.reportChan <- &reportWithPath{
report: report,
path: reportPath,
h.requestChan <- &startedCIRequest{
report: report,
path: reportPath,
problemFiles: problemFiles,
}
}

Expand Down Expand Up @@ -376,6 +463,7 @@ func (h *ciHandler) runTest(
func (h *ciHandler) processCIRequest(
report *ci.Report,
reportPath string,
problemFiles *common.ProblemFiles,
runs *grader.Queue,
) {
ctx := h.ctx.Wrap(context.TODO())
Expand All @@ -385,16 +473,28 @@ func (h *ciHandler) processCIRequest(
"report": report,
},
)
problemFiles, err := common.NewProblemFilesFromGit(
grader.GetRepositoryPath(
ctx.Config.Grader.RuntimePath,
report.Problem,
),
report.CommitHash,
)

var err error

if report.IsEphemeral {
if problemFiles == nil {
err = fmt.Errorf("Missing problem files for ephemeral run")
}
} else {
p, tmpE := common.NewProblemFilesFromGit(
grader.GetRepositoryPath(
ctx.Config.Grader.RuntimePath,
report.Problem,
),
report.Hash,
)
problemFiles = &p
err = tmpE
}

if err != nil {
ctx.Log.Error(
"Failed to validate commit",
"Failed to validate problem files",
map[string]interface{}{
"err": err,
},
Expand All @@ -418,10 +518,12 @@ func (h *ciHandler) processCIRequest(
}
return
}
ciRunConfig, err := ci.NewRunConfig(problemFiles, false)

ciRunConfig, err := ci.NewRunConfig(*problemFiles, false)

if err != nil {
ctx.Log.Error(
"Failed to validate commit",
"Failed to validate problem",
map[string]interface{}{
"err": err,
},
Expand Down Expand Up @@ -449,6 +551,7 @@ func (h *ciHandler) processCIRequest(
}
return
}

for _, testConfig := range ciRunConfig.TestConfigs {
report.Tests = append(report.Tests, testConfig.Test)
}
Expand Down Expand Up @@ -519,7 +622,7 @@ func (h *ciHandler) processCIRequest(

h.lruCache.AddRun(
path.Dir(reportPath),
fmt.Sprintf("%s/%s", report.Problem, report.CommitHash),
fmt.Sprintf("%s/%s", report.Problem, report.Hash),
)
}

Expand Down Expand Up @@ -554,8 +657,8 @@ func (h *ciHandler) run() {
close(h.doneChan)
return

case report := <-h.reportChan:
h.processCIRequest(report.report, report.path, runs)
case request := <-h.requestChan:
h.processCIRequest(request.report, request.path, request.problemFiles, runs)
}
}
}
Expand All @@ -581,7 +684,7 @@ func registerCIHandlers(
ctx: ctx,
lruCache: ci.NewLRUCache(ctx.Config.Grader.CI.CISizeLimit, ctx.Log),
stopChan: make(chan struct{}),
reportChan: make(chan *reportWithPath, 128),
requestChan: make(chan *startedCIRequest, 128),
doneChan: make(chan struct{}),
}
mux.Handle(ctx.Tracing.WrapHandle("/ci/", ciHandler))
Expand Down
Loading