Skip to content

Commit

Permalink
feat: enable streaming exec output in container engine [stream exec p…
Browse files Browse the repository at this point in the history
…t. 1] (#1043)

## Description:
Enables streaming output of exec commands out of docker and k8s as they
are being run on services.

## Is this change user facing?
NOT THIS PR (but stream exec pt. 2 will be)
  • Loading branch information
tedim52 authored Aug 8, 2023
1 parent 0a40a09 commit e8f34ef
Show file tree
Hide file tree
Showing 10 changed files with 499 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ func (backend *DockerKurtosisBackend) GetUserServiceLogs(
return user_service_functions.GetUserServiceLogs(ctx, enclaveUuid, filters, shouldFollowLogs, backend.dockerManager)
}

// TODO Switch these to streaming so that huge command outputs don't blow up the API container memory
// NOTE: This function will block while the exec is ongoing; if we need more perf we can make it async
func (backend *DockerKurtosisBackend) RunUserServiceExecCommands(
ctx context.Context,
Expand All @@ -292,6 +291,15 @@ func (backend *DockerKurtosisBackend) RunUserServiceExecCommands(
return user_service_functions.RunUserServiceExecCommands(ctx, enclaveUuid, userServiceCommands, backend.dockerManager)
}

func (backend *DockerKurtosisBackend) RunUserServiceExecCommandWithStreamedOutput(
ctx context.Context,
enclaveUuid enclave.EnclaveUUID,
serviceUuid service.ServiceUUID,
cmd []string,
) (chan string, chan *exec_result.ExecResult, error) {
return user_service_functions.RunUserServiceExecCommandWithStreamedOutput(ctx, enclaveUuid, serviceUuid, cmd, backend.dockerManager)
}

func (backend *DockerKurtosisBackend) GetShellOnUserService(ctx context.Context, enclaveUuid enclave.EnclaveUUID, serviceUuid service.ServiceUUID) error {
return user_service_functions.GetShellOnUserService(ctx, enclaveUuid, serviceUuid, backend.dockerManager)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package user_service_functions

import (
"context"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/shared_helpers"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_manager"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/enclave"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/exec_result"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/service"
"github.com/kurtosis-tech/stacktrace"
)

func RunUserServiceExecCommandWithStreamedOutput(
ctx context.Context,
enclaveId enclave.EnclaveUUID,
serviceUuid service.ServiceUUID,
cmd []string,
dockerManager *docker_manager.DockerManager,
) (chan string, chan *exec_result.ExecResult, error) {
userServiceUuids := map[service.ServiceUUID]bool{}
userServiceUuids[serviceUuid] = true
filters := &service.ServiceFilters{
Names: nil,
UUIDs: userServiceUuids,
Statuses: nil,
}
_, allDockerResources, err := shared_helpers.GetMatchingUserServiceObjsAndDockerResourcesNoMutex(ctx, enclaveId, filters, dockerManager)
if err != nil {
return nil, nil, stacktrace.Propagate(err, "An error occurred getting user services matching filters '%+v'", filters)
}

var userServiceDockerResource *shared_helpers.UserServiceDockerResources
if dockerResource, found := allDockerResources[serviceUuid]; found {
userServiceDockerResource = dockerResource
} else {
return nil, nil, stacktrace.NewError("No docker resources were found for the service with identifier: '%v'", serviceUuid)
}

userServiceDockerContainer := userServiceDockerResource.ServiceContainer

execOutputLinesChan, finalExecChan, err := dockerManager.RunExecCommandWithStreamedOutput(
ctx,
userServiceDockerContainer.GetId(),
cmd)
if err != nil {
return nil, nil, stacktrace.Propagate(err, "An error occurred attempting to stream exec output from docker.")
}

return execOutputLinesChan, finalExecChan, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package docker_manager

import (
"bufio"
"context"
"encoding/json"
"fmt"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_kurtosis_backend/consts"
docker_manager_types "github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/docker/docker_manager/types"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/compute_resources"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/exec_result"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/concurrent_writer"
"github.com/kurtosis-tech/stacktrace"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -116,7 +118,8 @@ const (

successfulExitCode = 0

emptyNetworkAlias = ""
emptyNetworkAlias = ""
streamOutputDelimiter = '\n'

isDockerNetworkAttachable = true

Expand Down Expand Up @@ -916,7 +919,7 @@ func (manager *DockerManager) RunExecCommand(context context.Context, containerI
}

execStartConfig := types.ExecStartCheck{
// Because detach is false, we'll block until the command comes back
// Can not be run in detached mode or else response from ContainerExecAttach doesn't return output
Detach: false,
Tty: false,
ConsoleSize: nil,
Expand All @@ -929,26 +932,20 @@ func (manager *DockerManager) RunExecCommand(context context.Context, containerI
// Therefore, we ONLY call Attach, without Start
attachResp, err := dockerClient.ContainerExecAttach(context, execId, execStartConfig)
if err != nil {
return 0, stacktrace.Propagate(
err,
"An error occurred starting/attaching to the exec command")
return 0, stacktrace.Propagate(err, "An error occurred starting/attaching to the exec command")
}
defer attachResp.Close()

// NOTE: We have to demultiplex the logs that come back
// This will keep reading until it receives EOF
concurrentWriter := concurrent_writer.NewConcurrentWriter(logOutput)
if _, err := stdcopy.StdCopy(concurrentWriter, concurrentWriter, attachResp.Reader); err != nil {
return 0, stacktrace.Propagate(
err,
"An error occurred copying the exec command output to the given output writer")
return 0, stacktrace.Propagate(err, "An error occurred copying the exec command output to the given output writer")
}

inspectResponse, err := dockerClient.ContainerExecInspect(context, execId)
if err != nil {
return 0, stacktrace.Propagate(
err,
"An error occurred inspecting the exec to get the response code")
return 0, stacktrace.Propagate(err, "An error occurred inspecting the exec to get the response code")
}
if inspectResponse.Running {
return 0, stacktrace.NewError("Expected exec to have stopped, but it's still running!")
Expand All @@ -961,6 +958,97 @@ func (manager *DockerManager) RunExecCommand(context context.Context, containerI
return int32ExitCode, nil
}

func (manager *DockerManager) RunExecCommandWithStreamedOutput(context context.Context, containerId string, command []string) (chan string, chan *exec_result.ExecResult, error) {
dockerClient := manager.dockerClient
execConfig := types.ExecConfig{
User: "",
Privileged: false,
Tty: false,
ConsoleSize: nil,
AttachStdin: false,
AttachStderr: true,
AttachStdout: true,
Detach: false,
DetachKeys: "",
Env: nil,
WorkingDir: "",
Cmd: command,
}

createResp, err := dockerClient.ContainerExecCreate(context, containerId, execConfig)
if err != nil {
return nil, nil, stacktrace.Propagate(err, "An error occurred creating the exec process")
}

execId := createResp.ID
if execId == "" {
return nil, nil, stacktrace.NewError("Got back an empty exec ID when running '%v' on container '%v'", command, containerId)
}

execStartConfig := types.ExecStartCheck{
// Can not be run in detached mode or else response from ContainerExecAttach doesn't return output
Detach: false,
Tty: false,
ConsoleSize: nil,
}

execOutputChan := make(chan string)
finalExecResultChan := make(chan *exec_result.ExecResult)
go func() {
defer func() {
close(execOutputChan)
close(finalExecResultChan)
}()

// IMPORTANT NOTE:
// You'd think that we'd need to call ContainerExecStart separately after this ContainerExecAttach....
// ...but ContainerExecAttach **actually starts the exec command!!!!**
// We used to be doing them both, but then we were hitting this occasional race condition: https://github.com/moby/moby/issues/42408
// Therefore, we ONLY call Attach, without Start
attachResp, err := dockerClient.ContainerExecAttach(context, execId, execStartConfig)
if err != nil {
execOutputChan <- err.Error()
return
}
defer attachResp.Close()

// Stream output from docker through output channel
reader := bufio.NewReader(attachResp.Reader)
for {
execOutputLine, err := reader.ReadString(streamOutputDelimiter)
if err != nil {
if err == io.EOF {
break
} else {
return
}
}

execOutputChan <- execOutputLine
}

inspectResponse, err := dockerClient.ContainerExecInspect(context, execId)
if err != nil {
execOutputChan <- err.Error()
return
}
if inspectResponse.Running {
execOutputChan <- stacktrace.NewError("Expected exec to have stopped, but it's still running!").Error()
return
}
unsizedExitCode := inspectResponse.ExitCode
if unsizedExitCode > math.MaxInt32 || unsizedExitCode < math.MinInt32 {
execOutputChan <- stacktrace.NewError("Could not cast unsized int '%v' to int32 because it does not fit", unsizedExitCode).Error()
return
}
int32ExitCode := int32(unsizedExitCode)

// Don't send output in final result because it was already streamed
finalExecResultChan <- exec_result.NewExecResult(int32ExitCode, "")
}()
return execOutputChan, finalExecResultChan, nil
}

/*
ConnectContainerToNetwork
Connects the container with the given container ID to the network with the given network ID, using the given IP address
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,6 @@ func (backend *KubernetesKurtosisBackend) GetUserServiceLogs(
backend.kubernetesManager)
}

// TODO Switch these to streaming methods, so that huge command outputs don't blow up the memory of the API container
func (backend *KubernetesKurtosisBackend) RunUserServiceExecCommands(
ctx context.Context,
enclaveUuid enclave.EnclaveUUID,
Expand All @@ -349,6 +348,23 @@ func (backend *KubernetesKurtosisBackend) RunUserServiceExecCommands(
backend.kubernetesManager)
}

func (backend *KubernetesKurtosisBackend) RunUserServiceExecCommandWithStreamedOutput(
ctx context.Context,
enclaveUuid enclave.EnclaveUUID,
serviceUuid service.ServiceUUID,
cmd []string,
) (chan string, chan *exec_result.ExecResult, error) {
return user_services_functions.RunUserServiceExecCommandWithStreamedOutput(
ctx,
enclaveUuid,
serviceUuid,
cmd,
backend.cliModeArgs,
backend.apiContainerModeArgs,
backend.engineServerModeArgs,
backend.kubernetesManager)
}

func (backend *KubernetesKurtosisBackend) GetShellOnUserService(ctx context.Context, enclaveUuid enclave.EnclaveUUID, serviceUuid service.ServiceUUID) (resultErr error) {
objectAndResources, err := shared_helpers.GetSingleUserServiceObjectsAndResources(ctx, enclaveUuid, serviceUuid, backend.cliModeArgs, backend.apiContainerModeArgs, backend.engineServerModeArgs, backend.kubernetesManager)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package user_services_functions

import (
"context"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/kubernetes/kubernetes_kurtosis_backend/shared_helpers"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_impls/kubernetes/kubernetes_manager"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/container_status"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/enclave"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/exec_result"
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/backend_interface/objects/service"
"github.com/kurtosis-tech/stacktrace"
)

func RunUserServiceExecCommandWithStreamedOutput(
ctx context.Context,
enclaveId enclave.EnclaveUUID,
serviceUuid service.ServiceUUID,
cmd []string,
cliModeArgs *shared_helpers.CliModeArgs,
apiContainerModeArgs *shared_helpers.ApiContainerModeArgs,
engineServerModeArgs *shared_helpers.EngineServerModeArgs,
kubernetesManager *kubernetes_manager.KubernetesManager,
) (chan string, chan *exec_result.ExecResult, error) {
namespaceName, err := shared_helpers.GetEnclaveNamespaceName(ctx, enclaveId, cliModeArgs, apiContainerModeArgs, engineServerModeArgs, kubernetesManager)
if err != nil {
return nil, nil, stacktrace.Propagate(err, "An error occurred getting namespace name for enclave '%v'", enclaveId)
}

requestedGuids := map[service.ServiceUUID]bool{}
requestedGuids[serviceUuid] = true
matchingServicesFilters := &service.ServiceFilters{
Names: nil,
UUIDs: requestedGuids,
Statuses: nil,
}
matchingObjectsAndResources, err := shared_helpers.GetMatchingUserServiceObjectsAndKubernetesResources(ctx, enclaveId, matchingServicesFilters, cliModeArgs, apiContainerModeArgs, engineServerModeArgs, kubernetesManager)
if err != nil {
return nil, nil, stacktrace.Propagate(err, "An error occurred getting user services matching the requested UUIDs: %v", requestedGuids)
}

var userServiceKubernetesResource *shared_helpers.UserServiceObjectsAndKubernetesResources
if resource, found := matchingObjectsAndResources[serviceUuid]; found {
userServiceKubernetesResource = resource
} else {
return nil, nil, stacktrace.NewError(
"Cannot execute command '%+v' on service '%v' because no Kubernetes resources were found for it",
cmd,
serviceUuid)
}

userServiceKubernetesService := userServiceKubernetesResource.Service
if userServiceKubernetesService == nil {
return nil, nil, stacktrace.Propagate(err, "An error was found while running exec with streamed output over kubernetes for service '%s' and command '%v'.",
cmd,
serviceUuid)
}
if userServiceKubernetesService.GetStatus() != container_status.ContainerStatus_Running {
return nil, nil, stacktrace.NewError(
"Cannot execute command '%+v' on service '%v' because the service status is '%v'",
cmd,
serviceUuid,
userServiceKubernetesService.GetStatus().String())
}

userServiceKubernetesPod := userServiceKubernetesResource.KubernetesResources.Pod

execOutputLinesChan, finalResultChan, err := kubernetesManager.RunExecCommandWithStreamedOutput(
ctx,
namespaceName,
userServiceKubernetesPod.Name,
userServiceContainerName,
cmd)
if err != nil {
return nil, nil, stacktrace.Propagate(err, "An error occurred attempting to stream exec output from docker.")
}
return execOutputLinesChan, finalResultChan, nil
}
Loading

0 comments on commit e8f34ef

Please sign in to comment.