Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add transport implementation for agent #734

Merged
merged 1 commit into from
Jun 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ KUSTOMIZE := $(GO) run sigs.k8s.io/kustomize/kustomize/v4@v4.5
SETUP_ENVTEST := $(GO) run sigs.k8s.io/controller-runtime/tools/setup-envtest@v0.0.0-20220304125252-9ee63fc65a97
GOLANGCI_LINT := $(GO) run github.com/golangci/golangci-lint/cmd/golangci-lint@v1.52
YAMLFMT := $(GO) run github.com/google/yamlfmt/cmd/yamlfmt@v0.6
MOQ := $(GO) run github.com/matryer/moq@v0.3

# Installed tools
PROTOC_GEN_GO_GRPC := google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
Expand Down Expand Up @@ -94,11 +95,18 @@ e2e-test: ## Run e2e tests
$(SETUP_ENVTEST) use
source <($(SETUP_ENVTEST) use -p env) && $(GO) test -v ./internal/e2e/... -tags=e2e

mocks:
$(MOQ) -fmt goimpots -rm -out ./internal/proto/workflow/v2/mock.go ./internal/proto/workflow/v2 WorkflowServiceClient WorkflowService_GetWorkflowsClient
$(MOQ) -fmt goimports -rm -out ./internal/agent/transport/mock.go ./internal/agent/transport WorkflowHandler
$(MOQ) -fmt goimports -rm -out ./internal/agent/mock.go ./internal/agent Transport ContainerRuntime
$(MOQ) -fmt goimports -rm -out ./internal/agent/event/mock.go ./internal/agent/event Recorder

.PHONY: generate-proto
generate-proto: buf.gen.yaml buf.lock $(shell git ls-files '**/*.proto') _protoc
$(BUF) mod update
$(BUF) generate
$(GOFUMPT) -w internal/proto/*.pb.*
$(GOFUMPT) -w internal/proto/workflow/v2/*.pb.*

.PHONY: generate
generate: generate-proto generate-go generate-manifests ## Generate code, manifests etc.
Expand Down Expand Up @@ -241,4 +249,4 @@ yamllint: $(YAMLLINT_BIN)
.PHONY: _protoc ## Install all required tools for use with this Makefile.
_protoc:
GOBIN=$${PWD}/bin $(GO) install $(PROTOC_GEN_GO)
GOBIN=$${PWD}/bin $(GO) install $(PROTOC_GEN_GO_GRPC)
GOBIN=$${PWD}/bin $(GO) install $(PROTOC_GEN_GO_GRPC)
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/go-logr/zerologr v1.2.3
github.com/google/go-cmp v0.5.9
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/kr/pretty v0.3.1
github.com/onsi/ginkgo/v2 v2.11.0
github.com/onsi/gomega v1.27.8
github.com/opencontainers/image-spec v1.1.0-rc.3
Expand Down Expand Up @@ -78,6 +79,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
Expand All @@ -94,6 +96,7 @@ require (
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
Expand Down Expand Up @@ -692,6 +693,7 @@ github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNc
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -740,7 +742,9 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqn
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc=
github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3ZBU=
Expand Down
82 changes: 71 additions & 11 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package agent
import (
"context"
"errors"
"sync"

"github.com/go-logr/logr"
"github.com/tinkerbell/tink/internal/agent/event"
Expand All @@ -27,7 +28,12 @@ type Agent struct {
// Runtime is the container runtime used to execute workflow actions.
Runtime ContainerRuntime

// sem ensure we handle a single workflow at a time.
sem chan struct{}

// executionContext tracks the currently executing workflow.
executionContext *executionContext
mtx sync.RWMutex
}

// Start finalizes the Agent configuration and starts the configured Transport so it is ready
Expand All @@ -43,40 +49,94 @@ func (agent *Agent) Start(ctx context.Context) error {
}

if agent.Runtime == nil {
//nolint:stylecheck // Specifying field on data structure
//nolint:stylecheck // Runtime is a field of agent.
return errors.New("Runtime field must be set before calling Start()")
}

if agent.Log.GetSink() == nil {
agent.Log = logr.Discard()
}

agent.Log = agent.Log.WithValues("agent_id", agent.ID)

// Initialize the semaphore and add a resource to it ensuring we can run 1 workflow at a time.
agent.sem = make(chan struct{}, 1)
agent.sem <- struct{}{}

agent.Log.Info("Starting agent")
return agent.Transport.Start(ctx, agent.ID, agent)
}

// HandleWorkflow satisfies transport.
func (agent *Agent) HandleWorkflow(ctx context.Context, wflw workflow.Workflow, events event.Recorder) error {
// sem isn't protected by a synchronization data structure so this is technically invoking
// undefined behavior - consider this a best effort to ensuring Start() has been called.
func (agent *Agent) HandleWorkflow(ctx context.Context, wflw workflow.Workflow, events event.Recorder) {
if agent.sem == nil {
return errors.New("agent must have Start() called before calling HandleWorkflow()")
agent.Log.Info("Agent must have Start() called before calling HandleWorkflow()")
}

select {
case <-agent.sem:
// Replenish the semaphore on exit so we can pick up another workflow.
defer func() { agent.sem <- struct{}{} }()
return agent.run(ctx, wflw, events)
// Ensure we configure the current workflow and cancellation func before we launch the
// goroutine to avoid a race with CancelWorkflow.
agent.mtx.Lock()
defer agent.mtx.Unlock()

ctx, cancel := context.WithCancel(ctx)
agent.executionContext = &executionContext{
Workflow: wflw,
Cancel: cancel,
}

go func() {
// Replenish the semaphore on exit so we can pick up another workflow.
defer func() { agent.sem <- struct{}{} }()

agent.run(ctx, wflw, events)

// Nilify the execution context after running so cancellation requests are ignored.
agent.mtx.Lock()
defer agent.mtx.Unlock()
agent.executionContext = nil
}()

default:
log := agent.Log.WithValues("workflow_id", wflw.ID)

reject := event.WorkflowRejected{
ID: wflw.ID,
Message: "workflow already in progress",
}
events.RecordEvent(ctx, reject)
return nil

if err := events.RecordEvent(ctx, reject); err != nil {
log.Error(err, "Failed to record workflow rejection event")
return
}

log.Info("Workflow already executing; dropping request")
}
}

func (agent *Agent) CancelWorkflow(workflowID string) {
agent.mtx.RLock()
defer agent.mtx.RUnlock()

if agent.executionContext == nil {
agent.Log.Info("No workflow running; ignoring cancellation request", "workflow_id", workflowID)
return
}

if agent.executionContext.Workflow.ID != workflowID {
jacobweinstock marked this conversation as resolved.
Show resolved Hide resolved
agent.Log.Info(
"Incorrect workflow ID in cancellation request; ignoring cancellation request",
"workflow_id", workflowID,
"running_workflow_id", agent.executionContext.Workflow.ID,
)
return
}

agent.Log.Info("Cancel workflow", "workflow_id", workflowID)
agent.executionContext.Cancel()
}

type executionContext struct {
Workflow workflow.Workflow
Cancel context.CancelFunc
}
Loading