Skip to content

Commit

Permalink
Asset server implementation (nektos#677)
Browse files Browse the repository at this point in the history
* Add asset server and upload handling of binary files

Co-authored-by: Björn Brauer <zaubernerd@zaubernerd.de>

* Add asset download parts to the asset server

Co-authored-by: Björn Brauer <zaubernerd@zaubernerd.de>

* Add artifact-server-path flag

If the flag is not defined, the artifact server isn't started.
This includes the configuration of ACTIONS_RUNTIME_URL and
ACTIONS_RUNTIME_TOKEN which are set if the server is started.

Co-authored-by: Björn Brauer <zaubernerd@zaubernerd.de>

* Move ACTIONS_RUNTIME_* vars into the withGithubEnv setup

Co-authored-by: Björn Brauer <zaubernerd@zaubernerd.de>

* feat: add artifact server port as flag

This commits adds a flag to define the artifact server port.
If not given, the port defaults to 34567.

Co-authored-by: Björn Brauer <bjoern.brauer@new-work.se>

* test: add artifact server tests

Co-authored-by: Björn Brauer <bjoern.brauer@new-work.se>

* refactor: use fs.FS

This allows to add tests with in-memory file system

* feat: add support for gzip encoded uploads

Co-authored-by: Björn Brauer <bjoern.brauer@new-work.se>

* test: add artifact integration test

* chore: run act tests with asset server path

Co-authored-by: Björn Brauer <bjoern.brauer@new-work.se>

* docs: add new cli flags

Co-authored-by: Björn Brauer <bjoern.brauer@new-work.se>

* test: add test workflow to testdata

* feat: add log output

* refactor: log shutdown error instead of panic

* feat: use outbound ip for the asset server

This change should allow to use the host ip in macos and windows.
Since docker is running in an intermediate vm, localhost is not
sufficient to have the artifacts in the host system.

* fix: do not use canceled context

To shutdown artifact server, we should not use the already canceled
context but the parent context instead.

Co-authored-by: Björn Brauer <zaubernerd@zaubernerd.de>

* feat: shutdown artifact server at end of pipeline

When the pipeline is done the asset server should be shut down
gracefully.

Co-authored-by: Björn Brauer <zaubernerd@zaubernerd.de>

* fix: close server if graceful shutdown failed

Co-authored-by: Björn Brauer <zaubernerd@zaubernerd.de>

* fix: ignore server closed error from listen call

Co-authored-by: Björn Brauer <zaubernerd@zaubernerd.de>

Co-authored-by: Björn Brauer <zaubernerd@zaubernerd.de>
Co-authored-by: Björn Brauer <bjoern.brauer@new-work.se>
  • Loading branch information
3 people authored Nov 10, 2021
1 parent f8b3563 commit 11f6ee3
Show file tree
Hide file tree
Showing 12 changed files with 775 additions and 2 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ It will save that information to `~/.actrc`, please refer to [Configuration](#co

```none
-a, --actor string user that triggered the event (default "nektos/act")
--artifact-server-path string Defines the path where the artifact server stores uploads and retrieves downloads from.
If not specified the artifact server will not start.
--artifact-server-port string Defines the port where the artifact server listens (will only bind to localhost). (default "34567")
-b, --bind bind working directory to container, rather than copy
--container-architecture string Architecture which should be used to run containers, e.g.: linux/amd64. If not specified, will use host default architecture. Requires Docker server API Version 1.41+. Ignored on earlier Docker server platforms.
--container-daemon-socket string Path to Docker daemon socket which will be mounted to containers (default "/var/run/docker.sock")
Expand Down
2 changes: 2 additions & 0 deletions cmd/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Input struct {
containerCapAdd []string
containerCapDrop []string
autoRemove bool
artifactServerPath string
artifactServerPort string
}

func (i *Input) resolve(path string) string {
Expand Down
13 changes: 12 additions & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/nektos/act/pkg/artifacts"
"github.com/nektos/act/pkg/common"
"github.com/nektos/act/pkg/model"
"github.com/nektos/act/pkg/runner"
Expand Down Expand Up @@ -66,6 +67,8 @@ func Execute(ctx context.Context, version string) {
rootCmd.PersistentFlags().StringVarP(&input.containerArchitecture, "container-architecture", "", "", "Architecture which should be used to run containers, e.g.: linux/amd64. If not specified, will use host default architecture. Requires Docker server API Version 1.41+. Ignored on earlier Docker server platforms.")
rootCmd.PersistentFlags().StringVarP(&input.containerDaemonSocket, "container-daemon-socket", "", "/var/run/docker.sock", "Path to Docker daemon socket which will be mounted to containers")
rootCmd.PersistentFlags().StringVarP(&input.githubInstance, "github-instance", "", "github.com", "GitHub instance to use. Don't use this if you are not using GitHub Enterprise Server.")
rootCmd.PersistentFlags().StringVarP(&input.artifactServerPath, "artifact-server-path", "", "", "Defines the path where the artifact server stores uploads and retrieves downloads from. If not specified the artifact server will not start.")
rootCmd.PersistentFlags().StringVarP(&input.artifactServerPort, "artifact-server-port", "", "34567", "Defines the port where the artifact server listens (will only bind to localhost).")
rootCmd.SetArgs(args())

if err := rootCmd.Execute(); err != nil {
Expand Down Expand Up @@ -274,20 +277,28 @@ func newRunCommand(ctx context.Context, input *Input) func(*cobra.Command, []str
ContainerCapAdd: input.containerCapAdd,
ContainerCapDrop: input.containerCapDrop,
AutoRemove: input.autoRemove,
ArtifactServerPath: input.artifactServerPath,
ArtifactServerPort: input.artifactServerPort,
}
r, err := runner.New(config)
if err != nil {
return err
}

cancel := artifacts.Serve(ctx, input.artifactServerPath, input.artifactServerPort)

ctx = common.WithDryrun(ctx, input.dryrun)
if watch, err := cmd.Flags().GetBool("watch"); err != nil {
return err
} else if watch {
return watchAndRun(ctx, r.NewPlanExecutor(plan))
}

return r.NewPlanExecutor(plan)(ctx)
executor := r.NewPlanExecutor(plan).Finally(func(ctx context.Context) error {
cancel()
return nil
})
return executor(ctx)
}
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/joho/godotenv v1.4.0
github.com/julienschmidt/httprouter v1.3.0
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/kevinburke/ssh_config v1.1.0 // indirect
github.com/mattn/go-colorable v0.1.9 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/kevinburke/ssh_config v0.0.0-20201106050909-4977a11b4351/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
Expand Down
278 changes: 278 additions & 0 deletions pkg/artifacts/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
package artifacts

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"os"
"path"
"path/filepath"
"strings"

"github.com/julienschmidt/httprouter"
"github.com/nektos/act/pkg/common"
log "github.com/sirupsen/logrus"
)

type FileContainerResourceURL struct {
FileContainerResourceURL string `json:"fileContainerResourceUrl"`
}

type NamedFileContainerResourceURL struct {
Name string `json:"name"`
FileContainerResourceURL string `json:"fileContainerResourceUrl"`
}

type NamedFileContainerResourceURLResponse struct {
Count int `json:"count"`
Value []NamedFileContainerResourceURL `json:"value"`
}

type ContainerItem struct {
Path string `json:"path"`
ItemType string `json:"itemType"`
ContentLocation string `json:"contentLocation"`
}

type ContainerItemResponse struct {
Value []ContainerItem `json:"value"`
}

type ResponseMessage struct {
Message string `json:"message"`
}

type MkdirFS interface {
fs.FS
MkdirAll(path string, perm fs.FileMode) error
Open(name string) (fs.File, error)
}

type MkdirFsImpl struct {
dir string
fs.FS
}

func (fsys MkdirFsImpl) MkdirAll(path string, perm fs.FileMode) error {
return os.MkdirAll(fsys.dir+"/"+path, perm)
}

func (fsys MkdirFsImpl) Open(name string) (fs.File, error) {
return os.OpenFile(fsys.dir+"/"+name, os.O_CREATE|os.O_RDWR, 0644)
}

var gzipExtension = ".gz__"

func uploads(router *httprouter.Router, fsys MkdirFS) {
router.POST("/_apis/pipelines/workflows/:runId/artifacts", func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
runID := params.ByName("runId")

json, err := json.Marshal(FileContainerResourceURL{
FileContainerResourceURL: fmt.Sprintf("http://%s/upload/%s", req.Host, runID),
})
if err != nil {
panic(err)
}

_, err = w.Write(json)
if err != nil {
panic(err)
}
})

router.PUT("/upload/:runId", func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
itemPath := req.URL.Query().Get("itemPath")
runID := params.ByName("runId")

if req.Header.Get("Content-Encoding") == "gzip" {
itemPath += gzipExtension
}

filePath := fmt.Sprintf("%s/%s", runID, itemPath)

err := fsys.MkdirAll(path.Dir(filePath), os.ModePerm)
if err != nil {
panic(err)
}

file, err := fsys.Open(filePath)
if err != nil {
panic(err)
}
defer file.Close()

writer, ok := file.(io.Writer)
if !ok {
panic(errors.New("File is not writable"))
}

if req.Body == nil {
panic(errors.New("No body given"))
}

_, err = io.Copy(writer, req.Body)
if err != nil {
panic(err)
}

json, err := json.Marshal(ResponseMessage{
Message: "success",
})
if err != nil {
panic(err)
}

_, err = w.Write(json)
if err != nil {
panic(err)
}
})

router.PATCH("/_apis/pipelines/workflows/:runId/artifacts", func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
json, err := json.Marshal(ResponseMessage{
Message: "success",
})
if err != nil {
panic(err)
}

_, err = w.Write(json)
if err != nil {
panic(err)
}
})
}

func downloads(router *httprouter.Router, fsys fs.FS) {
router.GET("/_apis/pipelines/workflows/:runId/artifacts", func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
runID := params.ByName("runId")

entries, err := fs.ReadDir(fsys, runID)
if err != nil {
panic(err)
}

var list []NamedFileContainerResourceURL
for _, entry := range entries {
list = append(list, NamedFileContainerResourceURL{
Name: entry.Name(),
FileContainerResourceURL: fmt.Sprintf("http://%s/download/%s", req.Host, runID),
})
}

json, err := json.Marshal(NamedFileContainerResourceURLResponse{
Count: len(list),
Value: list,
})
if err != nil {
panic(err)
}

_, err = w.Write(json)
if err != nil {
panic(err)
}
})

router.GET("/download/:container", func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
container := params.ByName("container")
itemPath := req.URL.Query().Get("itemPath")
dirPath := fmt.Sprintf("%s/%s", container, itemPath)

var files []ContainerItem
err := fs.WalkDir(fsys, dirPath, func(path string, entry fs.DirEntry, err error) error {
if !entry.IsDir() {
rel, err := filepath.Rel(dirPath, path)
if err != nil {
panic(err)
}

// if it was upload as gzip
rel = strings.TrimSuffix(rel, gzipExtension)

files = append(files, ContainerItem{
Path: fmt.Sprintf("%s/%s", itemPath, rel),
ItemType: "file",
ContentLocation: fmt.Sprintf("http://%s/artifact/%s/%s/%s", req.Host, container, itemPath, rel),
})
}
return nil
})
if err != nil {
panic(err)
}

json, err := json.Marshal(ContainerItemResponse{
Value: files,
})
if err != nil {
panic(err)
}

_, err = w.Write(json)
if err != nil {
panic(err)
}
})

router.GET("/artifact/*path", func(w http.ResponseWriter, req *http.Request, params httprouter.Params) {
path := params.ByName("path")[1:]

file, err := fsys.Open(path)
if err != nil {
// try gzip file
file, err = fsys.Open(path + gzipExtension)
if err != nil {
panic(err)
}
w.Header().Add("Content-Encoding", "gzip")
}

_, err = io.Copy(w, file)
if err != nil {
panic(err)
}
})
}

func Serve(ctx context.Context, artifactPath string, port string) context.CancelFunc {
serverContext, cancel := context.WithCancel(ctx)

if artifactPath == "" {
return cancel
}

router := httprouter.New()

log.Debugf("Artifacts base path '%s'", artifactPath)
fs := os.DirFS(artifactPath)
uploads(router, MkdirFsImpl{artifactPath, fs})
downloads(router, fs)
ip := common.GetOutboundIP().String()

server := &http.Server{Addr: fmt.Sprintf("%s:%s", ip, port), Handler: router}

// run server
go func() {
log.Infof("Start server on http://%s:%s", ip, port)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatal(err)
}
}()

// wait for cancel to gracefully shutdown server
go func() {
<-serverContext.Done()

if err := server.Shutdown(ctx); err != nil {
log.Errorf("Failed shutdown gracefully - force shutdown: %v", err)
server.Close()
}
}()

return cancel
}
Loading

0 comments on commit 11f6ee3

Please sign in to comment.