Skip to content

Commit

Permalink
feat: Adding the ability to attach to containers
Browse files Browse the repository at this point in the history
  • Loading branch information
clintjedwards committed May 2, 2023
1 parent 0dc073a commit 24851d2
Show file tree
Hide file tree
Showing 13 changed files with 1,979 additions and 1,076 deletions.
2 changes: 2 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ There are several useful things we can do with the concept of extensions:

### On the floor

- Attach should have a header when you attach to a container and a > prompt
- Document all attach stuff
- Force for both objects and secrets need to be checked. Some layers of the process does not respect it.

- Github Extension:
Expand Down
17 changes: 16 additions & 1 deletion containers/extensions/github/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,30 @@ func (t *extension) ExternalEvent(ctx context.Context, request *proto.ExtensionE
return &proto.ExtensionExternalEventResponse{}, nil
}

// configID = "app_id"
// configInstallation = "app_installation"
// configKey = "app_key"
// configWebhookKey = "app_webhook_secret"

// InstallInstructions are Gofer's way to allowing the extension to guide Gofer administrators through their
// personal installation process. This is needed because some extensions might require special auth tokens and information
// in a way that might be confusing for extension administrators.
func installInstructions() sdk.InstallInstructions {
instructions := sdk.NewInstructionsBuilder()
// instructions = instructions.AddMessage(":: The Github extension allows users to trigger their pipelines based on Github"+
// " events.").
// AddMessage("").
// AddMessage("Because of Gofer's self-hosted architecture, The following steps will walk you through creating and" +
// " registering the Github extension as a Github app. This allows the Github extension to have to proper access in order to").
// AddMessage("First, let's prevent users from setting too low of an interval by setting a minimum duration. "+
// "Durations are set via Golang duration strings. For example, entering a duration of '10h' would be 10 hours, "+
// "meaning that users can only run their pipeline at most every 10 hours. "+
// "You can find more documentation on valid strings here: https://pkg.go.dev/time#ParseDuration.").
// AddQuery("Set a minimum duration for all pipelines", ConfigMinDuration)

return instructions
}

