Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix artifact v4 upload above 8MB #31664

Merged
merged 18 commits into from
Sep 22, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion routers/api/actions/artifacts_chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,54 @@ func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chun
return chunksMap, nil
}

func listChunksByRunIDV4(st storage.ObjectStorage, runID, artifactID int64, blist *BlockList) ([]*chunkFileItem, error) {
storageDir := fmt.Sprintf("tmpv4%d", runID)
var chunks []*chunkFileItem
chunkMap := map[string]*chunkFileItem{}
dummy := &chunkFileItem{}
for _, name := range blist.Latest {
chunkMap[name] = dummy
}
if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
lunny marked this conversation as resolved.
Show resolved Hide resolved
baseName := filepath.Base(fpath)
if !strings.HasPrefix(baseName, "block-") {
return nil
}
// when read chunks from storage, it only contains storage dir and basename,
// no matter the subdirectory setting in storage config
item := chunkFileItem{Path: storageDir + "/" + baseName, ArtifactID: artifactID}
var size int64
var b64chunkName string
if _, err := fmt.Sscanf(baseName, "block-%d-%d-%s", &item.RunID, &size, &b64chunkName); err != nil {
return fmt.Errorf("parse content range error: %v", err)
}
rchunkName, err := base64.URLEncoding.DecodeString(b64chunkName)
if err != nil {
return fmt.Errorf("failed to parse chunkName: %v", err)
}
chunkName := string(rchunkName)
item.End = item.Start + size - 1
if _, ok := chunkMap[chunkName]; ok {
chunkMap[chunkName] = &item
}
return nil
}); err != nil {
return nil, err
}
for i, name := range blist.Latest {
chunk, ok := chunkMap[name]
if !ok || chunk.Path == "" {
return nil, fmt.Errorf("missing Chunk (%d/%d): %s", i, len(blist.Latest), name)
}
chunks = append(chunks, chunk)
if i > 0 {
chunk.Start = chunkMap[blist.Latest[i-1]].End + 1
chunk.End += chunk.Start
}
}
return chunks, nil
}

