Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Switch from containers to tasks #13

Merged
merged 9 commits into from
Jan 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added demo/baz
Empty file.
100 changes: 40 additions & 60 deletions internal/proc/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,52 +21,38 @@ import (
)

type container struct {
types.Spec
types.Container
cli *client.Client
types.PodSpec
types.Task
}

func (h *container) Init(ctx context.Context) error {
func (c *container) Run(ctx context.Context, stdout, stderr io.Writer) error {
cli, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
return err
}
h.cli = cli
return nil
}
defer cli.Close()

func (h *container) Build(ctx context.Context, stdout, stderr io.Writer) error {

if build := h.Container.Build; build != nil {
if build.HasMutex() {
if _, err := stdout.Write([]byte(fmt.Sprintf("waiting for mutex %q to unlock...\n", build.Mutex))); err != nil {
return err
}
mutex := KeyLock(build.Mutex)
mutex.Lock()
defer mutex.Unlock()
if _, err := stdout.Write([]byte(fmt.Sprintf("locked mutex %q\n", build.Mutex))); err != nil {
return err
}
}
if err := c.remove(ctx, cli); err != nil {
return err
}
dockerfile := filepath.Join(h.Image, "Dockerfile")

dockerfile := filepath.Join(c.Image, "Dockerfile")
if _, err := os.Stat(dockerfile); err == nil {
r, err := archive.TarWithOptions(filepath.Dir(dockerfile), &archive.TarOptions{})
if err != nil {
return err
}
defer r.Close()
resp, err := h.cli.ImageBuild(ctx, r, dockertypes.ImageBuildOptions{Dockerfile: filepath.Base(dockerfile), Tags: []string{h.Name}})
resp, err := cli.ImageBuild(ctx, r, dockertypes.ImageBuildOptions{Dockerfile: filepath.Base(dockerfile), Tags: []string{c.Name}})
if err != nil {
return err
}
defer resp.Body.Close()
if _, err = io.Copy(stdout, resp.Body); err != nil {
return err
}
} else if h.ImagePullPolicy != "Never" {
r, err := h.cli.ImagePull(ctx, h.Image, dockertypes.ImagePullOptions{})
} else if c.ImagePullPolicy != "Never" {
r, err := cli.ImagePull(ctx, c.Image, dockertypes.ImagePullOptions{})
if err != nil {
return err
}
Expand All @@ -77,51 +63,45 @@ func (h *container) Build(ctx context.Context, stdout, stderr io.Writer) error {
return err
}
}
return nil
}

func (h *container) Run(ctx context.Context, stdout, stderr io.Writer) error {
if err := h.remove(ctx); err != nil {
return err
}
portSet, portBindings, err := h.createPorts()
portSet, portBindings, err := c.createPorts()
if err != nil {
return err
}
binds, err := h.createBinds()
binds, err := c.createBinds()
if err != nil {
return err
}
image := h.Image
if _, err := os.Stat(filepath.Join(h.Image, "Dockerfile")); err == nil {
image = h.Name
image := c.Image
if _, err := os.Stat(filepath.Join(c.Image, "Dockerfile")); err == nil {
image = c.Name
}
created, err := h.cli.ContainerCreate(ctx, &dockercontainer.Config{
Hostname: h.Name,
created, err := cli.ContainerCreate(ctx, &dockercontainer.Config{
Hostname: c.Name,
ExposedPorts: portSet,
Tty: h.TTY,
Env: h.Env.Environ(),
Cmd: h.Args,
Tty: c.TTY,
Env: c.Env.Environ(),
Cmd: c.Args,
Image: image,
WorkingDir: h.WorkingDir,
WorkingDir: c.WorkingDir,
// TODO support entrypoint
Entrypoint: h.Command,
Labels: map[string]string{"name": h.Name},
Entrypoint: c.Command,
Labels: map[string]string{"name": c.Name},
}, &dockercontainer.HostConfig{
PortBindings: portBindings,
Binds: binds,
}, &network.NetworkingConfig{}, &v1.Platform{}, h.Name)
}, &network.NetworkingConfig{}, &v1.Platform{}, c.Name)
if err != nil {
return err
}
if err = h.cli.ContainerStart(ctx, created.ID, dockertypes.ContainerStartOptions{}); err != nil {
if err = cli.ContainerStart(ctx, created.ID, dockertypes.ContainerStartOptions{}); err != nil {
return err
}
go func() {
<-ctx.Done()
h.remove(context.Background())
c.remove(context.Background(), cli)
}()
logs, err := h.cli.ContainerLogs(ctx, h.Name, dockertypes.ContainerLogsOptions{
logs, err := cli.ContainerLogs(ctx, c.Name, dockertypes.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
Expand All @@ -133,7 +113,7 @@ func (h *container) Run(ctx context.Context, stdout, stderr io.Writer) error {
if _, err = stdcopy.StdCopy(stdout, stderr, logs); err != nil {
return err
}
inspect, err := h.cli.ContainerInspect(ctx, created.ID)
inspect, err := cli.ContainerInspect(ctx, created.ID)
if err != nil {
return err
}
Expand All @@ -143,10 +123,10 @@ func (h *container) Run(ctx context.Context, stdout, stderr io.Writer) error {
return nil
}

func (h *container) createPorts() (nat.PortSet, map[nat.Port][]nat.PortBinding, error) {
func (c *container) createPorts() (nat.PortSet, map[nat.Port][]nat.PortBinding, error) {
portSet := nat.PortSet{}
portBindings := map[nat.Port][]nat.PortBinding{}
for _, p := range h.Ports {
for _, p := range c.Ports {
port, err := nat.NewPort("tcp", fmt.Sprint(p.ContainerPort))
if err != nil {
return nil, nil, err
Expand All @@ -160,10 +140,10 @@ func (h *container) createPorts() (nat.PortSet, map[nat.Port][]nat.PortBinding,
return portSet, portBindings, nil
}

func (h *container) createBinds() ([]string, error) {
func (c *container) createBinds() ([]string, error) {
var binds []string
for _, mount := range h.VolumeMounts {
for _, volume := range h.Spec.Volumes {
for _, mount := range c.VolumeMounts {
for _, volume := range c.PodSpec.Volumes {
if volume.Name == mount.Name {
abs, err := filepath.Abs(volume.HostPath.Path)
if err != nil {
Expand All @@ -176,16 +156,16 @@ func (h *container) createBinds() ([]string, error) {
return binds, nil
}

func (h *container) remove(ctx context.Context) error {
list, err := h.cli.ContainerList(ctx, dockertypes.ContainerListOptions{All: true})
func (c *container) remove(ctx context.Context, cli *client.Client) error {
list, err := cli.ContainerList(ctx, dockertypes.ContainerListOptions{All: true})
if err != nil {
return err
}
grace := h.Spec.GetTerminationGracePeriod()
grace := c.PodSpec.GetTerminationGracePeriod()
for _, existing := range list {
if existing.Labels["name"] == h.Name {
_ = h.cli.ContainerStop(ctx, existing.ID, &grace)
if err := h.cli.ContainerRemove(ctx, existing.ID, dockertypes.ContainerRemoveOptions{Force: true}); err != nil {
if existing.Labels["name"] == c.Name {
_ = cli.ContainerStop(ctx, existing.ID, &grace)
if err := cli.ContainerRemove(ctx, existing.ID, dockertypes.ContainerRemoveOptions{Force: true}); err != nil {
return err
}
}
Expand Down
58 changes: 16 additions & 42 deletions internal/proc/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,54 +13,26 @@ import (
)

type host struct {
types.Spec
types.Container
types.PodSpec
types.Task
}

func (h *host) Init(ctx context.Context) error {
return nil
}

func (h *host) Build(ctx context.Context, stdout, stderr io.Writer) error {
func (h *host) Run(ctx context.Context, stdout, stderr io.Writer) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if build := h.Container.Build; build != nil {
if build.HasMutex() {
if _, err := stdout.Write([]byte(fmt.Sprintf("waiting for mutex %q to unlock...\n", build.Mutex))); err != nil {
return err
}
mutex := KeyLock(build.Mutex)
mutex.Lock()
defer mutex.Unlock()
if _, err := stdout.Write([]byte(fmt.Sprintf("locked mutex %q\n", build.Mutex))); err != nil {
return err
}
}
cmd := exec.CommandContext(ctx, build.Command[0], append(build.Command[1:], build.Args...)...)
cmd.Dir = build.WorkingDir
cmd.Stdin = os.Stdin
cmd.Stdout = stdout
cmd.Stderr = stderr
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
if h.Task.HasMutex() {
if _, err := stdout.Write([]byte(fmt.Sprintf("waiting for mutex %q to unlock...\n", h.Mutex))); err != nil {
return err
}
cmd.Env = append(os.Environ(), build.Env.Environ()...)
if err := cmd.Start(); err != nil {
mutex := KeyLock(h.Mutex)
mutex.Lock()
defer mutex.Unlock()
if _, err := stdout.Write([]byte(fmt.Sprintf("locked mutex %q\n", h.Mutex))); err != nil {
return err
}
go func() {
<-ctx.Done()
h.stop(cmd.Process.Pid)
}()
return cmd.Wait()
}
return nil
}

func (h *host) Run(ctx context.Context, stdout, stderr io.Writer) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cmd := exec.CommandContext(ctx, h.Command[0], append(h.Command[1:], h.Args...)...)
cmd := exec.Command(h.Command[0], append(h.Command[1:], h.Args...)...)
cmd.Dir = h.WorkingDir
cmd.Stdin = os.Stdin
cmd.Stdout = stdout
Expand All @@ -74,17 +46,19 @@ func (h *host) Run(ctx context.Context, stdout, stderr io.Writer) error {
}
go func() {
<-ctx.Done()
h.stop(cmd.Process.Pid)
if err := h.stop(cmd.Process.Pid, stdout); err != nil {
_, _ = fmt.Fprintln(stderr, err.Error())
}
}()
return cmd.Wait()
}

func (h *host) stop(pid int) error {
func (h *host) stop(pid int, stdout io.Writer) error {
pgid, _ := syscall.Getpgid(pid)
if err := syscall.Kill(-pgid, syscall.SIGTERM); err == nil || isNotPermitted(err) {
return nil
}
time.Sleep(h.Spec.GetTerminationGracePeriod())
time.Sleep(h.PodSpec.GetTerminationGracePeriod())
if err := syscall.Kill(-pgid, syscall.SIGKILL); err == nil || isNotPermitted(err) {
return nil
} else {
Expand Down
12 changes: 4 additions & 8 deletions internal/proc/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,14 @@ import (
)

type Interface interface {
// Init performs are one-time initialization.
Init(ctx context.Context) error
// Build does any build steps needed.
Build(ctx context.Context, stdout, stderr io.Writer) error
// Run runs the process.
Run(ctx context.Context, stdout, stderr io.Writer) error
}

func New(c types.Container, spec types.Spec) Interface {
if c.Image == "" {
return &host{Container: c, Spec: spec}
func New(t types.Task, spec types.PodSpec) Interface {
if t.Image == "" {
return &host{Task: t, PodSpec: spec}
} else {
return &container{Container: c, Spec: spec}
return &container{Task: t, PodSpec: spec}
}
}
13 changes: 4 additions & 9 deletions internal/types/log_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,13 @@ package types
import (
"io"
"strings"

"github.com/fatih/color"
)

type LogEntry struct {
Level string `json:"level"`
Msg string `json:"msg"`
}

func (e *LogEntry) String() string {
if e.Level == "error" {
return color.YellowString(e.Msg)
}
return e.Msg
}

func (s *LogEntry) Stdout() io.Writer {
return writeFunc(func(p []byte) (n int, err error) {
*s = LogEntry{"info", last(p)}
Expand All @@ -37,3 +28,7 @@ func (s *LogEntry) Stderr() io.Writer {
return len(p), nil
})
}

func (s *LogEntry) IsError() bool {
return s.Level == "error"
}
6 changes: 6 additions & 0 deletions internal/types/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package types

type Metadata struct {
Name string `json:"name"`
Annotations map[string]string `json:"annotations,omitempty"`
}
Loading