// TODO():
func main() {
extension, err := newExtension()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion containers/extensions/interval/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (e *extension) Shutdown(ctx context.Context, request *proto.ExtensionShutdo
// in a way that might be confusing for extension administrators.
func installInstructions() sdk.InstallInstructions {
instructions := sdk.NewInstructionsBuilder()
instructions = instructions.AddMessage(":: The interval extension allows users to extension their pipelines on the passage"+
instructions = instructions.AddMessage(":: The interval extension allows users to run their pipelines on the passage"+
" of time by setting a particular duration.").
AddMessage("").
AddMessage("First, let's prevent users from setting too low of an interval by setting a minimum duration. "+
Expand Down
49 changes: 49 additions & 0 deletions internal/api/taskRun.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"time"
"unicode/utf8"

"github.com/clintjedwards/gofer/internal/models"
"github.com/clintjedwards/gofer/internal/scheduler"
Expand Down Expand Up @@ -30,3 +31,51 @@ func (api *API) cancelTaskRun(taskRun *models.TaskRun, force bool) error {

return nil
}

// scanWordsWithWhitespace is a split function for a Scanner that returns each
// space-separated word of text. The definition of space is set by unicode.IsSpace.
func scanWordsWithWhitespace(data []byte, atEOF bool) (advance int, token []byte, err error) {
start := 0

// Scan until space, marking end of word.
for width, i := 0, start; i < len(data); i += width {
var r rune
r, width = utf8.DecodeRune(data[i:])
if isSpace(r) {
return i + width, data[start : i+1], nil
}
}

// If we're at EOF, we have a final, non-empty, non-terminated word. Return it.
if atEOF && len(data) > start {
return len(data), data[start:], nil
}

// Request more data.
return start, nil, nil
}

// isSpace reports whether the character is a Unicode white space character.
// We avoid dependency on the unicode package, but check validity of the implementation
// in the tests.
func isSpace(r rune) bool {
if r <= '\u00FF' {
// Obvious ASCII ones: \t through \r plus space. Plus two Latin-1 oddballs.
switch r {
case ' ', '\t', '\n', '\v', '\f', '\r':
return true
case '\u0085', '\u00A0':
return true
}
return false
}
// High-valued ones.
if '\u2000' <= r && r <= '\u200a' {
return true
}
switch r {
case '\u1680', '\u2028', '\u2029', '\u202f', '\u205f', '\u3000':
return true
}
return false
}
219 changes: 219 additions & 0 deletions internal/api/taskRunHandlers.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package api

import (
"bufio"
"context"
"errors"
"os"
"strings"

"github.com/clintjedwards/gofer/events"
"github.com/clintjedwards/gofer/internal/models"
"github.com/clintjedwards/gofer/internal/scheduler"
"github.com/clintjedwards/gofer/internal/storage"
proto "github.com/clintjedwards/gofer/proto/go"

Expand Down Expand Up @@ -119,6 +123,221 @@ func (api *API) CancelTaskRun(ctx context.Context, request *proto.CancelTaskRunR
return &proto.CancelTaskRunResponse{}, nil
}

func (api *API) AttachToTaskRun(stream proto.Gofer_AttachToTaskRunServer) error {
// Get the first message so we can attempt to set up the connection with the proper docker container.
initMessageRaw, err := stream.Recv()
if err != nil {
log.Error().Err(err).Msg("could not set up stream")
return status.Errorf(codes.Internal, "could not set up stream: %v", err)
}

initMessage, ok := initMessageRaw.RequestType.(*proto.AttachToTaskRunRequest_Init)
if !ok {
return status.Error(codes.FailedPrecondition, "first message must be init message, received input message")
}

// Validate input
if initMessage.Init.Id == "" {
return status.Error(codes.FailedPrecondition, "id required")
}

if initMessage.Init.PipelineId == "" {
return status.Error(codes.FailedPrecondition, "pipeline id required")
}

if initMessage.Init.RunId == 0 {
return status.Error(codes.FailedPrecondition, "run id required")
}

namespace, err := api.resolveNamespace(stream.Context(), initMessage.Init.NamespaceId)
if err != nil {
return status.Errorf(codes.FailedPrecondition, "error retrieving namespace %q; %v",
initMessage.Init.NamespaceId, err.Error())
}

if !hasAccess(stream.Context(), namespace) {
return status.Error(codes.PermissionDenied, "access denied")
}

taskRun, err := api.db.GetPipelineTaskRun(api.db, namespace, initMessage.Init.PipelineId,
initMessage.Init.RunId, initMessage.Init.Id)
if err != nil {
if errors.Is(err, storage.ErrEntityNotFound) {
return status.Error(codes.FailedPrecondition, "task run not found")
}
log.Error().Err(err).Msg("could not get task run")
return status.Error(codes.Internal, "failed to retrieve task run from database")
}

// Attempt to drop the user into a shell if the user hasn't entered any explicit command
cmd := []string{"sh"}
if len(initMessage.Init.Command) != 0 {
cmd = initMessage.Init.Command
}

// A channel to buffer the messages incoming from the container.
incomingMsgChannel := make(chan string)

// A general channel that means we should stop what we're doing and cleanly exit.
stopChan := make(chan struct{})

resp, err := api.scheduler.AttachContainer(scheduler.AttachContainerRequest{
ID: taskContainerID(namespace, taskRun.Pipeline, taskRun.Run, taskRun.ID),
Command: cmd,
})
if err != nil {
return status.Errorf(codes.Internal, "could not connect to specified container; %v", err)
}
defer resp.Conn.Close()

// Start a goroutine to receive incoming messages from the client and insert them into the container.
go func() {
for {
select {
case <-stopChan:
close(incomingMsgChannel)
return
case <-stream.Context().Done():
close(incomingMsgChannel)
return
case <-api.context.ctx.Done():
close(incomingMsgChannel)
return
default:
msgRaw, err := stream.Recv()
if err != nil {
// If the context was cancelled, that means that the client abandoned the connect; exit cleanly.
if strings.Contains(err.Error(), "context canceled") {
close(incomingMsgChannel)
return
}

// If the client disconnected, exit cleanly.
if strings.Contains(err.Error(), "client disconnected") {
close(incomingMsgChannel)
close(stopChan)
return
}

log.Error().Err(err).Msg("encountered error while streaming messages during task run attach")
close(incomingMsgChannel)
close(stopChan)
return
}

msg, ok := msgRaw.RequestType.(*proto.AttachToTaskRunRequest_Input)
if !ok {
log.Error().Msg("skipping incorrect message type encountered while streaming messages during task run attach")
continue
}

incomingMsgChannel <- msg.Input.Input
}
}
}()

taskRunCompletedEvents, err := api.events.Subscribe(events.EventTypeTaskRunCompleted)
if err != nil {
// We don't actually have to fail here since the worse that happens is that that user gets
// a confusing EOF error instead.if err != nil {
log.Error().Err(err).Str("namespace", namespace).
Str("pipeline", initMessage.Init.PipelineId).
Int64("run", initMessage.Init.RunId).
Str("task_run_id", initMessage.Init.Id).
Msg("could not listen for task run completed events")
} else {
go func() {
for {
select {
case <-stopChan:
return
case <-stream.Context().Done():
return
case <-api.context.ctx.Done():
return
case event := <-taskRunCompletedEvents.Events:
evt, ok := event.Details.(events.EventTaskRunCompleted)
if !ok {
continue
}

if evt.NamespaceID == namespace &&
evt.PipelineID == initMessage.Init.PipelineId &&
evt.RunID == initMessage.Init.RunId &&
evt.TaskRunID == initMessage.Init.Id {

close(stopChan)

log.Debug().Str("namespace", namespace).
Str("pipeline", initMessage.Init.PipelineId).
Int64("run", initMessage.Init.RunId).
Str("task_run_id", initMessage.Init.Id).Msg("closed task run attach connection due to task run being complete")
return
}
}
}
}()
}

// Start a goroutine to send messages that we get back from the container to the client.
// Unfortunately it's a known problem that this leaks goroutines because io.Reader doesn't have a close
// method.
//
// https://benjamincongdon.me/blog/2020/04/23/Cancelable-Reads-in-Go/
go func() {
scanner := bufio.NewScanner(resp.Reader)
scanner.Split(scanWordsWithWhitespace)

for scanner.Scan() {
select {
case <-stopChan:
return
case <-stream.Context().Done():
return
case <-api.context.ctx.Done():
return
default:
chunk := strings.ToValidUTF8(scanner.Text(), "")

err := stream.Send(&proto.AttachToTaskRunOutput{
Output: chunk,
})
if err != nil {
log.Error().Err(err).Str("last_line", chunk).
Msg("encountered error while sending messages to container during task run attach")
return
}
}
}

if err := scanner.Err(); err != nil {
if strings.Contains(err.Error(), "use of closed network connection") {
return
}
log.Error().Err(err).
Msg("encountered error while reading messages from container during task run attach")
return
}
}()

for {
select {
case <-stopChan:
return nil
case <-stream.Context().Done():
return nil
case <-api.context.ctx.Done():
return nil
case input := <-incomingMsgChannel:
_, err := resp.Conn.Write([]byte(input))
if err != nil {
log.Error().Err(err).Msg("encountered error while writing messages during task run attach")
return err
}
}
}
}

func (api *API) GetTaskRunLogs(request *proto.GetTaskRunLogsRequest, stream proto.Gofer_GetTaskRunLogsServer) error {
if request.Id == "" {
return status.Error(codes.FailedPrecondition, "id required")
Expand Down
Loading

0 comments on commit 24851d2

Please sign in to comment.