Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

K8s runner fetch workspace tgz #551

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion pkg/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package build

import (
"archive/tar"
"compress/gzip"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -1939,9 +1940,17 @@ func (b *Build) RetrieveWorkspace(ctx context.Context) error {
r, err := b.Runner.WorkspaceTar(ctx, b.containerConfig)
if err != nil {
return err
} else if r == nil {
return nil
}
defer r.Close()
tr := tar.NewReader(r)

gr, err := gzip.NewReader(r)
if err != nil {
return err
}
defer gr.Close()
tr := tar.NewReader(gr)

fs := apkofs.DirFS(b.WorkspaceDir)
for {
Expand Down
4 changes: 1 addition & 3 deletions pkg/container/bubblewrap_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package container

import (
"archive/tar"
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -123,8 +122,7 @@ func (bw *bubblewrap) TerminatePod(ctx context.Context, cfg *Config) error {
// WorkspaceTar implements Runner
// This is a noop for Bubblewrap, which uses bind-mounts to manage the workspace
func (bw *bubblewrap) WorkspaceTar(ctx context.Context, cfg *Config) (io.ReadCloser, error) {
var buffer bytes.Buffer
return io.NopCloser(&buffer), nil
return nil, nil
}

type bubblewrapOCILoader struct{}
Expand Down
4 changes: 1 addition & 3 deletions pkg/container/docker_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package container

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -249,8 +248,7 @@ func (dk *docker) Run(ctx context.Context, cfg *Config, args ...string) error {
// WorkspaceTar implements Runner
// This is a noop for Docker, which uses bind-mounts to manage the workspace
func (d *docker) WorkspaceTar(ctx context.Context, cfg *Config) (io.ReadCloser, error) {
var buffer bytes.Buffer
return io.NopCloser(&buffer), nil
return nil, nil
}

type dockerLoader struct{}
Expand Down
114 changes: 98 additions & 16 deletions pkg/container/kubernetes_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type k8s struct {
clientset kubernetes.Interface
restConfig *rest.Config
logger log.Logger
pod *corev1.Pod
}

func KubernetesRunner(_ context.Context, logger log.Logger) (Runner, error) {
Expand Down Expand Up @@ -145,6 +146,7 @@ func (k *k8s) StartPod(ctx context.Context, cfg *Config) error {
}
k.logger.Infof("pod [%s/%s] is ready", pod.Namespace, pod.Name)

k.pod = pod
cfg.PodID = pod.Name
return nil
}
Expand Down Expand Up @@ -206,6 +208,7 @@ func (k *k8s) TerminatePod(ctx context.Context, cfg *Config) error {
}

cfg.PodID = ""
k.pod = nil
return nil
}

Expand All @@ -231,21 +234,11 @@ func (k *k8s) TestUsability(ctx context.Context) bool {

// WorkspaceTar implements Runner
func (k *k8s) WorkspaceTar(ctx context.Context, cfg *Config) (io.ReadCloser, error) {
cmd := []string{"tar", "cf", "-", "-C", runnerWorkdir, "melange-out"}
r, w := io.Pipe()

go func() {
defer w.Close()

if err := k.Exec(ctx, cfg.PodID, cmd, remotecommand.StreamOptions{
Stdout: w,
Stderr: os.Stderr,
}); err != nil {
panic(err)
}
}()

return r, nil
fetcher, err := newK8sTarFetcher(k.restConfig, k.pod)
if err != nil {
return nil, fmt.Errorf("creating k8s tar fetcher: %v", err)
}
return fetcher.Fetch(ctx)
}

// OCIImageLoader implements Runner
Expand Down Expand Up @@ -289,7 +282,7 @@ cd '%s'

// Backoff up to 4 times with a 1 second initial delay, tripling each time
backoff := wait.Backoff{
Steps: 4,
Steps: 6,
Duration: 1 * time.Second,
Factor: 3,
Jitter: 0.1,
Expand Down Expand Up @@ -633,3 +626,92 @@ func (k *k8sLoader) LoadImage(ctx context.Context, layer ggcrv1.Layer, arch apko

return ref.String(), nil
}

type k8sTarFetcher struct {
client kubernetes.Interface
restconfig *rest.Config
pod metav1.Object
}

func newK8sTarFetcher(restconfig *rest.Config, pod metav1.Object) (*k8sTarFetcher, error) {
client, err := kubernetes.NewForConfig(restconfig)
if err != nil {
return nil, err
}

return &k8sTarFetcher{
client: client,
restconfig: restconfig,
pod: pod,
}, nil
}

func (f *k8sTarFetcher) Fetch(ctx context.Context) (io.ReadCloser, error) {
readAt := func(w io.Writer, offset uint64) error {
req := f.client.CoreV1().RESTClient().Post().Resource("pods").Name(f.pod.GetName()).Namespace(f.pod.GetNamespace()).SubResource("exec").VersionedParams(&corev1.PodExecOptions{
Container: kubernetesBuilderPodWorkspaceContainerName,
Command: []string{
"/bin/sh", "-c",
// Write a gzip compressed tar stream to stdout, starting at the given offset (n)
fmt.Sprintf("([ -f /tmp/melange-out.tar.gz ] || tar -czf /tmp/melange-out.tar.gz -C %s melange-out) && cat /tmp/melange-out.tar.gz | tail -c+%d", runnerWorkdir, offset),
},
Stdout: true,
Stderr: true,
}, scheme.ParameterCodec)

exec, err := remotecommand.NewSPDYExecutor(f.restconfig, "POST", req.URL())
if err != nil {
return err
}
return exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdout: w,
Stderr: os.Stderr,
})
}
tp := &retryableTarPipe{
MaxRetries: 5,
ReadAt: readAt,
}
tp.initReadFrom(0)
return tp, nil
}

type retryableTarPipe struct {
ReadAt func(w io.Writer, n uint64) error
MaxRetries int

reader *io.PipeReader
out *io.PipeWriter
progress uint64
retries int
}

func (p *retryableTarPipe) Close() error {
return p.out.Close()
}

func (p *retryableTarPipe) initReadFrom(n uint64) {
p.reader, p.out = io.Pipe()

go func() {
defer p.out.Close()
err := p.ReadAt(p.out, n)
if err != nil {
fmt.Println("failed to read: ", err)
}
}()
}

func (p *retryableTarPipe) Read(data []byte) (n int, err error) {
n, err = p.reader.Read(data)
if err != nil {
if p.retries < p.MaxRetries {
p.retries++
p.initReadFrom(p.progress + 1)
err = nil
}
} else {
p.progress += uint64(n)
}
return
}
2 changes: 1 addition & 1 deletion pkg/container/lima_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (l *lima) WorkspaceTar(ctx context.Context, cfg *Config) (io.ReadCloser, er
go func() {
defer pw.Close()

if err := l.nerdctl(ctx, melangeVMName, nil, pw, nil, "exec", "-i", cfg.PodID, "tar", "cf", "-", "-C", runnerWorkdir, "melange-out"); err != nil {
if err := l.nerdctl(ctx, melangeVMName, nil, pw, nil, "exec", "-i", cfg.PodID, "tar", "czf", "-", "-C", runnerWorkdir, "melange-out"); err != nil {
pw.CloseWithError(fmt.Errorf("failed to tar workspace: %w", err))
}
}()
Expand Down
Loading