From d85772619c7b741da41d42b86ff99656ee164e41 Mon Sep 17 00:00:00 2001 From: Chris Doherty Date: Sun, 14 May 2023 19:34:08 -0500 Subject: [PATCH] Add agent transport Signed-off-by: Chris Doherty --- Makefile | 10 +- go.mod | 3 + go.sum | 4 + internal/agent/agent.go | 82 +- internal/agent/agent_test.go | 104 +- internal/agent/event/action.go | 2 +- internal/agent/event/event.go | 2 +- internal/agent/event/fake.go | 2 +- internal/agent/event/mock.go | 81 ++ internal/agent/event/mocks.go | 177 --- internal/agent/{mocks.go => mock.go} | 24 +- internal/agent/run.go | 73 +- internal/agent/runtime/MACOS_TESTING.md | 9 + internal/agent/runtime/docker_test.go | 3 +- internal/agent/transport.go | 4 +- internal/agent/transport/fake.go | 9 +- internal/agent/transport/grpc.go | 170 +++ internal/agent/transport/grpc_test.go | 76 ++ internal/agent/transport/handler.go | 20 + internal/agent/transport/mock.go | 134 ++ internal/agent/workflow/handler.go | 15 - internal/proto/workflow/v2/mock.go | 440 ++++++ internal/proto/workflow/v2/workflow.pb.go | 1214 +++++++++++++++++ internal/proto/workflow/v2/workflow.proto | 117 ++ .../proto/workflow/v2/workflow_grpc.pb.go | 174 +++ 25 files changed, 2648 insertions(+), 301 deletions(-) create mode 100644 internal/agent/event/mock.go delete mode 100644 internal/agent/event/mocks.go rename internal/agent/{mocks.go => mock.go} (88%) create mode 100644 internal/agent/runtime/MACOS_TESTING.md create mode 100644 internal/agent/transport/grpc.go create mode 100644 internal/agent/transport/grpc_test.go create mode 100644 internal/agent/transport/handler.go create mode 100644 internal/agent/transport/mock.go delete mode 100644 internal/agent/workflow/handler.go create mode 100644 internal/proto/workflow/v2/mock.go create mode 100644 internal/proto/workflow/v2/workflow.pb.go create mode 100644 internal/proto/workflow/v2/workflow.proto create mode 100644 internal/proto/workflow/v2/workflow_grpc.pb.go diff --git a/Makefile b/Makefile index cf7c5fb79..c5a80329e 100644 --- a/Makefile +++ b/Makefile @@ -27,6 +27,7 @@ KUSTOMIZE := $(GO) run sigs.k8s.io/kustomize/kustomize/v4@v4.5 SETUP_ENVTEST := $(GO) run sigs.k8s.io/controller-runtime/tools/setup-envtest@v0.0.0-20220304125252-9ee63fc65a97 GOLANGCI_LINT := $(GO) run github.com/golangci/golangci-lint/cmd/golangci-lint@v1.52 YAMLFMT := $(GO) run github.com/google/yamlfmt/cmd/yamlfmt@v0.6 +MOQ := $(GO) run github.com/matryer/moq@v0.3 # Installed tools PROTOC_GEN_GO_GRPC := google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2 @@ -94,11 +95,18 @@ e2e-test: ## Run e2e tests $(SETUP_ENVTEST) use source <($(SETUP_ENVTEST) use -p env) && $(GO) test -v ./internal/e2e/... -tags=e2e +mocks: + $(MOQ) -fmt goimpots -rm -out ./internal/proto/workflow/v2/mock.go ./internal/proto/workflow/v2 WorkflowServiceClient WorkflowService_GetWorkflowsClient + $(MOQ) -fmt goimports -rm -out ./internal/agent/transport/mock.go ./internal/agent/transport WorkflowHandler + $(MOQ) -fmt goimports -rm -out ./internal/agent/mock.go ./internal/agent Transport ContainerRuntime + $(MOQ) -fmt goimports -rm -out ./internal/agent/event/mock.go ./internal/agent/event Recorder + .PHONY: generate-proto generate-proto: buf.gen.yaml buf.lock $(shell git ls-files '**/*.proto') _protoc $(BUF) mod update $(BUF) generate $(GOFUMPT) -w internal/proto/*.pb.* + $(GOFUMPT) -w internal/proto/workflow/v2/*.pb.* .PHONY: generate generate: generate-proto generate-go generate-manifests ## Generate code, manifests etc. @@ -241,4 +249,4 @@ yamllint: $(YAMLLINT_BIN) .PHONY: _protoc ## Install all required tools for use with this Makefile. _protoc: GOBIN=$${PWD}/bin $(GO) install $(PROTOC_GEN_GO) - GOBIN=$${PWD}/bin $(GO) install $(PROTOC_GEN_GO_GRPC) + GOBIN=$${PWD}/bin $(GO) install $(PROTOC_GEN_GO_GRPC) \ No newline at end of file diff --git a/go.mod b/go.mod index b2792228e..e495d4802 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/go-logr/zerologr v1.2.3 github.com/google/go-cmp v0.5.9 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 + github.com/kr/pretty v0.3.1 github.com/onsi/ginkgo/v2 v2.11.0 github.com/onsi/gomega v1.27.8 github.com/opencontainers/image-spec v1.1.0-rc.3 @@ -78,6 +79,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/kr/text v0.2.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.12 // indirect @@ -94,6 +96,7 @@ require ( github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect + github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/spf13/afero v1.9.5 // indirect github.com/spf13/cast v1.5.1 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect diff --git a/go.sum b/go.sum index 6738a3bed..aeadba560 100644 --- a/go.sum +++ b/go.sum @@ -600,6 +600,7 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -692,6 +693,7 @@ github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNc github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -740,7 +742,9 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqn github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.29.1 h1:cO+d60CHkknCbvzEWxP0S9K6KqyTjrCNUy1LdQLCGPc= github.com/rs/zerolog v1.29.1/go.mod h1:Le6ESbR7hc+DP6Lt1THiV8CQSdkkNrd3R0XbEgp3ZBU= diff --git a/internal/agent/agent.go b/internal/agent/agent.go index b7fa59ebb..af49fd549 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -3,6 +3,7 @@ package agent import ( "context" "errors" + "sync" "github.com/go-logr/logr" "github.com/tinkerbell/tink/internal/agent/event" @@ -27,7 +28,12 @@ type Agent struct { // Runtime is the container runtime used to execute workflow actions. Runtime ContainerRuntime + // sem ensure we handle a single workflow at a time. sem chan struct{} + + // executionContext tracks the currently executing workflow. + executionContext *executionContext + mtx sync.RWMutex } // Start finalizes the Agent configuration and starts the configured Transport so it is ready @@ -43,40 +49,94 @@ func (agent *Agent) Start(ctx context.Context) error { } if agent.Runtime == nil { - //nolint:stylecheck // Specifying field on data structure + //nolint:stylecheck // Runtime is a field of agent. return errors.New("Runtime field must be set before calling Start()") } + if agent.Log.GetSink() == nil { + agent.Log = logr.Discard() + } + agent.Log = agent.Log.WithValues("agent_id", agent.ID) // Initialize the semaphore and add a resource to it ensuring we can run 1 workflow at a time. agent.sem = make(chan struct{}, 1) agent.sem <- struct{}{} - agent.Log.Info("Starting agent") return agent.Transport.Start(ctx, agent.ID, agent) } // HandleWorkflow satisfies transport. -func (agent *Agent) HandleWorkflow(ctx context.Context, wflw workflow.Workflow, events event.Recorder) error { - // sem isn't protected by a synchronization data structure so this is technically invoking - // undefined behavior - consider this a best effort to ensuring Start() has been called. +func (agent *Agent) HandleWorkflow(ctx context.Context, wflw workflow.Workflow, events event.Recorder) { if agent.sem == nil { - return errors.New("agent must have Start() called before calling HandleWorkflow()") + agent.Log.Info("Agent must have Start() called before calling HandleWorkflow()") } select { case <-agent.sem: - // Replenish the semaphore on exit so we can pick up another workflow. - defer func() { agent.sem <- struct{}{} }() - return agent.run(ctx, wflw, events) + // Ensure we configure the current workflow and cancellation func before we launch the + // goroutine to avoid a race with CancelWorkflow. + agent.mtx.Lock() + defer agent.mtx.Unlock() + + ctx, cancel := context.WithCancel(ctx) + agent.executionContext = &executionContext{ + Workflow: wflw, + Cancel: cancel, + } + + go func() { + // Replenish the semaphore on exit so we can pick up another workflow. + defer func() { agent.sem <- struct{}{} }() + + agent.run(ctx, wflw, events) + + // Nilify the execution context after running so cancellation requests are ignored. + agent.mtx.Lock() + defer agent.mtx.Unlock() + agent.executionContext = nil + }() default: + log := agent.Log.WithValues("workflow_id", wflw.ID) + reject := event.WorkflowRejected{ ID: wflw.ID, Message: "workflow already in progress", } - events.RecordEvent(ctx, reject) - return nil + + if err := events.RecordEvent(ctx, reject); err != nil { + log.Error(err, "Failed to record workflow rejection event") + return + } + + log.Info("Workflow already executing; dropping request") + } +} + +func (agent *Agent) CancelWorkflow(workflowID string) { + agent.mtx.RLock() + defer agent.mtx.RUnlock() + + if agent.executionContext == nil { + agent.Log.Info("No workflow running; ignoring cancellation request", "workflow_id", workflowID) + return } + + if agent.executionContext.Workflow.ID != workflowID { + agent.Log.Info( + "Incorrect workflow ID in cancellation request; ignoring cancellation request", + "workflow_id", workflowID, + "running_workflow_id", agent.executionContext.Workflow.ID, + ) + return + } + + agent.Log.Info("Cancel workflow", "workflow_id", workflowID) + agent.executionContext.Cancel() +} + +type executionContext struct { + Workflow workflow.Workflow + Cancel context.CancelFunc } diff --git a/internal/agent/agent_test.go b/internal/agent/agent_test.go index 6c412b3d4..83ddd6dfd 100644 --- a/internal/agent/agent_test.go +++ b/internal/agent/agent_test.go @@ -4,6 +4,7 @@ import ( "context" "strings" "testing" + "time" "github.com/go-logr/logr" "github.com/go-logr/zapr" @@ -20,12 +21,12 @@ import ( func TestAgent_InvalidStart(t *testing.T) { cases := []struct { Name string - Agent agent.Agent + Agent *agent.Agent Error string }{ { Name: "MissingAgentID", - Agent: agent.Agent{ + Agent: &agent.Agent{ Log: logr.Discard(), Transport: transport.Noop(), Runtime: runtime.Noop(), @@ -34,7 +35,7 @@ func TestAgent_InvalidStart(t *testing.T) { }, { Name: "MissingRuntime", - Agent: agent.Agent{ + Agent: &agent.Agent{ Log: logr.Discard(), ID: "1234", Transport: transport.Noop(), @@ -43,7 +44,7 @@ func TestAgent_InvalidStart(t *testing.T) { }, { Name: "MissingTransport", - Agent: agent.Agent{ + Agent: &agent.Agent{ Log: logr.Discard(), ID: "1234", Runtime: runtime.Noop(), @@ -52,13 +53,21 @@ func TestAgent_InvalidStart(t *testing.T) { }, { Name: "InitializedCorrectly", - Agent: agent.Agent{ + Agent: &agent.Agent{ Log: logr.Discard(), ID: "1234", Transport: transport.Noop(), Runtime: runtime.Noop(), }, }, + { + Name: "NoLogger", + Agent: &agent.Agent{ + ID: "1234", + Transport: transport.Noop(), + Runtime: runtime.Noop(), + }, + }, } for _, tc := range cases { @@ -76,16 +85,13 @@ func TestAgent_InvalidStart(t *testing.T) { } } +// The goal of this test is to ensure the agent rejects concurrent workflows. func TestAgent_ConcurrentWorkflows(t *testing.T) { - // The goal of this test is to ensure the agent rejects concurrent workflows. We have to - // build a valid agent because it will also reject calls to HandleWorkflow without first - // starting the Agent. - logger := zapr.NewLogger(zap.Must(zap.NewDevelopment())) - recorder := event.NoopRecorder() + trnport := transport.Noop() - wrkflow := workflow.Workflow{ + wflw := workflow.Workflow{ ID: "1234", Actions: []workflow.Action{ { @@ -96,14 +102,8 @@ func TestAgent_ConcurrentWorkflows(t *testing.T) { }, } - trnport := agent.TransportMock{ - StartFunc: func(ctx context.Context, agentID string, handler workflow.Handler) error { - return handler.HandleWorkflow(ctx, wrkflow, recorder) - }, - } - + // Started is used to indicate the runtime has received the workflow. started := make(chan struct{}) - rntime := agent.ContainerRuntimeMock{ RunFunc: func(ctx context.Context, action workflow.Action) error { started <- struct{}{} @@ -119,26 +119,28 @@ func TestAgent_ConcurrentWorkflows(t *testing.T) { ID: "1234", } - // Build a cancellable context so we can tear the goroutine down. + // HandleWorkflow will reject us if we haven't started the agent first. + if err := agnt.Start(context.Background()); err != nil { + t.Fatal(err) + } - errs := make(chan error) - ctx, cancel := context.WithCancel(context.Background()) + // Build a cancellable context so we can tear everything down. The timeout is guesswork but + // this test shouldn't take long. + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - go func() { errs <- agnt.Start(ctx) }() + // Handle the first workflow and wait for it to start. + agnt.HandleWorkflow(ctx, wflw, recorder) - // Await either an error or the mock runtime to tell us its stated. + // Wait for the container runtime to start. select { - case err := <-errs: - t.Fatalf("Unexpected error: %v", err) case <-started: + case <-ctx.Done(): + t.Fatal(ctx.Err()) } - // Attempt to fire off another workflow. - err := agnt.HandleWorkflow(context.Background(), wrkflow, recorder) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } + // Attempt to fire off a second workflow. + agnt.HandleWorkflow(ctx, wflw, recorder) // Ensure the latest event recorded is a event.WorkflowRejected. calls := recorder.RecordEventCalls() @@ -153,7 +155,7 @@ func TestAgent_ConcurrentWorkflows(t *testing.T) { } expectEvent := event.WorkflowRejected{ - ID: wrkflow.ID, + ID: wflw.ID, Message: "workflow already in progress", } if !cmp.Equal(expectEvent, ev) { @@ -407,13 +409,7 @@ message`, for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - recorder := event.NoopRecorder() - - trnport := agent.TransportMock{ - StartFunc: func(ctx context.Context, agentID string, handler workflow.Handler) error { - return handler.HandleWorkflow(ctx, tc.Workflow, recorder) - }, - } + trnport := transport.Noop() rntime := agent.ContainerRuntimeMock{ RunFunc: func(ctx context.Context, action workflow.Action) error { @@ -424,18 +420,44 @@ message`, }, } + // The event recorder is what tells us the workflow has finished executing so we use it + // to check for the last expected action. + lastEventReceived := make(chan struct{}) + recorder := event.RecorderMock{ + RecordEventFunc: func(contextMoqParam context.Context, event event.Event) error { + if cmp.Equal(event, tc.Events[len(tc.Events)-1]) { + lastEventReceived <- struct{}{} + } + return nil + }, + } + + // Create and start the agent as start is a prereq to calling HandleWorkflow(). agnt := agent.Agent{ Log: logger, Transport: &trnport, Runtime: &rntime, ID: "1234", } - - err := agnt.Start(context.Background()) - if err != nil { + if err := agnt.Start(context.Background()); err != nil { t.Fatalf("Unexpected error: %v", err) } + // Configure a timeout of 5 seconds, this test shouldn't take long. + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Handle the workflow + agnt.HandleWorkflow(ctx, tc.Workflow, &recorder) + + // Wait for the last expected event or timeout. + select { + case <-lastEventReceived: + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } + + // Validate all events received are what we expected. calls := recorder.RecordEventCalls() if len(calls) != len(tc.Events) { t.Fatalf("Expected %v events; Received %v\n%+v", len(tc.Events), len(calls), calls) diff --git a/internal/agent/event/action.go b/internal/agent/event/action.go index 3f2df4f9a..6548bcf1f 100644 --- a/internal/agent/event/action.go +++ b/internal/agent/event/action.go @@ -49,5 +49,5 @@ func (ActionFailed) GetName() Name { } func (e ActionFailed) String() string { - return fmt.Sprintf("workflow=%v action=%v reason=%v", e.WorkflowID, e.ActionID, e.Reason) + return fmt.Sprintf("workflow='%v' action='%v' reason='%v'", e.WorkflowID, e.ActionID, e.Reason) } diff --git a/internal/agent/event/event.go b/internal/agent/event/event.go index a6448e1ad..3331beb34 100644 --- a/internal/agent/event/event.go +++ b/internal/agent/event/event.go @@ -18,5 +18,5 @@ type Event interface { // Recorder provides event recording methods. type Recorder interface { - RecordEvent(context.Context, Event) + RecordEvent(context.Context, Event) error } diff --git a/internal/agent/event/fake.go b/internal/agent/event/fake.go index 3aba2c7de..338602ef3 100644 --- a/internal/agent/event/fake.go +++ b/internal/agent/event/fake.go @@ -5,6 +5,6 @@ import "context" // NoopRecorder retrieves a nooping fake recorder. func NoopRecorder() *RecorderMock { return &RecorderMock{ - RecordEventFunc: func(context.Context, Event) {}, + RecordEventFunc: func(context.Context, Event) error { return nil }, } } diff --git a/internal/agent/event/mock.go b/internal/agent/event/mock.go new file mode 100644 index 000000000..82596ee0b --- /dev/null +++ b/internal/agent/event/mock.go @@ -0,0 +1,81 @@ +// Code generated by moq; DO NOT EDIT. +// github.com/matryer/moq + +package event + +import ( + "context" + "sync" +) + +// Ensure, that RecorderMock does implement Recorder. +// If this is not the case, regenerate this file with moq. +var _ Recorder = &RecorderMock{} + +// RecorderMock is a mock implementation of Recorder. +// +// func TestSomethingThatUsesRecorder(t *testing.T) { +// +// // make and configure a mocked Recorder +// mockedRecorder := &RecorderMock{ +// RecordEventFunc: func(contextMoqParam context.Context, event Event) error { +// panic("mock out the RecordEvent method") +// }, +// } +// +// // use mockedRecorder in code that requires Recorder +// // and then make assertions. +// +// } +type RecorderMock struct { + // RecordEventFunc mocks the RecordEvent method. + RecordEventFunc func(contextMoqParam context.Context, event Event) error + + // calls tracks calls to the methods. + calls struct { + // RecordEvent holds details about calls to the RecordEvent method. + RecordEvent []struct { + // ContextMoqParam is the contextMoqParam argument value. + ContextMoqParam context.Context + // Event is the event argument value. + Event Event + } + } + lockRecordEvent sync.RWMutex +} + +// RecordEvent calls RecordEventFunc. +func (mock *RecorderMock) RecordEvent(contextMoqParam context.Context, event Event) error { + if mock.RecordEventFunc == nil { + panic("RecorderMock.RecordEventFunc: method is nil but Recorder.RecordEvent was just called") + } + callInfo := struct { + ContextMoqParam context.Context + Event Event + }{ + ContextMoqParam: contextMoqParam, + Event: event, + } + mock.lockRecordEvent.Lock() + mock.calls.RecordEvent = append(mock.calls.RecordEvent, callInfo) + mock.lockRecordEvent.Unlock() + return mock.RecordEventFunc(contextMoqParam, event) +} + +// RecordEventCalls gets all the calls that were made to RecordEvent. +// Check the length with: +// +// len(mockedRecorder.RecordEventCalls()) +func (mock *RecorderMock) RecordEventCalls() []struct { + ContextMoqParam context.Context + Event Event +} { + var calls []struct { + ContextMoqParam context.Context + Event Event + } + mock.lockRecordEvent.RLock() + calls = mock.calls.RecordEvent + mock.lockRecordEvent.RUnlock() + return calls +} diff --git a/internal/agent/event/mocks.go b/internal/agent/event/mocks.go deleted file mode 100644 index b3ae81dc0..000000000 --- a/internal/agent/event/mocks.go +++ /dev/null @@ -1,177 +0,0 @@ -// Code generated by moq; DO NOT EDIT. -// github.com/matryer/moq - -package event - -import ( - "context" - "sync" -) - -// Ensure, that RecorderMock does implement Recorder. -// If this is not the case, regenerate this file with moq. -var _ Recorder = &RecorderMock{} - -// RecorderMock is a mock implementation of Recorder. -// -// func TestSomethingThatUsesRecorder(t *testing.T) { -// -// // make and configure a mocked Recorder -// mockedRecorder := &RecorderMock{ -// RecordEventFunc: func(contextMoqParam context.Context, event Event) { -// panic("mock out the RecordEvent method") -// }, -// } -// -// // use mockedRecorder in code that requires Recorder -// // and then make assertions. -// -// } -type RecorderMock struct { - // RecordEventFunc mocks the RecordEvent method. - RecordEventFunc func(contextMoqParam context.Context, event Event) - - // calls tracks calls to the methods. - calls struct { - // RecordEvent holds details about calls to the RecordEvent method. - RecordEvent []struct { - // ContextMoqParam is the contextMoqParam argument value. - ContextMoqParam context.Context - // Event is the event argument value. - Event Event - } - } - lockRecordEvent sync.RWMutex -} - -// RecordEvent calls RecordEventFunc. -func (mock *RecorderMock) RecordEvent(contextMoqParam context.Context, event Event) { - if mock.RecordEventFunc == nil { - panic("RecorderMock.RecordEventFunc: method is nil but Recorder.RecordEvent was just called") - } - callInfo := struct { - ContextMoqParam context.Context - Event Event - }{ - ContextMoqParam: contextMoqParam, - Event: event, - } - mock.lockRecordEvent.Lock() - mock.calls.RecordEvent = append(mock.calls.RecordEvent, callInfo) - mock.lockRecordEvent.Unlock() - mock.RecordEventFunc(contextMoqParam, event) -} - -// RecordEventCalls gets all the calls that were made to RecordEvent. -// Check the length with: -// -// len(mockedRecorder.RecordEventCalls()) -func (mock *RecorderMock) RecordEventCalls() []struct { - ContextMoqParam context.Context - Event Event -} { - var calls []struct { - ContextMoqParam context.Context - Event Event - } - mock.lockRecordEvent.RLock() - calls = mock.calls.RecordEvent - mock.lockRecordEvent.RUnlock() - return calls -} - -// Ensure, that EventMock does implement Event. -// If this is not the case, regenerate this file with moq. -var _ Event = &EventMock{} - -// EventMock is a mock implementation of Event. -// -// func TestSomethingThatUsesEvent(t *testing.T) { -// -// // make and configure a mocked Event -// mockedEvent := &EventMock{ -// GetNameFunc: func() Name { -// panic("mock out the GetName method") -// }, -// isEventFromThisPackageFunc: func() { -// panic("mock out the isEventFromThisPackage method") -// }, -// } -// -// // use mockedEvent in code that requires Event -// // and then make assertions. -// -// } -type EventMock struct { - // GetNameFunc mocks the GetName method. - GetNameFunc func() Name - - // isEventFromThisPackageFunc mocks the isEventFromThisPackage method. - isEventFromThisPackageFunc func() - - // calls tracks calls to the methods. - calls struct { - // GetName holds details about calls to the GetName method. - GetName []struct { - } - // isEventFromThisPackage holds details about calls to the isEventFromThisPackage method. - isEventFromThisPackage []struct { - } - } - lockGetName sync.RWMutex - lockisEventFromThisPackage sync.RWMutex -} - -// GetName calls GetNameFunc. -func (mock *EventMock) GetName() Name { - if mock.GetNameFunc == nil { - panic("EventMock.GetNameFunc: method is nil but Event.GetName was just called") - } - callInfo := struct { - }{} - mock.lockGetName.Lock() - mock.calls.GetName = append(mock.calls.GetName, callInfo) - mock.lockGetName.Unlock() - return mock.GetNameFunc() -} - -// GetNameCalls gets all the calls that were made to GetName. -// Check the length with: -// -// len(mockedEvent.GetNameCalls()) -func (mock *EventMock) GetNameCalls() []struct { -} { - var calls []struct { - } - mock.lockGetName.RLock() - calls = mock.calls.GetName - mock.lockGetName.RUnlock() - return calls -} - -// isEventFromThisPackage calls isEventFromThisPackageFunc. -func (mock *EventMock) isEventFromThisPackage() { - if mock.isEventFromThisPackageFunc == nil { - panic("EventMock.isEventFromThisPackageFunc: method is nil but Event.isEventFromThisPackage was just called") - } - callInfo := struct { - }{} - mock.lockisEventFromThisPackage.Lock() - mock.calls.isEventFromThisPackage = append(mock.calls.isEventFromThisPackage, callInfo) - mock.lockisEventFromThisPackage.Unlock() - mock.isEventFromThisPackageFunc() -} - -// isEventFromThisPackageCalls gets all the calls that were made to isEventFromThisPackage. -// Check the length with: -// -// len(mockedEvent.isEventFromThisPackageCalls()) -func (mock *EventMock) isEventFromThisPackageCalls() []struct { -} { - var calls []struct { - } - mock.lockisEventFromThisPackage.RLock() - calls = mock.calls.isEventFromThisPackage - mock.lockisEventFromThisPackage.RUnlock() - return calls -} diff --git a/internal/agent/mocks.go b/internal/agent/mock.go similarity index 88% rename from internal/agent/mocks.go rename to internal/agent/mock.go index f7b8ddb6d..c7d50b390 100644 --- a/internal/agent/mocks.go +++ b/internal/agent/mock.go @@ -5,8 +5,10 @@ package agent import ( "context" - "github.com/tinkerbell/tink/internal/agent/workflow" "sync" + + "github.com/tinkerbell/tink/internal/agent/transport" + "github.com/tinkerbell/tink/internal/agent/workflow" ) // Ensure, that TransportMock does implement Transport. @@ -19,7 +21,7 @@ var _ Transport = &TransportMock{} // // // make and configure a mocked Transport // mockedTransport := &TransportMock{ -// StartFunc: func(contextMoqParam context.Context, agentID string, handler workflow.Handler) error { +// StartFunc: func(contextMoqParam context.Context, agentID string, workflowHandler transport.WorkflowHandler) error { // panic("mock out the Start method") // }, // } @@ -30,7 +32,7 @@ var _ Transport = &TransportMock{} // } type TransportMock struct { // StartFunc mocks the Start method. - StartFunc func(contextMoqParam context.Context, agentID string, handler workflow.Handler) error + StartFunc func(contextMoqParam context.Context, agentID string, workflowHandler transport.WorkflowHandler) error // calls tracks calls to the methods. calls struct { @@ -40,31 +42,31 @@ type TransportMock struct { ContextMoqParam context.Context // AgentID is the agentID argument value. AgentID string - // Handler is the handler argument value. - Handler workflow.Handler + // WorkflowHandler is the workflowHandler argument value. + WorkflowHandler transport.WorkflowHandler } } lockStart sync.RWMutex } // Start calls StartFunc. -func (mock *TransportMock) Start(contextMoqParam context.Context, agentID string, handler workflow.Handler) error { +func (mock *TransportMock) Start(contextMoqParam context.Context, agentID string, workflowHandler transport.WorkflowHandler) error { if mock.StartFunc == nil { panic("TransportMock.StartFunc: method is nil but Transport.Start was just called") } callInfo := struct { ContextMoqParam context.Context AgentID string - Handler workflow.Handler + WorkflowHandler transport.WorkflowHandler }{ ContextMoqParam: contextMoqParam, AgentID: agentID, - Handler: handler, + WorkflowHandler: workflowHandler, } mock.lockStart.Lock() mock.calls.Start = append(mock.calls.Start, callInfo) mock.lockStart.Unlock() - return mock.StartFunc(contextMoqParam, agentID, handler) + return mock.StartFunc(contextMoqParam, agentID, workflowHandler) } // StartCalls gets all the calls that were made to Start. @@ -74,12 +76,12 @@ func (mock *TransportMock) Start(contextMoqParam context.Context, agentID string func (mock *TransportMock) StartCalls() []struct { ContextMoqParam context.Context AgentID string - Handler workflow.Handler + WorkflowHandler transport.WorkflowHandler } { var calls []struct { ContextMoqParam context.Context AgentID string - Handler workflow.Handler + WorkflowHandler transport.WorkflowHandler } mock.lockStart.RLock() calls = mock.calls.Start diff --git a/internal/agent/run.go b/internal/agent/run.go index 8d7afdf93..8543edf41 100644 --- a/internal/agent/run.go +++ b/internal/agent/run.go @@ -18,74 +18,81 @@ const ReasonRuntimeError = "RuntimeError" // ReasonInvalid indicates a reason provided by the runtime was invalid. const ReasonInvalid = "InvalidReason" -// Consistent logging keys. -const ( - logErrorKey = "error" - logReasonKey = "reason" -) - // validReasonRegex defines the regex for a valid action failure reason. var validReasonRegex = regexp.MustCompile(`^[a-zA-Z]+$`) -// run executes the workflow using the runtime configured on the Agent. -func (agent *Agent) run(ctx context.Context, wflw workflow.Workflow, events event.Recorder) error { - logger := agent.Log.WithValues("workflow", wflw) - logger.Info("Starting workflow") +// run executes the workflow using the runtime configured on agent. +func (agent *Agent) run(ctx context.Context, wflw workflow.Workflow, events event.Recorder) { + log := agent.Log.WithValues("workflow_id", wflw.ID) + + workflowStart := time.Now() + log.Info("Start workflow") for _, action := range wflw.Actions { - logger := logger.WithValues("action_id", action.ID, "action_name", action.Name) + log := log.WithValues("action_id", action.ID, "action_name", action.Name) - start := time.Now() - logger.Info("Starting action") + actionStart := time.Now() + log.Info("Start action") - events.RecordEvent(ctx, event.ActionStarted{ + started := event.ActionStarted{ ActionID: action.ID, WorkflowID: wflw.ID, - }) + } + if err := events.RecordEvent(ctx, started); err != nil { + log.Error(err, "Record action start event") + return + } if err := agent.Runtime.Run(ctx, action); err != nil { - reason := extractReason(logger, err) + reason := extractReason(log, err) // We consider newlines in the failure message invalid because it upsets formatting. // The failure message is vital to easy debugability so we force the string into // something we're happy with and communicate that. message := strings.ReplaceAll(err.Error(), "\n", `\n`) - logger.Info("Action failed - terminating workflow", - logErrorKey, err, - logReasonKey, reason, - "duration", time.Since(start).String(), + log.Info("Action failed; terminating workflow", + "error", err, + "reason", reason, + "duration", time.Since(actionStart).String(), ) - events.RecordEvent(ctx, event.ActionFailed{ + + failed := event.ActionFailed{ ActionID: action.ID, WorkflowID: wflw.ID, Reason: reason, Message: message, - }) - return nil + } + if err := events.RecordEvent(ctx, failed); err != nil { + log.Error(err, "Record failed action event", "event", failed) + } + + return } - events.RecordEvent(ctx, event.ActionSucceeded{ + succeed := event.ActionSucceeded{ ActionID: action.ID, WorkflowID: wflw.ID, - }) + } + if err := events.RecordEvent(ctx, succeed); err != nil { + log.Error(err, "Record succeeded action event") + return + } - logger.Info("Finished action", "duration", time.Since(start).String()) + log.Info("Finish action", "duration", time.Since(actionStart).String()) } - logger.Info("Finished workflow") - - return nil + log.Info("Finish workflow", "duration", time.Since(workflowStart).String()) } -func extractReason(logger logr.Logger, err error) string { +func extractReason(log logr.Logger, err error) string { reason := ReasonRuntimeError if r, ok := failure.Reason(err); ok { reason = r if !validReasonRegex.MatchString(reason) { - logger.Info( - "Received invalid reason for action failure; using InvalidReason instead", - logReasonKey, reason, + log.Info( + "Received invalid reason for action failure; using InvalidReason", + "invalid_reason", reason, ) reason = ReasonInvalid } diff --git a/internal/agent/runtime/MACOS_TESTING.md b/internal/agent/runtime/MACOS_TESTING.md new file mode 100644 index 000000000..1c6e304fc --- /dev/null +++ b/internal/agent/runtime/MACOS_TESTING.md @@ -0,0 +1,9 @@ +# MacOS Testing + +When developing on MacOS it may be necessary to create a symlink to `/var/run/docker.sock`. First, +validate `/var/run/docker.sock` does not exist. If it does not exist, verify the socket exists at +`$HOME/.docker/run/docker.sock` and create a symlink. + +``` +sudo ln -s $HOME/.docker/run/docker.sock /var/run/docker.sock +``` \ No newline at end of file diff --git a/internal/agent/runtime/docker_test.go b/internal/agent/runtime/docker_test.go index f7cae2dcc..4237d6cbe 100644 --- a/internal/agent/runtime/docker_test.go +++ b/internal/agent/runtime/docker_test.go @@ -22,11 +22,10 @@ import ( ) func TestDockerImageNotPresent(t *testing.T) { - clnt, err := client.NewClientWithOpts(client.FromEnv) + clnt, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) if err != nil { t.Fatalf("Received unexpected error: %v", err) } - clnt.NegotiateAPIVersion(context.Background()) image := "alpine" diff --git a/internal/agent/transport.go b/internal/agent/transport.go index 6aaf616a2..bf204f367 100644 --- a/internal/agent/transport.go +++ b/internal/agent/transport.go @@ -3,7 +3,7 @@ package agent import ( "context" - "github.com/tinkerbell/tink/internal/agent/workflow" + "github.com/tinkerbell/tink/internal/agent/transport" ) // Transport is a transport mechanism for communicating workflows to the agent. @@ -11,5 +11,5 @@ type Transport interface { // Start is a blocking call that starts the transport and begins retrieving workflows for the // given agentID. The transport should pass workflows to the Handler. The transport // should block until its told to cancel via the context. - Start(_ context.Context, agentID string, _ workflow.Handler) error + Start(_ context.Context, agentID string, _ transport.WorkflowHandler) error } diff --git a/internal/agent/transport/fake.go b/internal/agent/transport/fake.go index 1379c3b21..9e329910f 100644 --- a/internal/agent/transport/fake.go +++ b/internal/agent/transport/fake.go @@ -19,16 +19,15 @@ type Fake struct { Workflows []workflow.Workflow } -func (f Fake) Start(ctx context.Context, _ string, runner workflow.Handler) error { +func (f Fake) Start(ctx context.Context, _ string, handler WorkflowHandler) error { f.Log.Info("Starting fake transport") for _, w := range f.Workflows { - if err := runner.HandleWorkflow(ctx, w, f); err != nil { - f.Log.Error(err, "Running workflow", "workflow", w) - } + handler.HandleWorkflow(ctx, w, f) } return nil } -func (f Fake) RecordEvent(_ context.Context, e event.Event) { +func (f Fake) RecordEvent(_ context.Context, e event.Event) error { f.Log.Info("Recording event", "event", e.GetName()) + return nil } diff --git a/internal/agent/transport/grpc.go b/internal/agent/transport/grpc.go new file mode 100644 index 000000000..d0b2c8806 --- /dev/null +++ b/internal/agent/transport/grpc.go @@ -0,0 +1,170 @@ +package transport + +import ( + "context" + "errors" + "io" + + "github.com/avast/retry-go" + "github.com/go-logr/logr" + "github.com/tinkerbell/tink/internal/agent/event" + "github.com/tinkerbell/tink/internal/agent/workflow" + workflowproto "github.com/tinkerbell/tink/internal/proto/workflow/v2" +) + +var _ event.Recorder = &GRPC{} + +func NewGRPC(log logr.Logger, client workflowproto.WorkflowServiceClient) *GRPC { + return &GRPC{ + log: log, + client: client, + } +} + +type GRPC struct { + log logr.Logger + client workflowproto.WorkflowServiceClient +} + +func (g *GRPC) Start(ctx context.Context, agentID string, handler WorkflowHandler) error { + stream, err := g.client.GetWorkflows(ctx, &workflowproto.GetWorkflowsRequest{ + AgentId: agentID, + }) + if err != nil { + return err + } + + for { + request, err := stream.Recv() + switch { + case errors.Is(err, io.EOF): + return nil + case err != nil: + return err + } + + switch request.GetCmd().(type) { + case *workflowproto.GetWorkflowsResponse_StartWorkflow_: + grpcWorkflow := request.GetStartWorkflow().GetWorkflow() + + if err := validateGRPCWorkflow(grpcWorkflow); err != nil { + g.log.Info( + "Dropping request to start workflow; invalid payload", + "error", err, + "payload", grpcWorkflow, + ) + continue + } + + handler.HandleWorkflow(ctx, toWorkflow(grpcWorkflow), g) + + case *workflowproto.GetWorkflowsResponse_StopWorkflow_: + if request.GetStopWorkflow().WorkflowId == "" { + g.log.Info("Dropping request to cancel workflow; missing workflow ID") + continue + } + + handler.CancelWorkflow(request.GetStopWorkflow().WorkflowId) + } + } +} + +func (g *GRPC) RecordEvent(ctx context.Context, e event.Event) error { + evnt, err := toGRPC(e) + if err != nil { + return err + } + + publish := func() error { + payload := workflowproto.PublishEventRequest{Event: evnt} + if _, err := g.client.PublishEvent(ctx, &payload); err != nil { + return err + } + return nil + } + + return retry.Do(publish, retry.Attempts(5), retry.DelayType(retry.BackOffDelay)) +} + +func validateGRPCWorkflow(wflw *workflowproto.Workflow) error { + if wflw == nil { + return errors.New("workflow must not be nil") + } + + for _, action := range wflw.Actions { + if action == nil { + return errors.New("workflow actions must not be nil") + } + } + + return nil +} + +func toWorkflow(wflw *workflowproto.Workflow) workflow.Workflow { + return workflow.Workflow{ + ID: wflw.WorkflowId, + Actions: toActions(wflw.GetActions()), + } +} + +func toActions(a []*workflowproto.Workflow_Action) []workflow.Action { + var actions []workflow.Action + for _, action := range a { + actions = append(actions, workflow.Action{ + ID: action.GetId(), + Name: action.GetName(), + Image: action.GetImage(), + Cmd: action.GetCmd(), + Args: action.GetArgs(), + Env: action.GetEnv(), + Volumes: action.GetVolumes(), + NetworkNamespace: action.GetNetworkNamespace(), + }) + } + return actions +} + +func toGRPC(e event.Event) (*workflowproto.Event, error) { + switch v := e.(type) { + case event.ActionStarted: + return &workflowproto.Event{ + WorkflowId: v.WorkflowID, + Event: &workflowproto.Event_ActionStarted_{ + ActionStarted: &workflowproto.Event_ActionStarted{ + ActionId: v.ActionID, + }, + }, + }, nil + case event.ActionSucceeded: + return &workflowproto.Event{ + WorkflowId: v.WorkflowID, + Event: &workflowproto.Event_ActionSucceeded_{ + ActionSucceeded: &workflowproto.Event_ActionSucceeded{ + ActionId: v.ActionID, + }, + }, + }, nil + case event.ActionFailed: + return &workflowproto.Event{ + WorkflowId: v.WorkflowID, + Event: &workflowproto.Event_ActionFailed_{ + ActionFailed: &workflowproto.Event_ActionFailed{ + ActionId: v.ActionID, + FailureReason: &v.Reason, + FailureMessage: &v.Message, + }, + }, + }, nil + case event.WorkflowRejected: + return &workflowproto.Event{ + WorkflowId: v.ID, + Event: &workflowproto.Event_WorkflowRejected_{ + WorkflowRejected: &workflowproto.Event_WorkflowRejected{ + Message: v.Message, + }, + }, + }, nil + } + + return nil, event.IncompatibleError{Event: e} +} diff --git a/internal/agent/transport/grpc_test.go b/internal/agent/transport/grpc_test.go new file mode 100644 index 000000000..42200ee45 --- /dev/null +++ b/internal/agent/transport/grpc_test.go @@ -0,0 +1,76 @@ +package transport_test + +import ( + "context" + "fmt" + "io" + "sync" + "testing" + + "github.com/go-logr/zerologr" + "github.com/kr/pretty" + "github.com/rs/zerolog" + "github.com/tinkerbell/tink/internal/agent/event" + "github.com/tinkerbell/tink/internal/agent/transport" + "github.com/tinkerbell/tink/internal/agent/workflow" + workflowproto "github.com/tinkerbell/tink/internal/proto/workflow/v2" + "google.golang.org/grpc" +) + +func TestGRPC(t *testing.T) { + logger := zerolog.New(zerolog.NewConsoleWriter()) + type streamResponse struct { + Workflow *workflowproto.GetWorkflowsResponse + Error error + } + responses := make(chan streamResponse, 2) + responses <- streamResponse{ + Workflow: &workflowproto.GetWorkflowsResponse{ + Cmd: &workflowproto.GetWorkflowsResponse_StartWorkflow_{ + StartWorkflow: &workflowproto.GetWorkflowsResponse_StartWorkflow{ + Workflow: &workflowproto.Workflow{}, + }, + }, + }, + } + responses <- streamResponse{ + Error: io.EOF, + } + + stream := &workflowproto.WorkflowService_GetWorkflowsClientMock{ + RecvFunc: func() (*workflowproto.GetWorkflowsResponse, error) { + r, ok := <-responses + if !ok { + return nil, io.EOF + } + return r.Workflow, r.Error + }, + ContextFunc: context.Background, + } + client := &workflowproto.WorkflowServiceClientMock{ + GetWorkflowsFunc: func(ctx context.Context, in *workflowproto.GetWorkflowsRequest, opts ...grpc.CallOption) (workflowproto.WorkflowService_GetWorkflowsClient, error) { + return stream, nil + }, + } + + var wg sync.WaitGroup + wg.Add(1) + handler := &transport.WorkflowHandlerMock{ + HandleWorkflowFunc: func(contextMoqParam context.Context, workflow workflow.Workflow, recorder event.Recorder) { + defer wg.Done() + fmt.Println("handling") + close(responses) + }, + } + + g := transport.NewGRPC(zerologr.New(&logger), client) + + err := g.Start(context.Background(), "id", handler) + if err != nil { + t.Fatal(err) + } + + wg.Wait() + + pretty.Println(handler.HandleWorkflowCalls()) +} diff --git a/internal/agent/transport/handler.go b/internal/agent/transport/handler.go new file mode 100644 index 000000000..9bbf99f2c --- /dev/null +++ b/internal/agent/transport/handler.go @@ -0,0 +1,20 @@ +package transport + +import ( + "context" + + "github.com/tinkerbell/tink/internal/agent/event" + "github.com/tinkerbell/tink/internal/agent/workflow" +) + +// WorkflowHandler is responsible for workflow execution. +type WorkflowHandler interface { + // HandleWorkflow executes the given workflow. The event.Recorder can be used to publish events + // as the workflow transits its lifecycle. HandleWorkflow should not block and should be efficient + // in handing off workflow processing. + HandleWorkflow(context.Context, workflow.Workflow, event.Recorder) + + // CancelWorkflow cancels a workflow identified by workflowID. It should not block and should + // be efficient in handing off the cancellation request. + CancelWorkflow(workflowID string) +} diff --git a/internal/agent/transport/mock.go b/internal/agent/transport/mock.go new file mode 100644 index 000000000..e73ba35f4 --- /dev/null +++ b/internal/agent/transport/mock.go @@ -0,0 +1,134 @@ +// Code generated by moq; DO NOT EDIT. +// github.com/matryer/moq + +package transport + +import ( + "context" + "sync" + + "github.com/tinkerbell/tink/internal/agent/event" + "github.com/tinkerbell/tink/internal/agent/workflow" +) + +// Ensure, that WorkflowHandlerMock does implement WorkflowHandler. +// If this is not the case, regenerate this file with moq. +var _ WorkflowHandler = &WorkflowHandlerMock{} + +// WorkflowHandlerMock is a mock implementation of WorkflowHandler. +// +// func TestSomethingThatUsesWorkflowHandler(t *testing.T) { +// +// // make and configure a mocked WorkflowHandler +// mockedWorkflowHandler := &WorkflowHandlerMock{ +// CancelWorkflowFunc: func(workflowID string) { +// panic("mock out the CancelWorkflow method") +// }, +// HandleWorkflowFunc: func(contextMoqParam context.Context, workflowMoqParam workflow.Workflow, recorder event.Recorder) { +// panic("mock out the HandleWorkflow method") +// }, +// } +// +// // use mockedWorkflowHandler in code that requires WorkflowHandler +// // and then make assertions. +// +// } +type WorkflowHandlerMock struct { + // CancelWorkflowFunc mocks the CancelWorkflow method. + CancelWorkflowFunc func(workflowID string) + + // HandleWorkflowFunc mocks the HandleWorkflow method. + HandleWorkflowFunc func(contextMoqParam context.Context, workflowMoqParam workflow.Workflow, recorder event.Recorder) + + // calls tracks calls to the methods. + calls struct { + // CancelWorkflow holds details about calls to the CancelWorkflow method. + CancelWorkflow []struct { + // WorkflowID is the workflowID argument value. + WorkflowID string + } + // HandleWorkflow holds details about calls to the HandleWorkflow method. + HandleWorkflow []struct { + // ContextMoqParam is the contextMoqParam argument value. + ContextMoqParam context.Context + // WorkflowMoqParam is the workflowMoqParam argument value. + WorkflowMoqParam workflow.Workflow + // Recorder is the recorder argument value. + Recorder event.Recorder + } + } + lockCancelWorkflow sync.RWMutex + lockHandleWorkflow sync.RWMutex +} + +// CancelWorkflow calls CancelWorkflowFunc. +func (mock *WorkflowHandlerMock) CancelWorkflow(workflowID string) { + if mock.CancelWorkflowFunc == nil { + panic("WorkflowHandlerMock.CancelWorkflowFunc: method is nil but WorkflowHandler.CancelWorkflow was just called") + } + callInfo := struct { + WorkflowID string + }{ + WorkflowID: workflowID, + } + mock.lockCancelWorkflow.Lock() + mock.calls.CancelWorkflow = append(mock.calls.CancelWorkflow, callInfo) + mock.lockCancelWorkflow.Unlock() + mock.CancelWorkflowFunc(workflowID) +} + +// CancelWorkflowCalls gets all the calls that were made to CancelWorkflow. +// Check the length with: +// +// len(mockedWorkflowHandler.CancelWorkflowCalls()) +func (mock *WorkflowHandlerMock) CancelWorkflowCalls() []struct { + WorkflowID string +} { + var calls []struct { + WorkflowID string + } + mock.lockCancelWorkflow.RLock() + calls = mock.calls.CancelWorkflow + mock.lockCancelWorkflow.RUnlock() + return calls +} + +// HandleWorkflow calls HandleWorkflowFunc. +func (mock *WorkflowHandlerMock) HandleWorkflow(contextMoqParam context.Context, workflowMoqParam workflow.Workflow, recorder event.Recorder) { + if mock.HandleWorkflowFunc == nil { + panic("WorkflowHandlerMock.HandleWorkflowFunc: method is nil but WorkflowHandler.HandleWorkflow was just called") + } + callInfo := struct { + ContextMoqParam context.Context + WorkflowMoqParam workflow.Workflow + Recorder event.Recorder + }{ + ContextMoqParam: contextMoqParam, + WorkflowMoqParam: workflowMoqParam, + Recorder: recorder, + } + mock.lockHandleWorkflow.Lock() + mock.calls.HandleWorkflow = append(mock.calls.HandleWorkflow, callInfo) + mock.lockHandleWorkflow.Unlock() + mock.HandleWorkflowFunc(contextMoqParam, workflowMoqParam, recorder) +} + +// HandleWorkflowCalls gets all the calls that were made to HandleWorkflow. +// Check the length with: +// +// len(mockedWorkflowHandler.HandleWorkflowCalls()) +func (mock *WorkflowHandlerMock) HandleWorkflowCalls() []struct { + ContextMoqParam context.Context + WorkflowMoqParam workflow.Workflow + Recorder event.Recorder +} { + var calls []struct { + ContextMoqParam context.Context + WorkflowMoqParam workflow.Workflow + Recorder event.Recorder + } + mock.lockHandleWorkflow.RLock() + calls = mock.calls.HandleWorkflow + mock.lockHandleWorkflow.RUnlock() + return calls +} diff --git a/internal/agent/workflow/handler.go b/internal/agent/workflow/handler.go deleted file mode 100644 index e63f7c8e6..000000000 --- a/internal/agent/workflow/handler.go +++ /dev/null @@ -1,15 +0,0 @@ -package workflow - -import ( - "context" - - "github.com/tinkerbell/tink/internal/agent/event" -) - -// Handler is responsible for handling workflow execution. -type Handler interface { - // HandleWorkflow begins executing the given workflow. The event recorder can be used to - // indicate the progress of a workflow. If the given context becomes cancelled, the workflow - // handler should stop workflow execution. - HandleWorkflow(context.Context, Workflow, event.Recorder) error -} diff --git a/internal/proto/workflow/v2/mock.go b/internal/proto/workflow/v2/mock.go new file mode 100644 index 000000000..729c998b1 --- /dev/null +++ b/internal/proto/workflow/v2/mock.go @@ -0,0 +1,440 @@ +// Code generated by moq; DO NOT EDIT. +// github.com/matryer/moq + +package workflow + +import ( + context "context" + grpc "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + sync "sync" +) + +// Ensure, that WorkflowServiceClientMock does implement WorkflowServiceClient. +// If this is not the case, regenerate this file with moq. +var _ WorkflowServiceClient = &WorkflowServiceClientMock{} + +// WorkflowServiceClientMock is a mock implementation of WorkflowServiceClient. +// +// func TestSomethingThatUsesWorkflowServiceClient(t *testing.T) { +// +// // make and configure a mocked WorkflowServiceClient +// mockedWorkflowServiceClient := &WorkflowServiceClientMock{ +// GetWorkflowsFunc: func(ctx context.Context, in *GetWorkflowsRequest, opts ...grpc.CallOption) (WorkflowService_GetWorkflowsClient, error) { +// panic("mock out the GetWorkflows method") +// }, +// PublishEventFunc: func(ctx context.Context, in *PublishEventRequest, opts ...grpc.CallOption) (*PublishEventResponse, error) { +// panic("mock out the PublishEvent method") +// }, +// } +// +// // use mockedWorkflowServiceClient in code that requires WorkflowServiceClient +// // and then make assertions. +// +// } +type WorkflowServiceClientMock struct { + // GetWorkflowsFunc mocks the GetWorkflows method. + GetWorkflowsFunc func(ctx context.Context, in *GetWorkflowsRequest, opts ...grpc.CallOption) (WorkflowService_GetWorkflowsClient, error) + + // PublishEventFunc mocks the PublishEvent method. + PublishEventFunc func(ctx context.Context, in *PublishEventRequest, opts ...grpc.CallOption) (*PublishEventResponse, error) + + // calls tracks calls to the methods. + calls struct { + // GetWorkflows holds details about calls to the GetWorkflows method. + GetWorkflows []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // In is the in argument value. + In *GetWorkflowsRequest + // Opts is the opts argument value. + Opts []grpc.CallOption + } + // PublishEvent holds details about calls to the PublishEvent method. + PublishEvent []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // In is the in argument value. + In *PublishEventRequest + // Opts is the opts argument value. + Opts []grpc.CallOption + } + } + lockGetWorkflows sync.RWMutex + lockPublishEvent sync.RWMutex +} + +// GetWorkflows calls GetWorkflowsFunc. +func (mock *WorkflowServiceClientMock) GetWorkflows(ctx context.Context, in *GetWorkflowsRequest, opts ...grpc.CallOption) (WorkflowService_GetWorkflowsClient, error) { + if mock.GetWorkflowsFunc == nil { + panic("WorkflowServiceClientMock.GetWorkflowsFunc: method is nil but WorkflowServiceClient.GetWorkflows was just called") + } + callInfo := struct { + Ctx context.Context + In *GetWorkflowsRequest + Opts []grpc.CallOption + }{ + Ctx: ctx, + In: in, + Opts: opts, + } + mock.lockGetWorkflows.Lock() + mock.calls.GetWorkflows = append(mock.calls.GetWorkflows, callInfo) + mock.lockGetWorkflows.Unlock() + return mock.GetWorkflowsFunc(ctx, in, opts...) +} + +// GetWorkflowsCalls gets all the calls that were made to GetWorkflows. +// Check the length with: +// +// len(mockedWorkflowServiceClient.GetWorkflowsCalls()) +func (mock *WorkflowServiceClientMock) GetWorkflowsCalls() []struct { + Ctx context.Context + In *GetWorkflowsRequest + Opts []grpc.CallOption +} { + var calls []struct { + Ctx context.Context + In *GetWorkflowsRequest + Opts []grpc.CallOption + } + mock.lockGetWorkflows.RLock() + calls = mock.calls.GetWorkflows + mock.lockGetWorkflows.RUnlock() + return calls +} + +// PublishEvent calls PublishEventFunc. +func (mock *WorkflowServiceClientMock) PublishEvent(ctx context.Context, in *PublishEventRequest, opts ...grpc.CallOption) (*PublishEventResponse, error) { + if mock.PublishEventFunc == nil { + panic("WorkflowServiceClientMock.PublishEventFunc: method is nil but WorkflowServiceClient.PublishEvent was just called") + } + callInfo := struct { + Ctx context.Context + In *PublishEventRequest + Opts []grpc.CallOption + }{ + Ctx: ctx, + In: in, + Opts: opts, + } + mock.lockPublishEvent.Lock() + mock.calls.PublishEvent = append(mock.calls.PublishEvent, callInfo) + mock.lockPublishEvent.Unlock() + return mock.PublishEventFunc(ctx, in, opts...) +} + +// PublishEventCalls gets all the calls that were made to PublishEvent. +// Check the length with: +// +// len(mockedWorkflowServiceClient.PublishEventCalls()) +func (mock *WorkflowServiceClientMock) PublishEventCalls() []struct { + Ctx context.Context + In *PublishEventRequest + Opts []grpc.CallOption +} { + var calls []struct { + Ctx context.Context + In *PublishEventRequest + Opts []grpc.CallOption + } + mock.lockPublishEvent.RLock() + calls = mock.calls.PublishEvent + mock.lockPublishEvent.RUnlock() + return calls +} + +// Ensure, that WorkflowService_GetWorkflowsClientMock does implement WorkflowService_GetWorkflowsClient. +// If this is not the case, regenerate this file with moq. +var _ WorkflowService_GetWorkflowsClient = &WorkflowService_GetWorkflowsClientMock{} + +// WorkflowService_GetWorkflowsClientMock is a mock implementation of WorkflowService_GetWorkflowsClient. +// +// func TestSomethingThatUsesWorkflowService_GetWorkflowsClient(t *testing.T) { +// +// // make and configure a mocked WorkflowService_GetWorkflowsClient +// mockedWorkflowService_GetWorkflowsClient := &WorkflowService_GetWorkflowsClientMock{ +// CloseSendFunc: func() error { +// panic("mock out the CloseSend method") +// }, +// ContextFunc: func() context.Context { +// panic("mock out the Context method") +// }, +// HeaderFunc: func() (metadata.MD, error) { +// panic("mock out the Header method") +// }, +// RecvFunc: func() (*GetWorkflowsResponse, error) { +// panic("mock out the Recv method") +// }, +// RecvMsgFunc: func(m interface{}) error { +// panic("mock out the RecvMsg method") +// }, +// SendMsgFunc: func(m interface{}) error { +// panic("mock out the SendMsg method") +// }, +// TrailerFunc: func() metadata.MD { +// panic("mock out the Trailer method") +// }, +// } +// +// // use mockedWorkflowService_GetWorkflowsClient in code that requires WorkflowService_GetWorkflowsClient +// // and then make assertions. +// +// } +type WorkflowService_GetWorkflowsClientMock struct { + // CloseSendFunc mocks the CloseSend method. + CloseSendFunc func() error + + // ContextFunc mocks the Context method. + ContextFunc func() context.Context + + // HeaderFunc mocks the Header method. + HeaderFunc func() (metadata.MD, error) + + // RecvFunc mocks the Recv method. + RecvFunc func() (*GetWorkflowsResponse, error) + + // RecvMsgFunc mocks the RecvMsg method. + RecvMsgFunc func(m interface{}) error + + // SendMsgFunc mocks the SendMsg method. + SendMsgFunc func(m interface{}) error + + // TrailerFunc mocks the Trailer method. + TrailerFunc func() metadata.MD + + // calls tracks calls to the methods. + calls struct { + // CloseSend holds details about calls to the CloseSend method. + CloseSend []struct { + } + // Context holds details about calls to the Context method. + Context []struct { + } + // Header holds details about calls to the Header method. + Header []struct { + } + // Recv holds details about calls to the Recv method. + Recv []struct { + } + // RecvMsg holds details about calls to the RecvMsg method. + RecvMsg []struct { + // M is the m argument value. + M interface{} + } + // SendMsg holds details about calls to the SendMsg method. + SendMsg []struct { + // M is the m argument value. + M interface{} + } + // Trailer holds details about calls to the Trailer method. + Trailer []struct { + } + } + lockCloseSend sync.RWMutex + lockContext sync.RWMutex + lockHeader sync.RWMutex + lockRecv sync.RWMutex + lockRecvMsg sync.RWMutex + lockSendMsg sync.RWMutex + lockTrailer sync.RWMutex +} + +// CloseSend calls CloseSendFunc. +func (mock *WorkflowService_GetWorkflowsClientMock) CloseSend() error { + if mock.CloseSendFunc == nil { + panic("WorkflowService_GetWorkflowsClientMock.CloseSendFunc: method is nil but WorkflowService_GetWorkflowsClient.CloseSend was just called") + } + callInfo := struct { + }{} + mock.lockCloseSend.Lock() + mock.calls.CloseSend = append(mock.calls.CloseSend, callInfo) + mock.lockCloseSend.Unlock() + return mock.CloseSendFunc() +} + +// CloseSendCalls gets all the calls that were made to CloseSend. +// Check the length with: +// +// len(mockedWorkflowService_GetWorkflowsClient.CloseSendCalls()) +func (mock *WorkflowService_GetWorkflowsClientMock) CloseSendCalls() []struct { +} { + var calls []struct { + } + mock.lockCloseSend.RLock() + calls = mock.calls.CloseSend + mock.lockCloseSend.RUnlock() + return calls +} + +// Context calls ContextFunc. +func (mock *WorkflowService_GetWorkflowsClientMock) Context() context.Context { + if mock.ContextFunc == nil { + panic("WorkflowService_GetWorkflowsClientMock.ContextFunc: method is nil but WorkflowService_GetWorkflowsClient.Context was just called") + } + callInfo := struct { + }{} + mock.lockContext.Lock() + mock.calls.Context = append(mock.calls.Context, callInfo) + mock.lockContext.Unlock() + return mock.ContextFunc() +} + +// ContextCalls gets all the calls that were made to Context. +// Check the length with: +// +// len(mockedWorkflowService_GetWorkflowsClient.ContextCalls()) +func (mock *WorkflowService_GetWorkflowsClientMock) ContextCalls() []struct { +} { + var calls []struct { + } + mock.lockContext.RLock() + calls = mock.calls.Context + mock.lockContext.RUnlock() + return calls +} + +// Header calls HeaderFunc. +func (mock *WorkflowService_GetWorkflowsClientMock) Header() (metadata.MD, error) { + if mock.HeaderFunc == nil { + panic("WorkflowService_GetWorkflowsClientMock.HeaderFunc: method is nil but WorkflowService_GetWorkflowsClient.Header was just called") + } + callInfo := struct { + }{} + mock.lockHeader.Lock() + mock.calls.Header = append(mock.calls.Header, callInfo) + mock.lockHeader.Unlock() + return mock.HeaderFunc() +} + +// HeaderCalls gets all the calls that were made to Header. +// Check the length with: +// +// len(mockedWorkflowService_GetWorkflowsClient.HeaderCalls()) +func (mock *WorkflowService_GetWorkflowsClientMock) HeaderCalls() []struct { +} { + var calls []struct { + } + mock.lockHeader.RLock() + calls = mock.calls.Header + mock.lockHeader.RUnlock() + return calls +} + +// Recv calls RecvFunc. +func (mock *WorkflowService_GetWorkflowsClientMock) Recv() (*GetWorkflowsResponse, error) { + if mock.RecvFunc == nil { + panic("WorkflowService_GetWorkflowsClientMock.RecvFunc: method is nil but WorkflowService_GetWorkflowsClient.Recv was just called") + } + callInfo := struct { + }{} + mock.lockRecv.Lock() + mock.calls.Recv = append(mock.calls.Recv, callInfo) + mock.lockRecv.Unlock() + return mock.RecvFunc() +} + +// RecvCalls gets all the calls that were made to Recv. +// Check the length with: +// +// len(mockedWorkflowService_GetWorkflowsClient.RecvCalls()) +func (mock *WorkflowService_GetWorkflowsClientMock) RecvCalls() []struct { +} { + var calls []struct { + } + mock.lockRecv.RLock() + calls = mock.calls.Recv + mock.lockRecv.RUnlock() + return calls +} + +// RecvMsg calls RecvMsgFunc. +func (mock *WorkflowService_GetWorkflowsClientMock) RecvMsg(m interface{}) error { + if mock.RecvMsgFunc == nil { + panic("WorkflowService_GetWorkflowsClientMock.RecvMsgFunc: method is nil but WorkflowService_GetWorkflowsClient.RecvMsg was just called") + } + callInfo := struct { + M interface{} + }{ + M: m, + } + mock.lockRecvMsg.Lock() + mock.calls.RecvMsg = append(mock.calls.RecvMsg, callInfo) + mock.lockRecvMsg.Unlock() + return mock.RecvMsgFunc(m) +} + +// RecvMsgCalls gets all the calls that were made to RecvMsg. +// Check the length with: +// +// len(mockedWorkflowService_GetWorkflowsClient.RecvMsgCalls()) +func (mock *WorkflowService_GetWorkflowsClientMock) RecvMsgCalls() []struct { + M interface{} +} { + var calls []struct { + M interface{} + } + mock.lockRecvMsg.RLock() + calls = mock.calls.RecvMsg + mock.lockRecvMsg.RUnlock() + return calls +} + +// SendMsg calls SendMsgFunc. +func (mock *WorkflowService_GetWorkflowsClientMock) SendMsg(m interface{}) error { + if mock.SendMsgFunc == nil { + panic("WorkflowService_GetWorkflowsClientMock.SendMsgFunc: method is nil but WorkflowService_GetWorkflowsClient.SendMsg was just called") + } + callInfo := struct { + M interface{} + }{ + M: m, + } + mock.lockSendMsg.Lock() + mock.calls.SendMsg = append(mock.calls.SendMsg, callInfo) + mock.lockSendMsg.Unlock() + return mock.SendMsgFunc(m) +} + +// SendMsgCalls gets all the calls that were made to SendMsg. +// Check the length with: +// +// len(mockedWorkflowService_GetWorkflowsClient.SendMsgCalls()) +func (mock *WorkflowService_GetWorkflowsClientMock) SendMsgCalls() []struct { + M interface{} +} { + var calls []struct { + M interface{} + } + mock.lockSendMsg.RLock() + calls = mock.calls.SendMsg + mock.lockSendMsg.RUnlock() + return calls +} + +// Trailer calls TrailerFunc. +func (mock *WorkflowService_GetWorkflowsClientMock) Trailer() metadata.MD { + if mock.TrailerFunc == nil { + panic("WorkflowService_GetWorkflowsClientMock.TrailerFunc: method is nil but WorkflowService_GetWorkflowsClient.Trailer was just called") + } + callInfo := struct { + }{} + mock.lockTrailer.Lock() + mock.calls.Trailer = append(mock.calls.Trailer, callInfo) + mock.lockTrailer.Unlock() + return mock.TrailerFunc() +} + +// TrailerCalls gets all the calls that were made to Trailer. +// Check the length with: +// +// len(mockedWorkflowService_GetWorkflowsClient.TrailerCalls()) +func (mock *WorkflowService_GetWorkflowsClientMock) TrailerCalls() []struct { +} { + var calls []struct { + } + mock.lockTrailer.RLock() + calls = mock.calls.Trailer + mock.lockTrailer.RUnlock() + return calls +} diff --git a/internal/proto/workflow/v2/workflow.pb.go b/internal/proto/workflow/v2/workflow.pb.go new file mode 100644 index 000000000..3f0a46e04 --- /dev/null +++ b/internal/proto/workflow/v2/workflow.pb.go @@ -0,0 +1,1214 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc (unknown) +// source: internal/proto/workflow/v2/workflow.proto + +package workflow + +import ( + reflect "reflect" + sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type GetWorkflowsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + AgentId string `protobuf:"bytes,1,opt,name=agent_id,json=agentId,proto3" json:"agent_id,omitempty"` +} + +func (x *GetWorkflowsRequest) Reset() { + *x = GetWorkflowsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetWorkflowsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetWorkflowsRequest) ProtoMessage() {} + +func (x *GetWorkflowsRequest) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetWorkflowsRequest.ProtoReflect.Descriptor instead. +func (*GetWorkflowsRequest) Descriptor() ([]byte, []int) { + return file_internal_proto_workflow_v2_workflow_proto_rawDescGZIP(), []int{0} +} + +func (x *GetWorkflowsRequest) GetAgentId() string { + if x != nil { + return x.AgentId + } + return "" +} + +type GetWorkflowsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Cmd: + // + // *GetWorkflowsResponse_StartWorkflow_ + // *GetWorkflowsResponse_StopWorkflow_ + Cmd isGetWorkflowsResponse_Cmd `protobuf_oneof:"cmd"` +} + +func (x *GetWorkflowsResponse) Reset() { + *x = GetWorkflowsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetWorkflowsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetWorkflowsResponse) ProtoMessage() {} + +func (x *GetWorkflowsResponse) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetWorkflowsResponse.ProtoReflect.Descriptor instead. +func (*GetWorkflowsResponse) Descriptor() ([]byte, []int) { + return file_internal_proto_workflow_v2_workflow_proto_rawDescGZIP(), []int{1} +} + +func (m *GetWorkflowsResponse) GetCmd() isGetWorkflowsResponse_Cmd { + if m != nil { + return m.Cmd + } + return nil +} + +func (x *GetWorkflowsResponse) GetStartWorkflow() *GetWorkflowsResponse_StartWorkflow { + if x, ok := x.GetCmd().(*GetWorkflowsResponse_StartWorkflow_); ok { + return x.StartWorkflow + } + return nil +} + +func (x *GetWorkflowsResponse) GetStopWorkflow() *GetWorkflowsResponse_StopWorkflow { + if x, ok := x.GetCmd().(*GetWorkflowsResponse_StopWorkflow_); ok { + return x.StopWorkflow + } + return nil +} + +type isGetWorkflowsResponse_Cmd interface { + isGetWorkflowsResponse_Cmd() +} + +type GetWorkflowsResponse_StartWorkflow_ struct { + StartWorkflow *GetWorkflowsResponse_StartWorkflow `protobuf:"bytes,1,opt,name=start_workflow,json=startWorkflow,proto3,oneof"` +} + +type GetWorkflowsResponse_StopWorkflow_ struct { + StopWorkflow *GetWorkflowsResponse_StopWorkflow `protobuf:"bytes,2,opt,name=stop_workflow,json=stopWorkflow,proto3,oneof"` +} + +func (*GetWorkflowsResponse_StartWorkflow_) isGetWorkflowsResponse_Cmd() {} + +func (*GetWorkflowsResponse_StopWorkflow_) isGetWorkflowsResponse_Cmd() {} + +type PublishEventRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Event *Event `protobuf:"bytes,1,opt,name=event,proto3" json:"event,omitempty"` +} + +func (x *PublishEventRequest) Reset() { + *x = PublishEventRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PublishEventRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishEventRequest) ProtoMessage() {} + +func (x *PublishEventRequest) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishEventRequest.ProtoReflect.Descriptor instead. +func (*PublishEventRequest) Descriptor() ([]byte, []int) { + return file_internal_proto_workflow_v2_workflow_proto_rawDescGZIP(), []int{2} +} + +func (x *PublishEventRequest) GetEvent() *Event { + if x != nil { + return x.Event + } + return nil +} + +type PublishEventResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *PublishEventResponse) Reset() { + *x = PublishEventResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PublishEventResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishEventResponse) ProtoMessage() {} + +func (x *PublishEventResponse) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishEventResponse.ProtoReflect.Descriptor instead. +func (*PublishEventResponse) Descriptor() ([]byte, []int) { + return file_internal_proto_workflow_v2_workflow_proto_rawDescGZIP(), []int{3} +} + +type Workflow struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // A unique identifier for a workflow. + WorkflowId string `protobuf:"bytes,1,opt,name=workflow_id,json=workflowId,proto3" json:"workflow_id,omitempty"` + // The actions that make up the workflow. + Actions []*Workflow_Action `protobuf:"bytes,2,rep,name=actions,proto3" json:"actions,omitempty"` +} + +func (x *Workflow) Reset() { + *x = Workflow{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Workflow) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Workflow) ProtoMessage() {} + +func (x *Workflow) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Workflow.ProtoReflect.Descriptor instead. +func (*Workflow) Descriptor() ([]byte, []int) { + return file_internal_proto_workflow_v2_workflow_proto_rawDescGZIP(), []int{4} +} + +func (x *Workflow) GetWorkflowId() string { + if x != nil { + return x.WorkflowId + } + return "" +} + +func (x *Workflow) GetActions() []*Workflow_Action { + if x != nil { + return x.Actions + } + return nil +} + +type Event struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // A unique identifier for a workflow. + WorkflowId string `protobuf:"bytes,1,opt,name=workflow_id,json=workflowId,proto3" json:"workflow_id,omitempty"` + // Types that are assignable to Event: + // + // *Event_ActionStarted_ + // *Event_ActionSucceeded_ + // *Event_ActionFailed_ + // *Event_WorkflowRejected_ + Event isEvent_Event `protobuf_oneof:"event"` +} + +func (x *Event) Reset() { + *x = Event{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Event) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Event) ProtoMessage() {} + +func (x *Event) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Event.ProtoReflect.Descriptor instead. +func (*Event) Descriptor() ([]byte, []int) { + return file_internal_proto_workflow_v2_workflow_proto_rawDescGZIP(), []int{5} +} + +func (x *Event) GetWorkflowId() string { + if x != nil { + return x.WorkflowId + } + return "" +} + +func (m *Event) GetEvent() isEvent_Event { + if m != nil { + return m.Event + } + return nil +} + +func (x *Event) GetActionStarted() *Event_ActionStarted { + if x, ok := x.GetEvent().(*Event_ActionStarted_); ok { + return x.ActionStarted + } + return nil +} + +func (x *Event) GetActionSucceeded() *Event_ActionSucceeded { + if x, ok := x.GetEvent().(*Event_ActionSucceeded_); ok { + return x.ActionSucceeded + } + return nil +} + +func (x *Event) GetActionFailed() *Event_ActionFailed { + if x, ok := x.GetEvent().(*Event_ActionFailed_); ok { + return x.ActionFailed + } + return nil +} + +func (x *Event) GetWorkflowRejected() *Event_WorkflowRejected { + if x, ok := x.GetEvent().(*Event_WorkflowRejected_); ok { + return x.WorkflowRejected + } + return nil +} + +type isEvent_Event interface { + isEvent_Event() +} + +type Event_ActionStarted_ struct { + ActionStarted *Event_ActionStarted `protobuf:"bytes,2,opt,name=action_started,json=actionStarted,proto3,oneof"` +} + +type Event_ActionSucceeded_ struct { + ActionSucceeded *Event_ActionSucceeded `protobuf:"bytes,3,opt,name=action_succeeded,json=actionSucceeded,proto3,oneof"` +} + +type Event_ActionFailed_ struct { + ActionFailed *Event_ActionFailed `protobuf:"bytes,4,opt,name=action_failed,json=actionFailed,proto3,oneof"` +} + +type Event_WorkflowRejected_ struct { + WorkflowRejected *Event_WorkflowRejected `protobuf:"bytes,5,opt,name=workflow_rejected,json=workflowRejected,proto3,oneof"` +} + +func (*Event_ActionStarted_) isEvent_Event() {} + +func (*Event_ActionSucceeded_) isEvent_Event() {} + +func (*Event_ActionFailed_) isEvent_Event() {} + +func (*Event_WorkflowRejected_) isEvent_Event() {} + +type GetWorkflowsResponse_StartWorkflow struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Workflow *Workflow `protobuf:"bytes,1,opt,name=workflow,proto3" json:"workflow,omitempty"` +} + +func (x *GetWorkflowsResponse_StartWorkflow) Reset() { + *x = GetWorkflowsResponse_StartWorkflow{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetWorkflowsResponse_StartWorkflow) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetWorkflowsResponse_StartWorkflow) ProtoMessage() {} + +func (x *GetWorkflowsResponse_StartWorkflow) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetWorkflowsResponse_StartWorkflow.ProtoReflect.Descriptor instead. +func (*GetWorkflowsResponse_StartWorkflow) Descriptor() ([]byte, []int) { + return file_internal_proto_workflow_v2_workflow_proto_rawDescGZIP(), []int{1, 0} +} + +func (x *GetWorkflowsResponse_StartWorkflow) GetWorkflow() *Workflow { + if x != nil { + return x.Workflow + } + return nil +} + +type GetWorkflowsResponse_StopWorkflow struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + WorkflowId string `protobuf:"bytes,1,opt,name=workflow_id,json=workflowId,proto3" json:"workflow_id,omitempty"` +} + +func (x *GetWorkflowsResponse_StopWorkflow) Reset() { + *x = GetWorkflowsResponse_StopWorkflow{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetWorkflowsResponse_StopWorkflow) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetWorkflowsResponse_StopWorkflow) ProtoMessage() {} + +func (x *GetWorkflowsResponse_StopWorkflow) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetWorkflowsResponse_StopWorkflow.ProtoReflect.Descriptor instead. +func (*GetWorkflowsResponse_StopWorkflow) Descriptor() ([]byte, []int) { + return file_internal_proto_workflow_v2_workflow_proto_rawDescGZIP(), []int{1, 1} +} + +func (x *GetWorkflowsResponse_StopWorkflow) GetWorkflowId() string { + if x != nil { + return x.WorkflowId + } + return "" +} + +type Workflow_Action struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // A unique identifier for an action in the context of a workflow. + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + // The name of the action. This can be used to identify actions in logging. + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + // The image to run. + Image string `protobuf:"bytes,3,opt,name=image,proto3" json:"image,omitempty"` + // The command to execute when launching the image. When using Docker as the action runtime + // it is used as the entrypoint. + Cmd *string `protobuf:"bytes,4,opt,name=cmd,proto3,oneof" json:"cmd,omitempty"` + // Arguments to pass to the container. + Args []string `protobuf:"bytes,5,rep,name=args,proto3" json:"args,omitempty"` + // Environment variables to configure when launching the container. + Env map[string]string `protobuf:"bytes,6,rep,name=env,proto3" json:"env,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // Volumes to mount when launching the container. + Volumes []string `protobuf:"bytes,7,rep,name=volumes,proto3" json:"volumes,omitempty"` + // The network namespace to launch the container in. + NetworkNamespace *string `protobuf:"bytes,8,opt,name=network_namespace,json=networkNamespace,proto3,oneof" json:"network_namespace,omitempty"` +} + +func (x *Workflow_Action) Reset() { + *x = Workflow_Action{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Workflow_Action) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Workflow_Action) ProtoMessage() {} + +func (x *Workflow_Action) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[8] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Workflow_Action.ProtoReflect.Descriptor instead. +func (*Workflow_Action) Descriptor() ([]byte, []int) { + return file_internal_proto_workflow_v2_workflow_proto_rawDescGZIP(), []int{4, 0} +} + +func (x *Workflow_Action) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *Workflow_Action) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *Workflow_Action) GetImage() string { + if x != nil { + return x.Image + } + return "" +} + +func (x *Workflow_Action) GetCmd() string { + if x != nil && x.Cmd != nil { + return *x.Cmd + } + return "" +} + +func (x *Workflow_Action) GetArgs() []string { + if x != nil { + return x.Args + } + return nil +} + +func (x *Workflow_Action) GetEnv() map[string]string { + if x != nil { + return x.Env + } + return nil +} + +func (x *Workflow_Action) GetVolumes() []string { + if x != nil { + return x.Volumes + } + return nil +} + +func (x *Workflow_Action) GetNetworkNamespace() string { + if x != nil && x.NetworkNamespace != nil { + return *x.NetworkNamespace + } + return "" +} + +type Event_ActionStarted struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // A unique identifier for an action in the context of a workflow. + ActionId string `protobuf:"bytes,1,opt,name=action_id,json=actionId,proto3" json:"action_id,omitempty"` +} + +func (x *Event_ActionStarted) Reset() { + *x = Event_ActionStarted{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Event_ActionStarted) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Event_ActionStarted) ProtoMessage() {} + +func (x *Event_ActionStarted) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[10] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Event_ActionStarted.ProtoReflect.Descriptor instead. +func (*Event_ActionStarted) Descriptor() ([]byte, []int) { + return file_internal_proto_workflow_v2_workflow_proto_rawDescGZIP(), []int{5, 0} +} + +func (x *Event_ActionStarted) GetActionId() string { + if x != nil { + return x.ActionId + } + return "" +} + +type Event_ActionSucceeded struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // A unique identifier for an action in the context of a workflow. + ActionId string `protobuf:"bytes,1,opt,name=action_id,json=actionId,proto3" json:"action_id,omitempty"` +} + +func (x *Event_ActionSucceeded) Reset() { + *x = Event_ActionSucceeded{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Event_ActionSucceeded) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Event_ActionSucceeded) ProtoMessage() {} + +func (x *Event_ActionSucceeded) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[11] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Event_ActionSucceeded.ProtoReflect.Descriptor instead. +func (*Event_ActionSucceeded) Descriptor() ([]byte, []int) { + return file_internal_proto_workflow_v2_workflow_proto_rawDescGZIP(), []int{5, 1} +} + +func (x *Event_ActionSucceeded) GetActionId() string { + if x != nil { + return x.ActionId + } + return "" +} + +type Event_ActionFailed struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // A unique identifier for an action in the context of a workflow. + ActionId string `protobuf:"bytes,1,opt,name=action_id,json=actionId,proto3" json:"action_id,omitempty"` + // A UpperCamelCase word or phrase concisly describing why an action failed. It is typically + // provided by the action itself. + FailureReason *string `protobuf:"bytes,2,opt,name=failure_reason,json=failureReason,proto3,oneof" json:"failure_reason,omitempty"` + // A free-form human readable string elaborating on the reason for failure. It is typically + // provided by the action itself. + FailureMessage *string `protobuf:"bytes,3,opt,name=failure_message,json=failureMessage,proto3,oneof" json:"failure_message,omitempty"` +} + +func (x *Event_ActionFailed) Reset() { + *x = Event_ActionFailed{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[12] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Event_ActionFailed) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Event_ActionFailed) ProtoMessage() {} + +func (x *Event_ActionFailed) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[12] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Event_ActionFailed.ProtoReflect.Descriptor instead. +func (*Event_ActionFailed) Descriptor() ([]byte, []int) { + return file_internal_proto_workflow_v2_workflow_proto_rawDescGZIP(), []int{5, 2} +} + +func (x *Event_ActionFailed) GetActionId() string { + if x != nil { + return x.ActionId + } + return "" +} + +func (x *Event_ActionFailed) GetFailureReason() string { + if x != nil && x.FailureReason != nil { + return *x.FailureReason + } + return "" +} + +func (x *Event_ActionFailed) GetFailureMessage() string { + if x != nil && x.FailureMessage != nil { + return *x.FailureMessage + } + return "" +} + +type Event_WorkflowRejected struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // A message describing why the workflow was rejected. + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *Event_WorkflowRejected) Reset() { + *x = Event_WorkflowRejected{} + if protoimpl.UnsafeEnabled { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Event_WorkflowRejected) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Event_WorkflowRejected) ProtoMessage() {} + +func (x *Event_WorkflowRejected) ProtoReflect() protoreflect.Message { + mi := &file_internal_proto_workflow_v2_workflow_proto_msgTypes[13] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Event_WorkflowRejected.ProtoReflect.Descriptor instead. +func (*Event_WorkflowRejected) Descriptor() ([]byte, []int) { + return file_internal_proto_workflow_v2_workflow_proto_rawDescGZIP(), []int{5, 3} +} + +func (x *Event_WorkflowRejected) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +var File_internal_proto_workflow_v2_workflow_proto protoreflect.FileDescriptor + +var file_internal_proto_workflow_v2_workflow_proto_rawDesc = []byte{ + 0x0a, 0x29, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2f, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x2f, 0x76, 0x32, 0x2f, 0x77, 0x6f, 0x72, + 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x1a, 0x69, 0x6e, 0x74, + 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x77, 0x6f, 0x72, 0x6b, + 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x76, 0x32, 0x22, 0x30, 0x0a, 0x13, 0x47, 0x65, 0x74, 0x57, 0x6f, + 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x19, + 0x0a, 0x08, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x07, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x22, 0xf0, 0x02, 0x0a, 0x14, 0x47, 0x65, + 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x67, 0x0a, 0x0e, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x77, 0x6f, 0x72, 0x6b, + 0x66, 0x6c, 0x6f, 0x77, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3e, 0x2e, 0x69, 0x6e, 0x74, + 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x77, 0x6f, 0x72, 0x6b, + 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, + 0x6c, 0x6f, 0x77, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x53, 0x74, 0x61, + 0x72, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x48, 0x00, 0x52, 0x0d, 0x73, 0x74, + 0x61, 0x72, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x12, 0x64, 0x0a, 0x0d, 0x73, + 0x74, 0x6f, 0x70, 0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x76, 0x32, 0x2e, + 0x47, 0x65, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x53, 0x74, 0x6f, 0x70, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, + 0x77, 0x48, 0x00, 0x52, 0x0c, 0x73, 0x74, 0x6f, 0x70, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, + 0x77, 0x1a, 0x51, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x72, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, + 0x6f, 0x77, 0x12, 0x40, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x76, + 0x32, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, + 0x66, 0x6c, 0x6f, 0x77, 0x1a, 0x2f, 0x0a, 0x0c, 0x53, 0x74, 0x6f, 0x70, 0x57, 0x6f, 0x72, 0x6b, + 0x66, 0x6c, 0x6f, 0x77, 0x12, 0x1f, 0x0a, 0x0b, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, + 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x77, 0x6f, 0x72, 0x6b, 0x66, + 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x42, 0x05, 0x0a, 0x03, 0x63, 0x6d, 0x64, 0x22, 0x4e, 0x0a, 0x13, + 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x37, 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x76, 0x32, 0x2e, + 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x16, 0x0a, 0x14, + 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xcc, 0x03, 0x0a, 0x08, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, + 0x77, 0x12, 0x1f, 0x0a, 0x0b, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, + 0x49, 0x64, 0x12, 0x45, 0x0a, 0x07, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x76, 0x32, + 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0x52, 0x07, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0xd7, 0x02, 0x0a, 0x06, 0x41, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6d, 0x61, 0x67, + 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x69, 0x6d, 0x61, 0x67, 0x65, 0x12, 0x15, + 0x0a, 0x03, 0x63, 0x6d, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x03, 0x63, + 0x6d, 0x64, 0x88, 0x01, 0x01, 0x12, 0x12, 0x0a, 0x04, 0x61, 0x72, 0x67, 0x73, 0x18, 0x05, 0x20, + 0x03, 0x28, 0x09, 0x52, 0x04, 0x61, 0x72, 0x67, 0x73, 0x12, 0x46, 0x0a, 0x03, 0x65, 0x6e, 0x76, + 0x18, 0x06, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x34, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, + 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, + 0x2e, 0x76, 0x32, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x41, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x2e, 0x45, 0x6e, 0x76, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x03, 0x65, 0x6e, + 0x76, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x73, 0x18, 0x07, 0x20, 0x03, + 0x28, 0x09, 0x52, 0x07, 0x76, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x73, 0x12, 0x30, 0x0a, 0x11, 0x6e, + 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, + 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x48, 0x01, 0x52, 0x10, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, + 0x6b, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x88, 0x01, 0x01, 0x1a, 0x36, 0x0a, + 0x08, 0x45, 0x6e, 0x76, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x3a, 0x02, 0x38, 0x01, 0x42, 0x06, 0x0a, 0x04, 0x5f, 0x63, 0x6d, 0x64, 0x42, 0x14, 0x0a, + 0x12, 0x5f, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, + 0x61, 0x63, 0x65, 0x22, 0xe0, 0x05, 0x0a, 0x05, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x1f, 0x0a, + 0x0b, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0a, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x12, 0x58, + 0x0a, 0x0e, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x65, 0x64, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2f, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, + 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, + 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0x53, 0x74, 0x61, 0x72, 0x74, 0x65, 0x64, 0x48, 0x00, 0x52, 0x0d, 0x61, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x53, 0x74, 0x61, 0x72, 0x74, 0x65, 0x64, 0x12, 0x5e, 0x0a, 0x10, 0x61, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x5f, 0x73, 0x75, 0x63, 0x63, 0x65, 0x65, 0x64, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x31, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x76, 0x32, 0x2e, + 0x45, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x75, 0x63, 0x63, + 0x65, 0x65, 0x64, 0x65, 0x64, 0x48, 0x00, 0x52, 0x0f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, + 0x75, 0x63, 0x63, 0x65, 0x65, 0x64, 0x65, 0x64, 0x12, 0x55, 0x0a, 0x0d, 0x61, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x5f, 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x2e, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2e, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x76, 0x65, + 0x6e, 0x74, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x48, + 0x00, 0x52, 0x0c, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x12, + 0x61, 0x0a, 0x11, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x72, 0x65, 0x6a, 0x65, + 0x63, 0x74, 0x65, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x69, 0x6e, 0x74, + 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x77, 0x6f, 0x72, 0x6b, + 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x57, 0x6f, + 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x6a, 0x65, 0x63, 0x74, 0x65, 0x64, 0x48, 0x00, + 0x52, 0x10, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x6a, 0x65, 0x63, 0x74, + 0x65, 0x64, 0x1a, 0x2c, 0x0a, 0x0d, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x72, + 0x74, 0x65, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, + 0x1a, 0x2e, 0x0a, 0x0f, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x75, 0x63, 0x63, 0x65, 0x65, + 0x64, 0x65, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, + 0x1a, 0xac, 0x01, 0x0a, 0x0c, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x46, 0x61, 0x69, 0x6c, 0x65, + 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x2a, + 0x0a, 0x0e, 0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x5f, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x0d, 0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, + 0x65, 0x52, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x88, 0x01, 0x01, 0x12, 0x2c, 0x0a, 0x0f, 0x66, 0x61, + 0x69, 0x6c, 0x75, 0x72, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x09, 0x48, 0x01, 0x52, 0x0e, 0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x88, 0x01, 0x01, 0x42, 0x11, 0x0a, 0x0f, 0x5f, 0x66, 0x61, 0x69, + 0x6c, 0x75, 0x72, 0x65, 0x5f, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x42, 0x12, 0x0a, 0x10, 0x5f, + 0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, + 0x2c, 0x0a, 0x10, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x6a, 0x65, 0x63, + 0x74, 0x65, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, 0x07, 0x0a, + 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x32, 0xfd, 0x01, 0x0a, 0x0f, 0x57, 0x6f, 0x72, 0x6b, 0x66, + 0x6c, 0x6f, 0x77, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x75, 0x0a, 0x0c, 0x47, 0x65, + 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x12, 0x2f, 0x2e, 0x69, 0x6e, 0x74, + 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x77, 0x6f, 0x72, 0x6b, + 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, + 0x6c, 0x6f, 0x77, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x30, 0x2e, 0x69, 0x6e, + 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x77, 0x6f, 0x72, + 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65, 0x74, 0x57, 0x6f, 0x72, 0x6b, + 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, + 0x01, 0x12, 0x73, 0x0a, 0x0c, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x45, 0x76, 0x65, 0x6e, + 0x74, 0x12, 0x2f, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2e, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x76, 0x32, 0x2e, 0x50, + 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x30, 0x2e, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x76, 0x32, 0x2e, + 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x40, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x74, 0x69, 0x6e, 0x6b, 0x65, 0x72, 0x62, 0x65, 0x6c, 0x6c, 0x2f, + 0x74, 0x69, 0x6e, 0x6b, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2f, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x2f, 0x76, 0x32, 0x3b, + 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_internal_proto_workflow_v2_workflow_proto_rawDescOnce sync.Once + file_internal_proto_workflow_v2_workflow_proto_rawDescData = file_internal_proto_workflow_v2_workflow_proto_rawDesc +) + +func file_internal_proto_workflow_v2_workflow_proto_rawDescGZIP() []byte { + file_internal_proto_workflow_v2_workflow_proto_rawDescOnce.Do(func() { + file_internal_proto_workflow_v2_workflow_proto_rawDescData = protoimpl.X.CompressGZIP(file_internal_proto_workflow_v2_workflow_proto_rawDescData) + }) + return file_internal_proto_workflow_v2_workflow_proto_rawDescData +} + +var ( + file_internal_proto_workflow_v2_workflow_proto_msgTypes = make([]protoimpl.MessageInfo, 14) + file_internal_proto_workflow_v2_workflow_proto_goTypes = []interface{}{ + (*GetWorkflowsRequest)(nil), // 0: internal.proto.workflow.v2.GetWorkflowsRequest + (*GetWorkflowsResponse)(nil), // 1: internal.proto.workflow.v2.GetWorkflowsResponse + (*PublishEventRequest)(nil), // 2: internal.proto.workflow.v2.PublishEventRequest + (*PublishEventResponse)(nil), // 3: internal.proto.workflow.v2.PublishEventResponse + (*Workflow)(nil), // 4: internal.proto.workflow.v2.Workflow + (*Event)(nil), // 5: internal.proto.workflow.v2.Event + (*GetWorkflowsResponse_StartWorkflow)(nil), // 6: internal.proto.workflow.v2.GetWorkflowsResponse.StartWorkflow + (*GetWorkflowsResponse_StopWorkflow)(nil), // 7: internal.proto.workflow.v2.GetWorkflowsResponse.StopWorkflow + (*Workflow_Action)(nil), // 8: internal.proto.workflow.v2.Workflow.Action + nil, // 9: internal.proto.workflow.v2.Workflow.Action.EnvEntry + (*Event_ActionStarted)(nil), // 10: internal.proto.workflow.v2.Event.ActionStarted + (*Event_ActionSucceeded)(nil), // 11: internal.proto.workflow.v2.Event.ActionSucceeded + (*Event_ActionFailed)(nil), // 12: internal.proto.workflow.v2.Event.ActionFailed + (*Event_WorkflowRejected)(nil), // 13: internal.proto.workflow.v2.Event.WorkflowRejected + } +) +var file_internal_proto_workflow_v2_workflow_proto_depIdxs = []int32{ + 6, // 0: internal.proto.workflow.v2.GetWorkflowsResponse.start_workflow:type_name -> internal.proto.workflow.v2.GetWorkflowsResponse.StartWorkflow + 7, // 1: internal.proto.workflow.v2.GetWorkflowsResponse.stop_workflow:type_name -> internal.proto.workflow.v2.GetWorkflowsResponse.StopWorkflow + 5, // 2: internal.proto.workflow.v2.PublishEventRequest.event:type_name -> internal.proto.workflow.v2.Event + 8, // 3: internal.proto.workflow.v2.Workflow.actions:type_name -> internal.proto.workflow.v2.Workflow.Action + 10, // 4: internal.proto.workflow.v2.Event.action_started:type_name -> internal.proto.workflow.v2.Event.ActionStarted + 11, // 5: internal.proto.workflow.v2.Event.action_succeeded:type_name -> internal.proto.workflow.v2.Event.ActionSucceeded + 12, // 6: internal.proto.workflow.v2.Event.action_failed:type_name -> internal.proto.workflow.v2.Event.ActionFailed + 13, // 7: internal.proto.workflow.v2.Event.workflow_rejected:type_name -> internal.proto.workflow.v2.Event.WorkflowRejected + 4, // 8: internal.proto.workflow.v2.GetWorkflowsResponse.StartWorkflow.workflow:type_name -> internal.proto.workflow.v2.Workflow + 9, // 9: internal.proto.workflow.v2.Workflow.Action.env:type_name -> internal.proto.workflow.v2.Workflow.Action.EnvEntry + 0, // 10: internal.proto.workflow.v2.WorkflowService.GetWorkflows:input_type -> internal.proto.workflow.v2.GetWorkflowsRequest + 2, // 11: internal.proto.workflow.v2.WorkflowService.PublishEvent:input_type -> internal.proto.workflow.v2.PublishEventRequest + 1, // 12: internal.proto.workflow.v2.WorkflowService.GetWorkflows:output_type -> internal.proto.workflow.v2.GetWorkflowsResponse + 3, // 13: internal.proto.workflow.v2.WorkflowService.PublishEvent:output_type -> internal.proto.workflow.v2.PublishEventResponse + 12, // [12:14] is the sub-list for method output_type + 10, // [10:12] is the sub-list for method input_type + 10, // [10:10] is the sub-list for extension type_name + 10, // [10:10] is the sub-list for extension extendee + 0, // [0:10] is the sub-list for field type_name +} + +func init() { file_internal_proto_workflow_v2_workflow_proto_init() } +func file_internal_proto_workflow_v2_workflow_proto_init() { + if File_internal_proto_workflow_v2_workflow_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_internal_proto_workflow_v2_workflow_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetWorkflowsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_proto_workflow_v2_workflow_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetWorkflowsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_proto_workflow_v2_workflow_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PublishEventRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_proto_workflow_v2_workflow_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PublishEventResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_proto_workflow_v2_workflow_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Workflow); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_proto_workflow_v2_workflow_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Event); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_proto_workflow_v2_workflow_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetWorkflowsResponse_StartWorkflow); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_proto_workflow_v2_workflow_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetWorkflowsResponse_StopWorkflow); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_proto_workflow_v2_workflow_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Workflow_Action); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_proto_workflow_v2_workflow_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Event_ActionStarted); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_proto_workflow_v2_workflow_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Event_ActionSucceeded); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_proto_workflow_v2_workflow_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Event_ActionFailed); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_internal_proto_workflow_v2_workflow_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Event_WorkflowRejected); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_internal_proto_workflow_v2_workflow_proto_msgTypes[1].OneofWrappers = []interface{}{ + (*GetWorkflowsResponse_StartWorkflow_)(nil), + (*GetWorkflowsResponse_StopWorkflow_)(nil), + } + file_internal_proto_workflow_v2_workflow_proto_msgTypes[5].OneofWrappers = []interface{}{ + (*Event_ActionStarted_)(nil), + (*Event_ActionSucceeded_)(nil), + (*Event_ActionFailed_)(nil), + (*Event_WorkflowRejected_)(nil), + } + file_internal_proto_workflow_v2_workflow_proto_msgTypes[8].OneofWrappers = []interface{}{} + file_internal_proto_workflow_v2_workflow_proto_msgTypes[12].OneofWrappers = []interface{}{} + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_internal_proto_workflow_v2_workflow_proto_rawDesc, + NumEnums: 0, + NumMessages: 14, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_internal_proto_workflow_v2_workflow_proto_goTypes, + DependencyIndexes: file_internal_proto_workflow_v2_workflow_proto_depIdxs, + MessageInfos: file_internal_proto_workflow_v2_workflow_proto_msgTypes, + }.Build() + File_internal_proto_workflow_v2_workflow_proto = out.File + file_internal_proto_workflow_v2_workflow_proto_rawDesc = nil + file_internal_proto_workflow_v2_workflow_proto_goTypes = nil + file_internal_proto_workflow_v2_workflow_proto_depIdxs = nil +} diff --git a/internal/proto/workflow/v2/workflow.proto b/internal/proto/workflow/v2/workflow.proto new file mode 100644 index 000000000..4154e0060 --- /dev/null +++ b/internal/proto/workflow/v2/workflow.proto @@ -0,0 +1,117 @@ +syntax = "proto3"; + +package internal.proto.workflow.v2; + +option go_package = "github.com/tinkerbell/tink/internal/proto/workflow/v2;workflow"; + +// WorkflowService is responsible for retrieving workflows to be executed by the agent and +// publishing events as a workflow executes. +service WorkflowService { + // GetWorkflows creates a stream that will receive workflows intended for the agent identified + // by the GetWorkflowsRequest.agent_id. + rpc GetWorkflows(GetWorkflowsRequest) returns (stream GetWorkflowsResponse) {} + + // PublishEvent publishes a workflow event. + rpc PublishEvent(PublishEventRequest) returns (PublishEventResponse) {} +} + +message GetWorkflowsRequest { + string agent_id = 1; +} + +message GetWorkflowsResponse { + oneof cmd { + StartWorkflow start_workflow = 1; + StopWorkflow stop_workflow = 2; + } + + message StartWorkflow { + Workflow workflow = 1; + } + + message StopWorkflow { + string workflow_id = 1; + } +} + +message PublishEventRequest { + Event event = 1; +} + +message PublishEventResponse {} + +message Workflow { + // A unique identifier for a workflow. + string workflow_id = 1; + + // The actions that make up the workflow. + repeated Action actions = 2; + + message Action { + // A unique identifier for an action in the context of a workflow. + string id = 1; + + // The name of the action. This can be used to identify actions in logging. + string name = 2; + + // The image to run. + string image = 3; + + // The command to execute when launching the image. When using Docker as the action runtime + // it is used as the entrypoint. + optional string cmd = 4; + + // Arguments to pass to the container. + repeated string args = 5; + + // Environment variables to configure when launching the container. + map env = 6; + + // Volumes to mount when launching the container. + repeated string volumes = 7; + + // The network namespace to launch the container in. + optional string network_namespace = 8; + } +} + +message Event { + // A unique identifier for a workflow. + string workflow_id = 1; + + oneof event { + ActionStarted action_started = 2; + ActionSucceeded action_succeeded = 3; + ActionFailed action_failed = 4; + WorkflowRejected workflow_rejected = 5; + } + + message ActionStarted { + // A unique identifier for an action in the context of a workflow. + string action_id = 1; + } + + message ActionSucceeded { + // A unique identifier for an action in the context of a workflow. + string action_id = 1; + } + + message ActionFailed { + // A unique identifier for an action in the context of a workflow. + string action_id = 1; + + // A UpperCamelCase word or phrase concisly describing why an action failed. It is typically + // provided by the action itself. + optional string failure_reason = 2; + + // A free-form human readable string elaborating on the reason for failure. It is typically + // provided by the action itself. + optional string failure_message = 3; + + } + + message WorkflowRejected { + // A message describing why the workflow was rejected. + string message = 2; + } +} \ No newline at end of file diff --git a/internal/proto/workflow/v2/workflow_grpc.pb.go b/internal/proto/workflow/v2/workflow_grpc.pb.go new file mode 100644 index 000000000..3d6b4fc11 --- /dev/null +++ b/internal/proto/workflow/v2/workflow_grpc.pb.go @@ -0,0 +1,174 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc (unknown) +// source: internal/proto/workflow/v2/workflow.proto + +package workflow + +import ( + context "context" + + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// WorkflowServiceClient is the client API for WorkflowService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type WorkflowServiceClient interface { + // GetWorkflows creates a stream that will receive workflows intended for the agent identified + // by the GetWorkflowsRequest.agent_id. + GetWorkflows(ctx context.Context, in *GetWorkflowsRequest, opts ...grpc.CallOption) (WorkflowService_GetWorkflowsClient, error) + // PublishEvent publishes a workflow event. + PublishEvent(ctx context.Context, in *PublishEventRequest, opts ...grpc.CallOption) (*PublishEventResponse, error) +} + +type workflowServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewWorkflowServiceClient(cc grpc.ClientConnInterface) WorkflowServiceClient { + return &workflowServiceClient{cc} +} + +func (c *workflowServiceClient) GetWorkflows(ctx context.Context, in *GetWorkflowsRequest, opts ...grpc.CallOption) (WorkflowService_GetWorkflowsClient, error) { + stream, err := c.cc.NewStream(ctx, &WorkflowService_ServiceDesc.Streams[0], "/internal.proto.workflow.v2.WorkflowService/GetWorkflows", opts...) + if err != nil { + return nil, err + } + x := &workflowServiceGetWorkflowsClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type WorkflowService_GetWorkflowsClient interface { + Recv() (*GetWorkflowsResponse, error) + grpc.ClientStream +} + +type workflowServiceGetWorkflowsClient struct { + grpc.ClientStream +} + +func (x *workflowServiceGetWorkflowsClient) Recv() (*GetWorkflowsResponse, error) { + m := new(GetWorkflowsResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *workflowServiceClient) PublishEvent(ctx context.Context, in *PublishEventRequest, opts ...grpc.CallOption) (*PublishEventResponse, error) { + out := new(PublishEventResponse) + err := c.cc.Invoke(ctx, "/internal.proto.workflow.v2.WorkflowService/PublishEvent", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// WorkflowServiceServer is the server API for WorkflowService service. +// All implementations should embed UnimplementedWorkflowServiceServer +// for forward compatibility +type WorkflowServiceServer interface { + // GetWorkflows creates a stream that will receive workflows intended for the agent identified + // by the GetWorkflowsRequest.agent_id. + GetWorkflows(*GetWorkflowsRequest, WorkflowService_GetWorkflowsServer) error + // PublishEvent publishes a workflow event. + PublishEvent(context.Context, *PublishEventRequest) (*PublishEventResponse, error) +} + +// UnimplementedWorkflowServiceServer should be embedded to have forward compatible implementations. +type UnimplementedWorkflowServiceServer struct{} + +func (UnimplementedWorkflowServiceServer) GetWorkflows(*GetWorkflowsRequest, WorkflowService_GetWorkflowsServer) error { + return status.Errorf(codes.Unimplemented, "method GetWorkflows not implemented") +} + +func (UnimplementedWorkflowServiceServer) PublishEvent(context.Context, *PublishEventRequest) (*PublishEventResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method PublishEvent not implemented") +} + +// UnsafeWorkflowServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to WorkflowServiceServer will +// result in compilation errors. +type UnsafeWorkflowServiceServer interface { + mustEmbedUnimplementedWorkflowServiceServer() +} + +func RegisterWorkflowServiceServer(s grpc.ServiceRegistrar, srv WorkflowServiceServer) { + s.RegisterService(&WorkflowService_ServiceDesc, srv) +} + +func _WorkflowService_GetWorkflows_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(GetWorkflowsRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(WorkflowServiceServer).GetWorkflows(m, &workflowServiceGetWorkflowsServer{stream}) +} + +type WorkflowService_GetWorkflowsServer interface { + Send(*GetWorkflowsResponse) error + grpc.ServerStream +} + +type workflowServiceGetWorkflowsServer struct { + grpc.ServerStream +} + +func (x *workflowServiceGetWorkflowsServer) Send(m *GetWorkflowsResponse) error { + return x.ServerStream.SendMsg(m) +} + +func _WorkflowService_PublishEvent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PublishEventRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(WorkflowServiceServer).PublishEvent(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/internal.proto.workflow.v2.WorkflowService/PublishEvent", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(WorkflowServiceServer).PublishEvent(ctx, req.(*PublishEventRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// WorkflowService_ServiceDesc is the grpc.ServiceDesc for WorkflowService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var WorkflowService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "internal.proto.workflow.v2.WorkflowService", + HandlerType: (*WorkflowServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "PublishEvent", + Handler: _WorkflowService_PublishEvent_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "GetWorkflows", + Handler: _WorkflowService_GetWorkflows_Handler, + ServerStreams: true, + }, + }, + Metadata: "internal/proto/workflow/v2/workflow.proto", +}