Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mime type in ExecuteResponse in runner service #549

Merged
merged 3 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/expr-lang/expr v1.16.1
github.com/fatih/color v1.16.0
github.com/fullstorydev/grpcurl v1.8.9
github.com/gabriel-vasile/mimetype v1.4.3
github.com/go-git/go-billy/v5 v5.5.0
github.com/gobwas/glob v0.2.3
github.com/golang/mock v1.6.0
Expand Down Expand Up @@ -52,7 +53,6 @@ require (
github.com/bufbuild/protocompile v0.6.0 // indirect
github.com/cyphar/filepath-securejoin v0.2.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/cel-go v0.20.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions internal/api/runme/runner/v1/runner.proto
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ message ExecuteResponse {
// pid contains the process' PID
// this is only sent once in an initial response for background processes.
ProcessPID pid = 4;

// mime_type is a detected MIME type of the stdout_data.
//
// This is only sent once in the first response containing stdout_data.
string mime_type = 5;
}

message ResolveProgramCommandList {
Expand Down
9 changes: 7 additions & 2 deletions internal/api/runme/runner/v2alpha1/runner.proto
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,14 @@ message ExecuteResponse {
bytes stderr_data = 3;

// pid contains the process' PID.
// This is only sent once in an initial response
// for background processes.
//
// This is only sent once in an initial response for background processes.
google.protobuf.UInt32Value pid = 4;

// mime_type is a detected MIME type of the stdout_data.
//
// This is only sent once in the first response containing stdout_data.
string mime_type = 5;
}

message ResolveProgramCommandList {
Expand Down
417 changes: 215 additions & 202 deletions internal/gen/proto/go/runme/runner/v1/runner.pb.go

Large diffs are not rendered by default.

267 changes: 140 additions & 127 deletions internal/gen/proto/go/runme/runner/v2alpha1/runner.pb.go

Large diffs are not rendered by default.

23 changes: 20 additions & 3 deletions internal/gen/proto/ts/runme/runner/v1/runner_pb.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,14 @@ export interface ExecuteResponse {
* @generated from protobuf field: runme.runner.v1.ProcessPID pid = 4;
*/
pid?: ProcessPID;
/**
* mime_type is a detected MIME type of the stdout_data.
*
* This is only sent once in the first response containing stdout_data.
*
* @generated from protobuf field: string mime_type = 5;
*/
mimeType: string;
}
/**
* @generated from protobuf message runme.runner.v1.ResolveProgramCommandList
Expand Down Expand Up @@ -817,9 +825,18 @@ declare class ProcessPID$Type extends MessageType<ProcessPID> {
/**
* @generated MessageType for protobuf message runme.runner.v1.ProcessPID
*/
export declare const ProcessPID: ProcessPID$Type;
declare class ExecuteResponse$Type extends MessageType<ExecuteResponse> {
constructor();
export const ProcessPID = new ProcessPID$Type();
// @generated message type with reflection information, may provide speed optimized methods
class ExecuteResponse$Type extends MessageType<ExecuteResponse> {
constructor() {
super("runme.runner.v1.ExecuteResponse", [
{ no: 1, name: "exit_code", kind: "message", T: () => UInt32Value },
{ no: 2, name: "stdout_data", kind: "scalar", T: 12 /*ScalarType.BYTES*/ },
{ no: 3, name: "stderr_data", kind: "scalar", T: 12 /*ScalarType.BYTES*/ },
{ no: 4, name: "pid", kind: "message", T: () => ProcessPID },
{ no: 5, name: "mime_type", kind: "scalar", T: 9 /*ScalarType.STRING*/ }
]);
}
}
/**
* @generated MessageType for protobuf message runme.runner.v1.ExecuteResponse
Expand Down
27 changes: 22 additions & 5 deletions internal/gen/proto/ts/runme/runner/v2alpha1/runner_pb.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,20 @@ export interface ExecuteResponse {
stderrData: Uint8Array;
/**
* pid contains the process' PID.
* This is only sent once in an initial response
* for background processes.
*
* This is only sent once in an initial response for background processes.
*
* @generated from protobuf field: google.protobuf.UInt32Value pid = 4;
*/
pid?: UInt32Value;
/**
* mime_type is a detected MIME type of the stdout_data.
*
* This is only sent once in the first response containing stdout_data.
*
* @generated from protobuf field: string mime_type = 5;
*/
mimeType: string;
}
/**
* @generated from protobuf message runme.runner.v2alpha1.ResolveProgramCommandList
Expand Down Expand Up @@ -605,9 +613,18 @@ declare class ExecuteRequest$Type extends MessageType<ExecuteRequest> {
/**
* @generated MessageType for protobuf message runme.runner.v2alpha1.ExecuteRequest
*/
export declare const ExecuteRequest: ExecuteRequest$Type;
declare class ExecuteResponse$Type extends MessageType<ExecuteResponse> {
constructor();
export const ExecuteRequest = new ExecuteRequest$Type();
// @generated message type with reflection information, may provide speed optimized methods
class ExecuteResponse$Type extends MessageType<ExecuteResponse> {
constructor() {
super("runme.runner.v2alpha1.ExecuteResponse", [
{ no: 1, name: "exit_code", kind: "message", T: () => UInt32Value },
{ no: 2, name: "stdout_data", kind: "scalar", T: 12 /*ScalarType.BYTES*/ },
{ no: 3, name: "stderr_data", kind: "scalar", T: 12 /*ScalarType.BYTES*/ },
{ no: 4, name: "pid", kind: "message", T: () => UInt32Value },
{ no: 5, name: "mime_type", kind: "scalar", T: 9 /*ScalarType.STRING*/ }
]);
}
}
/**
* @generated MessageType for protobuf message runme.runner.v2alpha1.ExecuteResponse
Expand Down
20 changes: 18 additions & 2 deletions internal/runner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/creack/pty"
"github.com/gabriel-vasile/mimetype"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -435,16 +436,31 @@ func (r *runnerService) Execute(srv runnerv1.RunnerService_ExecuteServer) error
})

g.Go(func() error {
firstStdoutSent := false

for data := range datac {
logger.Debug("sending data", zap.Int("lenStdout", len(data.Stdout)), zap.Int("lenStderr", len(data.Stderr)))
err := srv.Send(&runnerv1.ExecuteResponse{

resp := &runnerv1.ExecuteResponse{
StdoutData: data.Stdout,
StderrData: data.Stderr,
})
}

if !firstStdoutSent && len(data.Stdout) > 0 {
if detected := mimetype.Detect(data.Stdout); detected != nil {
resp.MimeType = detected.String()
}
}

err := srv.Send(resp)
if err != nil {
return err
}

if len(resp.StdoutData) > 0 {
firstStdoutSent = true
}

if storeStdout && len(stdoutMem) < maxEnvSize {
// sanitize for environment variable
sanitized := bytes.ReplaceAll(data.Stdout, []byte{'\000'}, []byte{})
Expand Down
31 changes: 30 additions & 1 deletion internal/runner/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"testing"
"time"

runnerv1 "github.com/stateful/runme/v3/internal/gen/proto/go/runme/runner/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand All @@ -24,6 +23,8 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/grpc/test/bufconn"
"google.golang.org/protobuf/proto"

runnerv1 "github.com/stateful/runme/v3/internal/gen/proto/go/runme/runner/v1"
)

func testCreateLogger(t *testing.T) *zap.Logger {
Expand Down Expand Up @@ -66,6 +67,7 @@ func testCreateRunnerServiceClient(
type executeResult struct {
Stdout []byte
Stderr []byte
MimeType string
ExitCode int
Err error
}
Expand All @@ -87,6 +89,9 @@ func getExecuteResult(
}
result.Stdout = append(result.Stdout, r.StdoutData...)
result.Stderr = append(result.Stderr, r.StderrData...)
if r.MimeType != "" {
result.MimeType = r.MimeType
}
if r.ExitCode != nil {
result.ExitCode = int(r.ExitCode.Value)
}
Expand Down Expand Up @@ -225,6 +230,30 @@ func Test_runnerService(t *testing.T) {
assert.EqualValues(t, 0, result.ExitCode)
})

t.Run("ExecuteBasicMimeType", func(t *testing.T) {
t.Parallel()

stream, err := client.Execute(context.Background())
require.NoError(t, err)

execResult := make(chan executeResult)
go getExecuteResult(stream, execResult)

err = stream.Send(&runnerv1.ExecuteRequest{
ProgramName: "bash",
CommandMode: runnerv1.CommandMode_COMMAND_MODE_INLINE_SHELL,
Commands: []string{`echo '{"field1": "value", "field2": 2}'`},
})
assert.NoError(t, err)

result := <-execResult

assert.NoError(t, result.Err)
assert.Equal(t, "{\"field1\": \"value\", \"field2\": 2}\n", string(result.Stdout))
assert.Contains(t, result.MimeType, "application/json")
assert.EqualValues(t, 0, result.ExitCode)
})

t.Run("Input", func(t *testing.T) {
t.Parallel()

Expand Down
41 changes: 38 additions & 3 deletions internal/runnerv2service/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/exec"
"syscall"

"github.com/gabriel-vasile/mimetype"
"github.com/pkg/errors"
"go.uber.org/zap"

Expand Down Expand Up @@ -119,20 +120,50 @@ func (e *execution) Wait(ctx context.Context, sender sender) (int, error) {
lastStdout := rbuffer.NewRingBuffer(command.MaxEnvironSizInBytes)
defer e.storeLastOutput(lastStdout)

firstStdoutSent := false
errc := make(chan error, 2)

go func() {
errc <- readSendLoop(
e.stdout,
sender,
func(b []byte) *runnerv2alpha1.ExecuteResponse {
if len(b) == 0 {
return nil
}

_, _ = lastStdout.Write(b)
return &runnerv2alpha1.ExecuteResponse{StdoutData: b}

resp := &runnerv2alpha1.ExecuteResponse{StdoutData: b}

if !firstStdoutSent {
if detected := mimetype.Detect(b); detected != nil {
e.logger.Info("detected MIME type", zap.String("mime", detected.String()))
resp.MimeType = detected.String()
}
}

firstStdoutSent = true

e.logger.Debug("sending stdout data", zap.Any("resp", resp))

return resp
},
)
}()
go func() {
errc <- readSendLoop(e.stderr, sender, func(b []byte) *runnerv2alpha1.ExecuteResponse { return &runnerv2alpha1.ExecuteResponse{StderrData: b} })
errc <- readSendLoop(
e.stderr,
sender,
func(b []byte) *runnerv2alpha1.ExecuteResponse {
if len(b) == 0 {
return nil
}
resp := &runnerv2alpha1.ExecuteResponse{StderrData: b}
e.logger.Debug("sending stderr data", zap.Any("resp", resp))
return resp
},
)
}()

waitErr := e.Cmd.Wait()
Expand Down Expand Up @@ -278,7 +309,11 @@ func readSendLoop(reader io.Reader, sender sender, fn func([]byte) *runnerv2alph
continue
}

err = sender.Send(fn(buf[:n]))
msg := fn(buf[:n])
if msg == nil {
return nil
}
err = sender.Send(msg)
if err != nil {
return errors.WithStack(err)
}
Expand Down
Loading
Loading