Skip to content

Commit

Permalink
fix(server): serve artifacts directly from disk to support large arti…
Browse files Browse the repository at this point in the history
…facts

When serving very large artifacts, first loading them into memory can potentially
cause the pod to go OOM/crash depending on how much memory is available and what
limits have been set.  Rather than loading it into memory, we can serve files
directly from disk.

Fixes argoproj#4588

Signed-off-by: Daniel Herman <dherman@factset.com>
  • Loading branch information
dcherman committed Nov 24, 2020
1 parent e3aaf2f commit c774944
Showing 1 changed file with 29 additions and 25 deletions.
54 changes: 29 additions & 25 deletions server/artifacts/artifact_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"net/http"
"os"
"path"
"strconv"
"strings"
"time"

log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -62,15 +64,12 @@ func (a *ArtifactServer) GetArtifact(w http.ResponseWriter, r *http.Request) {
return
}

data, filename, err := a.getArtifact(ctx, wf, nodeId, artifactName)
err = a.returnArtifact(ctx, w, r, wf, nodeId, artifactName)

if err != nil {
a.serverInternalError(err, w)
return
}

w.Header().Add("Content-Disposition", fmt.Sprintf(`filename="%s"`, filename))
a.ok(w, data)
}

func (a *ArtifactServer) GetArtifactByUID(w http.ResponseWriter, r *http.Request) {
Expand All @@ -96,15 +95,12 @@ func (a *ArtifactServer) GetArtifactByUID(w http.ResponseWriter, r *http.Request
return
}

data, filename, err := a.getArtifact(ctx, wf, nodeId, artifactName)
err = a.returnArtifact(ctx, w, r, wf, nodeId, artifactName)

if err != nil {
a.serverInternalError(err, w)
return
}

w.Header().Add("Content-Disposition", fmt.Sprintf(`filename="%s"`, filename))
a.ok(w, data)
}

func (a *ArtifactServer) gateKeeping(r *http.Request) (context.Context, error) {
Expand All @@ -123,50 +119,58 @@ func (a *ArtifactServer) gateKeeping(r *http.Request) (context.Context, error) {
return a.gatekeeper.Context(ctx)
}

func (a *ArtifactServer) ok(w http.ResponseWriter, data []byte) {
w.WriteHeader(200)
_, err := w.Write(data)
if err != nil {
a.serverInternalError(err, w)
}
}

func (a *ArtifactServer) serverInternalError(err error, w http.ResponseWriter) {
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
}

func (a *ArtifactServer) getArtifact(ctx context.Context, wf *wfv1.Workflow, nodeId, artifactName string) ([]byte, string, error) {
func (a *ArtifactServer) returnArtifact(ctx context.Context, w http.ResponseWriter, r *http.Request, wf *wfv1.Workflow, nodeId, artifactName string) error {
kubeClient := auth.GetKubeClient(ctx)

art := wf.Status.Nodes[nodeId].Outputs.GetArtifactByName(artifactName)
if art == nil {
return nil, "", fmt.Errorf("artifact not found")
return fmt.Errorf("artifact not found")
}

driver, err := a.artDriverFactory(art, resources{kubeClient, wf.Namespace})
if err != nil {
return nil, "", err
return err
}
tmp, err := ioutil.TempFile("/tmp", "artifact")
if err != nil {
return nil, "", err
return err
}
tmpPath := tmp.Name()
defer func() { _ = os.Remove(tmpPath) }()

err = driver.Load(art, tmpPath)
if err != nil {
return nil, "", err
return err
}

file, err := os.Open(tmpPath)

if err != nil {
return err
}

file, err := ioutil.ReadFile(tmpPath)
defer file.Close()

stats, err := file.Stat()

if err != nil {
return nil, "", err
return err
}
log.WithFields(log.Fields{"size": len(file)}).Debug("Artifact file size")

return file, path.Base(art.GetKey()), nil
contentLength := strconv.FormatInt(stats.Size(), 10)
log.WithFields(log.Fields{"size": contentLength}).Debug("Artifact file size")

w.Header().Add("Content-Disposition", fmt.Sprintf(`filename="%s"`, path.Base(art.GetKey())))
w.WriteHeader(200)

http.ServeContent(w, r, "", time.Time{}, file)

return nil
}

func (a *ArtifactServer) getWorkflowAndValidate(ctx context.Context, namespace string, workflowName string) (*wfv1.Workflow, error) {
Expand Down

0 comments on commit c774944

Please sign in to comment.