func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error {
// read all db artifacts by name
artifacts, err := db.Find[actions.ActionArtifact](ctx, actions.FindArtifactsOptions{
Expand Down Expand Up @@ -230,7 +278,7 @@ func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st st
rawChecksum := hash.Sum(nil)
actualChecksum := hex.EncodeToString(rawChecksum)
if !strings.HasSuffix(checksum, actualChecksum) {
return fmt.Errorf("update artifact error checksum is invalid")
return fmt.Errorf("update artifact error checksum is invalid %v vs %v", checksum, actualChecksum)
}
}

Expand Down
145 changes: 106 additions & 39 deletions routers/api/actions/artifactsv4.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,15 @@ package actions
// PUT: http://localhost:3000/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact?sig=mO7y35r4GyjN7fwg0DTv3-Fv1NDXD84KLEgLpoPOtDI=&expires=2024-01-23+21%3A48%3A37.20833956+%2B0100+CET&artifactName=test&taskID=75&comp=block
// 1.3. Continue Upload Zip Content to Blobstorage (unauthenticated request), repeat until everything is uploaded
// PUT: http://localhost:3000/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact?sig=mO7y35r4GyjN7fwg0DTv3-Fv1NDXD84KLEgLpoPOtDI=&expires=2024-01-23+21%3A48%3A37.20833956+%2B0100+CET&artifactName=test&taskID=75&comp=appendBlock
// 1.4. Unknown xml payload to Blobstorage (unauthenticated request), ignored for now
// 1.4. BlockList xml payload to Blobstorage (unauthenticated request)
// Files of about 800MB are parallel in parallel and / or out of order, this file is needed to enshure the correct order
// PUT: http://localhost:3000/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact?sig=mO7y35r4GyjN7fwg0DTv3-Fv1NDXD84KLEgLpoPOtDI=&expires=2024-01-23+21%3A48%3A37.20833956+%2B0100+CET&artifactName=test&taskID=75&comp=blockList
// Request
// <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
// <BlockList>
// <Latest>blockId1</Latest>
// <Latest>blockId2</Latest>
// </BlockList>
// 1.5. FinalizeArtifact
// Post: /twirp/github.actions.results.api.v1.ArtifactService/FinalizeArtifact
// Request
Expand Down Expand Up @@ -82,6 +89,7 @@ import (
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/xml"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -152,31 +160,34 @@ func ArtifactsV4Routes(prefix string) *web.Router {
return m
}

func (r artifactV4Routes) buildSignature(endp, expires, artifactName string, taskID int64) []byte {
func (r artifactV4Routes) buildSignature(endp, expires, artifactName string, taskID, artifactID int64) []byte {
mac := hmac.New(sha256.New, setting.GetGeneralTokenSigningSecret())
mac.Write([]byte(endp))
mac.Write([]byte(expires))
mac.Write([]byte(artifactName))
mac.Write([]byte(fmt.Sprint(taskID)))
mac.Write([]byte(fmt.Sprint(artifactID)))
return mac.Sum(nil)
}

func (r artifactV4Routes) buildArtifactURL(ctx *ArtifactContext, endp, artifactName string, taskID int64) string {
func (r artifactV4Routes) buildArtifactURL(ctx *ArtifactContext, endp, artifactName string, taskID, artifactID int64) string {
expires := time.Now().Add(60 * time.Minute).Format("2006-01-02 15:04:05.999999999 -0700 MST")
uploadURL := strings.TrimSuffix(httplib.GuessCurrentAppURL(ctx), "/") + strings.TrimSuffix(r.prefix, "/") +
"/" + endp + "?sig=" + base64.URLEncoding.EncodeToString(r.buildSignature(endp, expires, artifactName, taskID)) + "&expires=" + url.QueryEscape(expires) + "&artifactName=" + url.QueryEscape(artifactName) + "&taskID=" + fmt.Sprint(taskID)
"/" + endp + "?sig=" + base64.URLEncoding.EncodeToString(r.buildSignature(endp, expires, artifactName, taskID, artifactID)) + "&expires=" + url.QueryEscape(expires) + "&artifactName=" + url.QueryEscape(artifactName) + "&taskID=" + fmt.Sprint(taskID) + "&artifactID=" + fmt.Sprint(artifactID)
return uploadURL
}

func (r artifactV4Routes) verifySignature(ctx *ArtifactContext, endp string) (*actions.ActionTask, string, bool) {
rawTaskID := ctx.Req.URL.Query().Get("taskID")
rawArtifactID := ctx.Req.URL.Query().Get("artifactID")
sig := ctx.Req.URL.Query().Get("sig")
expires := ctx.Req.URL.Query().Get("expires")
artifactName := ctx.Req.URL.Query().Get("artifactName")
dsig, _ := base64.URLEncoding.DecodeString(sig)
taskID, _ := strconv.ParseInt(rawTaskID, 10, 64)
artifactID, _ := strconv.ParseInt(rawArtifactID, 10, 64)

expecedsig := r.buildSignature(endp, expires, artifactName, taskID)
expecedsig := r.buildSignature(endp, expires, artifactName, taskID, artifactID)
if !hmac.Equal(dsig, expecedsig) {
log.Error("Error unauthorized")
ctx.Error(http.StatusUnauthorized, "Error unauthorized")
Expand Down Expand Up @@ -271,6 +282,8 @@ func (r *artifactV4Routes) createArtifact(ctx *ArtifactContext) {
return
}
artifact.ContentEncoding = ArtifactV4ContentEncoding
artifact.FileSize = 0
artifact.FileCompressedSize = 0
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
log.Error("Error UpdateArtifactByID: %v", err)
ctx.Error(http.StatusInternalServerError, "Error UpdateArtifactByID")
Expand All @@ -279,7 +292,7 @@ func (r *artifactV4Routes) createArtifact(ctx *ArtifactContext) {

respData := CreateArtifactResponse{
Ok: true,
SignedUploadUrl: r.buildArtifactURL(ctx, "UploadArtifact", artifactName, ctx.ActionTask.ID),
SignedUploadUrl: r.buildArtifactURL(ctx, "UploadArtifact", artifactName, ctx.ActionTask.ID, artifact.ID),
}
r.sendProtbufBody(ctx, &respData)
}
Expand All @@ -293,38 +306,76 @@ func (r *artifactV4Routes) uploadArtifact(ctx *ArtifactContext) {
comp := ctx.Req.URL.Query().Get("comp")
switch comp {
case "block", "appendBlock":
// get artifact by name
artifact, err := r.getArtifactByName(ctx, task.Job.RunID, artifactName)
if err != nil {
log.Error("Error artifact not found: %v", err)
ctx.Error(http.StatusNotFound, "Error artifact not found")
return
blockid := ctx.Req.URL.Query().Get("blockid")
if blockid == "" {
// get artifact by name
artifact, err := r.getArtifactByName(ctx, task.Job.RunID, artifactName)
if err != nil {
log.Error("Error artifact not found: %v", err)
ctx.Error(http.StatusNotFound, "Error artifact not found")
return
}

_, err = appendUploadChunk(r.fs, ctx, artifact, artifact.FileSize, ctx.Req.ContentLength, artifact.RunID)
if err != nil {
log.Error("Error runner api getting task: task is not running")
ctx.Error(http.StatusInternalServerError, "Error runner api getting task: task is not running")
return
}
artifact.FileCompressedSize += ctx.Req.ContentLength
artifact.FileSize += ctx.Req.ContentLength
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
log.Error("Error UpdateArtifactByID: %v", err)
ctx.Error(http.StatusInternalServerError, "Error UpdateArtifactByID")
return
}
} else {
_, err := r.fs.Save(fmt.Sprintf("tmpv4%d/block-%d-%d-%s", task.Job.RunID, task.Job.RunID, ctx.Req.ContentLength, base64.URLEncoding.EncodeToString([]byte(blockid))), ctx.Req.Body, -1)
if err != nil {
log.Error("Error runner api getting task: task is not running")
ctx.Error(http.StatusInternalServerError, "Error runner api getting task: task is not running")
return
}
}

if comp == "block" {
artifact.FileSize = 0
artifact.FileCompressedSize = 0
}

_, err = appendUploadChunk(r.fs, ctx, artifact, artifact.FileSize, ctx.Req.ContentLength, artifact.RunID)
ctx.JSON(http.StatusCreated, "appended")
case "blocklist":
rawArtifactID := ctx.Req.URL.Query().Get("artifactID")
artifactID, _ := strconv.ParseInt(rawArtifactID, 10, 64)
_, err := r.fs.Save(fmt.Sprintf("tmpv4%d/%d-%d-blocklist", task.Job.RunID, task.Job.RunID, artifactID), ctx.Req.Body, -1)
if err != nil {
log.Error("Error runner api getting task: task is not running")
ctx.Error(http.StatusInternalServerError, "Error runner api getting task: task is not running")
return
}
artifact.FileCompressedSize += ctx.Req.ContentLength
artifact.FileSize += ctx.Req.ContentLength
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
log.Error("Error UpdateArtifactByID: %v", err)
ctx.Error(http.StatusInternalServerError, "Error UpdateArtifactByID")
return
}
ctx.JSON(http.StatusCreated, "appended")
case "blocklist":
ctx.JSON(http.StatusCreated, "created")
}
}

type BlockList struct {
Latest []string `xml:"Latest"`
}

type Latest struct {
Value string `xml:",chardata"`
}

func (r *artifactV4Routes) readBlockList(runID, artifactID int64) (*BlockList, error) {
blockListName := fmt.Sprintf("tmpv4%d/%d-%d-blocklist", runID, runID, artifactID)
s, err := r.fs.Open(blockListName)
if err != nil {
return nil, err
}
err = r.fs.Delete(blockListName)
if err != nil {
log.Warn("Failed to delete blockList %s: %v", blockListName, err)
}

xdec := xml.NewDecoder(s)
blockList := &BlockList{}
err = xdec.Decode(blockList)
return blockList, err
}

func (r *artifactV4Routes) finalizeArtifact(ctx *ArtifactContext) {
var req FinalizeArtifactRequest

Expand All @@ -343,18 +394,34 @@ func (r *artifactV4Routes) finalizeArtifact(ctx *ArtifactContext) {
ctx.Error(http.StatusNotFound, "Error artifact not found")
return
}
chunkMap, err := listChunksByRunID(r.fs, runID)

var chunks []*chunkFileItem
blockList, err := r.readBlockList(runID, artifact.ID)
if err != nil {
log.Error("Error merge chunks: %v", err)
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
return
}
chunks, ok := chunkMap[artifact.ID]
if !ok {
log.Error("Error merge chunks")
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
return
log.Warn("Failed to read BlockList, fallback to old behavior: %v", err)
chunkMap, err := listChunksByRunID(r.fs, runID)
if err != nil {
log.Error("Error merge chunks: %v", err)
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
return
}
chunks, ok = chunkMap[artifact.ID]
if !ok {
log.Error("Error merge chunks")
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
return
}
} else {
chunks, err = listChunksByRunIDV4(r.fs, runID, artifact.ID, blockList)
if err != nil {
log.Error("Error merge chunks: %v", err)
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
return
}
artifact.FileSize = chunks[len(chunks)-1].End + 1
artifact.FileCompressedSize = chunks[len(chunks)-1].End + 1
}

checksum := ""
if req.Hash != nil {
checksum = req.Hash.Value
Expand Down Expand Up @@ -455,7 +522,7 @@ func (r *artifactV4Routes) getSignedArtifactURL(ctx *ArtifactContext) {
}
}
if respData.SignedUrl == "" {
respData.SignedUrl = r.buildArtifactURL(ctx, "DownloadArtifact", artifactName, ctx.ActionTask.ID)
respData.SignedUrl = r.buildArtifactURL(ctx, "DownloadArtifact", artifactName, ctx.ActionTask.ID, artifact.ID)
}
r.sendProtbufBody(ctx, &respData)
}
Expand Down