Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Template executor plugins. Fixes #5201 #7256

Merged
merged 4 commits into from
Jan 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dist
docs
examples
manifests
plugins
sdks
test/e2e
ui/dist
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/ci-build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ jobs:
max-parallel: 4
matrix:
include:
- test: test-plugins
containerRuntimeExecutor: emissary
profile: plugins
- test: test-functional
containerRuntimeExecutor: emissary
profile: minimal
Expand Down
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ run:
- executor
- examples
- functional
- plugins
linters:
enable:
- bodyclose
Expand Down
18 changes: 14 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ UI ?= false
API ?= $(UI)
GOTEST ?= go test -v
PROFILE ?= minimal
PLUGINS ?= $(shell [ $PROFILE = plugins ] && echo false || echo true)
# by keeping this short we speed up the tests
DEFAULT_REQUEUE_TIME ?= 100ms
# whether or not to start the Argo Service in TLS mode
Expand Down Expand Up @@ -237,9 +238,14 @@ argoexec-image:
if [ $(DOCKER_PUSH) = true ] && [ $(IMAGE_NAMESPACE) != argoproj ] ; then docker push $(IMAGE_NAMESPACE)/$*:$(VERSION) ; fi

# generation
plugins/%-plugin-configmap.yaml: ./dist/argo
./dist/argo executor-plugin build $(dir $@)

.PHONY: plugins
plugins: $(shell find plugins -name '*-configmap.yaml')

.PHONY: codegen
codegen: types swagger docs manifests
codegen: types swagger docs manifests plugins
make --directory sdks/java generate
make --directory sdks/python generate

Expand Down Expand Up @@ -271,7 +277,7 @@ docs: \
rm -Rf vendor v3
go mod tidy
# `go generate ./...` takes around 10s, so we only run on specific packages.
go generate ./persist/sqldb ./pkg/apiclient/workflow ./server/auth ./server/auth/sso ./workflow/executor
go generate ./persist/sqldb ./pkg/plugins ./pkg/apiclient/workflow ./server/auth ./server/auth/sso ./workflow/executor
./hack/check-env-doc.sh

$(GOPATH)/bin/mockery:
Expand All @@ -293,7 +299,7 @@ $(GOPATH)/bin/protoc-gen-swagger:
$(GOPATH)/bin/openapi-gen:
go install k8s.io/kube-openapi/cmd/openapi-gen@v0.0.0-20210305001622-591a79e4bda7
$(GOPATH)/bin/swagger:
go install github.com/go-swagger/go-swagger/cmd/swagger@v0.25.0
go install github.com/go-swagger/go-swagger/cmd/swagger@v0.28.0
$(GOPATH)/bin/goimports:
go install golang.org/x/tools/cmd/goimports@v0.1.6

Expand Down Expand Up @@ -388,6 +394,7 @@ lint: server/static/files.go $(GOPATH)/bin/golangci-lint
# for local we have a faster target that prints to stdout, does not use json, and can cache because it has no coverage
.PHONY: test
test: server/static/files.go dist/argosay
go build ./...
env KUBECONFIG=/dev/null $(GOTEST) ./...

.PHONY: install
Expand Down Expand Up @@ -450,6 +457,9 @@ ifneq ($(API),true)
endif
ifneq ($(UI),true)
@echo "⚠️ not starting UI. If you want to test the UI, run 'make start UI=true' to start it"
endif
ifneq ($(PLUGINS),true)
@echo "⚠️ not starting plugins. If you want to test plugins, run 'make start PROFILE=plugins' to start it"
endif
# Check dex, minio, postgres and mysql are in hosts file
ifeq ($(AUTH_MODE),sso)
Expand All @@ -460,7 +470,7 @@ endif
grep '127.0.0.1[[:blank:]]*mysql' /etc/hosts
./hack/port-forward.sh
ifeq ($(RUN_MODE),local)
env DEFAULT_REQUEUE_TIME=$(DEFAULT_REQUEUE_TIME) SECURE=$(SECURE) ALWAYS_OFFLOAD_NODE_STATUS=$(ALWAYS_OFFLOAD_NODE_STATUS) LOG_LEVEL=$(LOG_LEVEL) UPPERIO_DB_DEBUG=$(UPPERIO_DB_DEBUG) IMAGE_NAMESPACE=$(IMAGE_NAMESPACE) VERSION=$(VERSION) AUTH_MODE=$(AUTH_MODE) NAMESPACED=$(NAMESPACED) NAMESPACE=$(KUBE_NAMESPACE) MANAGED_NAMESPACE=$(MANAGED_NAMESPACE) UI=$(UI) API=$(API) $(GOPATH)/bin/goreman -set-ports=false -logtime=false start $(shell if [ -z $GREP_LOGS ]; then echo; else echo "| grep \"$(GREP_LOGS)\""; fi)
env DEFAULT_REQUEUE_TIME=$(DEFAULT_REQUEUE_TIME) SECURE=$(SECURE) ALWAYS_OFFLOAD_NODE_STATUS=$(ALWAYS_OFFLOAD_NODE_STATUS) LOG_LEVEL=$(LOG_LEVEL) UPPERIO_DB_DEBUG=$(UPPERIO_DB_DEBUG) IMAGE_NAMESPACE=$(IMAGE_NAMESPACE) VERSION=$(VERSION) AUTH_MODE=$(AUTH_MODE) NAMESPACED=$(NAMESPACED) NAMESPACE=$(KUBE_NAMESPACE) MANAGED_NAMESPACE=$(MANAGED_NAMESPACE) UI=$(UI) API=$(API) PLUGINS=$(PLUGINS) $(GOPATH)/bin/goreman -set-ports=false -logtime=false start $(shell if [ -z $GREP_LOGS ]; then echo; else echo "| grep \"$(GREP_LOGS)\""; fi)
endif

$(GOPATH)/bin/stern:
Expand Down
2 changes: 1 addition & 1 deletion Procfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
controller: ./hack/free-port.sh 9090 && ARGO_REMOVE_PVC_PROTECTION_FINALIZER=true ARGO_PROGRESS_PATCH_TICK_DURATION=7s DEFAULT_REQUEUE_TIME=${DEFAULT_REQUEUE_TIME} LEADER_ELECTION_IDENTITY=local ALWAYS_OFFLOAD_NODE_STATUS=${ALWAYS_OFFLOAD_NODE_STATUS} OFFLOAD_NODE_STATUS_TTL=30s WORKFLOW_GC_PERIOD=30s UPPERIO_DB_DEBUG=${UPPERIO_DB_DEBUG} ARCHIVED_WORKFLOW_GC_PERIOD=30s ./dist/workflow-controller --executor-image ${IMAGE_NAMESPACE}/argoexec:${VERSION} --namespaced=${NAMESPACED} --namespace ${NAMESPACE} --managed-namespace=${MANAGED_NAMESPACE} --loglevel ${LOG_LEVEL}
controller: ./hack/free-port.sh 9090 && ARGO_EXECUTOR_PLUGINS=${PLUGINS} ARGO_REMOVE_PVC_PROTECTION_FINALIZER=true ARGO_PROGRESS_PATCH_TICK_DURATION=7s DEFAULT_REQUEUE_TIME=${DEFAULT_REQUEUE_TIME} LEADER_ELECTION_IDENTITY=local ALWAYS_OFFLOAD_NODE_STATUS=${ALWAYS_OFFLOAD_NODE_STATUS} OFFLOAD_NODE_STATUS_TTL=30s WORKFLOW_GC_PERIOD=30s UPPERIO_DB_DEBUG=${UPPERIO_DB_DEBUG} ARCHIVED_WORKFLOW_GC_PERIOD=30s ./dist/workflow-controller --executor-image ${IMAGE_NAMESPACE}/argoexec:${VERSION} --namespaced=${NAMESPACED} --namespace ${NAMESPACE} --managed-namespace=${MANAGED_NAMESPACE} --loglevel ${LOG_LEVEL}
argo-server: ./hack/free-port.sh 2746 && [ "$API" = "true" ] && UPPERIO_DB_DEBUG=${UPPERIO_DB_DEBUG} ./dist/argo --loglevel ${LOG_LEVEL} server --namespaced=${NAMESPACED} --namespace ${NAMESPACE} --auth-mode ${AUTH_MODE} --secure=$SECURE --x-frame-options=SAMEORIGIN
ui: ./hack/free-port.sh 8080 && [ "$UI" = "true" ] && yarn --cwd ui install && yarn --cwd ui start
logs: make logs
12 changes: 10 additions & 2 deletions api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -5228,7 +5228,7 @@
"type": "array"
},
"force": {
"description": "Force copies a file forcibly even if it exists (default: false)",
"description": "Force copies a file forcibly even if it exists",
"type": "boolean"
},
"hdfsUser": {
Expand Down Expand Up @@ -5280,7 +5280,7 @@
"type": "array"
},
"force": {
"description": "Force copies a file forcibly even if it exists (default: false)",
"description": "Force copies a file forcibly even if it exists",
"type": "boolean"
},
"hdfsUser": {
Expand Down Expand Up @@ -6034,6 +6034,10 @@
],
"type": "object"
},
"io.argoproj.workflow.v1alpha1.Plugin": {
"description": "Plugin is an Object with exactly one key",
"type": "object"
},
"io.argoproj.workflow.v1alpha1.PodGC": {
"description": "PodGC describes how to delete completed pods as they complete",
"properties": {
Expand Down Expand Up @@ -6764,6 +6768,10 @@
"description": "Parallelism limits the max total parallel pods that can execute at the same time within the boundaries of this template invocation. If additional steps/dag templates are invoked, the pods created by those templates will not be counted towards this total.",
"type": "integer"
},
"plugin": {
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.Plugin",
"description": "Plugin is a plugin template"
},
"podSpecPatch": {
"description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of container fields which are not strings (e.g. resource limits).",
"type": "string"
Expand Down
12 changes: 10 additions & 2 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -9494,7 +9494,7 @@
}
},
"force": {
"description": "Force copies a file forcibly even if it exists (default: false)",
"description": "Force copies a file forcibly even if it exists",
"type": "boolean"
},
"hdfsUser": {
Expand Down Expand Up @@ -9543,7 +9543,7 @@
}
},
"force": {
"description": "Force copies a file forcibly even if it exists (default: false)",
"description": "Force copies a file forcibly even if it exists",
"type": "boolean"
},
"hdfsUser": {
Expand Down Expand Up @@ -10296,6 +10296,10 @@
}
}
},
"io.argoproj.workflow.v1alpha1.Plugin": {
"description": "Plugin is an Object with exactly one key",
"type": "object"
},
"io.argoproj.workflow.v1alpha1.PodGC": {
"description": "PodGC describes how to delete completed pods as they complete",
"type": "object",
Expand Down Expand Up @@ -11027,6 +11031,10 @@
"description": "Parallelism limits the max total parallel pods that can execute at the same time within the boundaries of this template invocation. If additional steps/dag templates are invoked, the pods created by those templates will not be counted towards this total.",
"type": "integer"
},
"plugin": {
"description": "Plugin is a plugin template",
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.Plugin"
},
"podSpecPatch": {
"description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of container fields which are not strings (e.g. resource limits).",
"type": "string"
Expand Down
44 changes: 44 additions & 0 deletions cmd/argo/commands/executorplugin/build.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package executorplugin

import (
"fmt"
"os"

"github.com/spf13/cobra"

plugin "github.com/argoproj/argo-workflows/v3/workflow/util/plugins"
)

func NewBuildCommand() *cobra.Command {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this command work for all languages like java, go, etc?

return &cobra.Command{
Use: "build DIR",
Short: "build an executor plugin",
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) != 1 {
cmd.HelpFunc()(cmd, args)
os.Exit(1)
}
pluginDir := args[0]
plug, err := loadPluginManifest(pluginDir)
if err != nil {
return err
}
cm, err := plugin.ToConfigMap(plug)
if err != nil {
return err
}
cmPath, err := saveConfigMap(cm, pluginDir)
if err != nil {
return err
}
fmt.Printf("%s created\n", cmPath)
readmePath, err := saveReadme(pluginDir, plug)
if err != nil {
return err
}
fmt.Printf("%s created\n", readmePath)
return nil
},
}
}
91 changes: 91 additions & 0 deletions cmd/argo/commands/executorplugin/io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package executorplugin

import (
"fmt"
"os"
"path/filepath"
"text/template"

apiv1 "k8s.io/api/core/v1"
"sigs.k8s.io/yaml"

"github.com/argoproj/argo-workflows/v3/pkg/plugins/spec"
)

func loadPluginManifest(pluginDir string) (*spec.Plugin, error) {
manifest, err := os.ReadFile(filepath.Join(pluginDir, "plugin.yaml"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I'm a huge fan of hardwiring plugin.yaml. It makes sense for things like Dockerfile, but plugin.yaml is too generic. At the very least it should be a customizable argument, but I would argue it's better to require file name explicitly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plugin?

if err != nil {
return nil, err
}
p := &spec.Plugin{}
err = yaml.UnmarshalStrict(manifest, p)
if err != nil {
return nil, err
}
files, err := filepath.Glob(filepath.Join(pluginDir, "server.*"))
if err != nil {
return nil, err
}

if len(files) > 1 {
return nil, fmt.Errorf("plugin %s has more than one server.* file", p.Name)
}
if len(files) == 1 {
code, err := os.ReadFile(files[0])
if err != nil {
return nil, err
}
p.Spec.Sidecar.Container.Args = []string{string(code)}
}
return p, p.Validate()
}

func addHeader(x []byte, h string) []byte {
return []byte(fmt.Sprintf("%s\n%s", h, string(x)))
}

func addCodegenHeader(x []byte) []byte {
return addHeader(x, "# This is an auto-generated file. DO NOT EDIT")
}

func saveConfigMap(cm *apiv1.ConfigMap, pluginDir string) (string, error) {
data, err := yaml.Marshal(cm)
if err != nil {
return "", err
}
cmPath := filepath.Join(pluginDir, fmt.Sprintf("%s-configmap.yaml", cm.Name))
err = os.WriteFile(cmPath, addCodegenHeader(data), 0666)
return cmPath, err
}

func saveReadme(pluginDir string, plug *spec.Plugin) (string, error) {
readmePath := filepath.Join(pluginDir, "README.md")
f, err := os.Create(readmePath)
if err != nil {
return "", err
}
defer f.Close()
tmpl, err := template.New("readme").Parse(`<!-- This is an auto-generated file. DO NOT EDIT -->
# {{.Name}}

* Needs: {{index .Annotations "workflows.argoproj.io/version"}}
* Image: {{.Spec.Sidecar.Container.Image}}

{{index .Annotations "workflows.argoproj.io/description"}}

Install:

kubectl apply -f {{.Name}}-executor-plugin-configmap.yaml

Uninstall:

kubectl delete cm {{.Name}}-executor-plugin
`)
if err != nil {
return "", err
}
if err = tmpl.Execute(f, plug); err != nil {
return "", err
}
return readmePath, nil
}
19 changes: 19 additions & 0 deletions cmd/argo/commands/executorplugin/root.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package executorplugin
alexec marked this conversation as resolved.
Show resolved Hide resolved

import (
"github.com/spf13/cobra"
)

func NewRootCommand() *cobra.Command {
command := &cobra.Command{
Use: "executor-plugin",
Short: "manage executor plugins",
Run: func(cmd *cobra.Command, args []string) {
cmd.HelpFunc()(cmd, args)
},
}

command.AddCommand(NewBuildCommand())

return command
}
2 changes: 1 addition & 1 deletion cmd/argo/commands/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func isNonBoundaryParentNode(node wfv1.NodeType) bool {
}

func isExecutionNode(node wfv1.NodeType) bool {
return (node == wfv1.NodeTypePod) || (node == wfv1.NodeTypeSkipped) || (node == wfv1.NodeTypeSuspend) || (node == wfv1.NodeTypeHTTP)
return (node == wfv1.NodeTypePod) || (node == wfv1.NodeTypeSkipped) || (node == wfv1.NodeTypeSuspend) || (node == wfv1.NodeTypeHTTP) || (node == wfv1.NodeTypePlugin)
}

func insertSorted(wf *wfv1.Workflow, sortedArray []renderNode, item renderNode) []renderNode {
Expand Down
2 changes: 2 additions & 0 deletions cmd/argo/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/argoproj/argo-workflows/v3/cmd/argo/commands/client"
"github.com/argoproj/argo-workflows/v3/cmd/argo/commands/clustertemplate"
"github.com/argoproj/argo-workflows/v3/cmd/argo/commands/cron"
"github.com/argoproj/argo-workflows/v3/cmd/argo/commands/executorplugin"
"github.com/argoproj/argo-workflows/v3/cmd/argo/commands/template"
cmdutil "github.com/argoproj/argo-workflows/v3/util/cmd"
)
Expand Down Expand Up @@ -110,6 +111,7 @@ If your server is behind an ingress with a path (you'll be running "argo server
command.AddCommand(template.NewTemplateCommand())
command.AddCommand(cron.NewCronWorkflowCommand())
command.AddCommand(clustertemplate.NewClusterTemplateCommand())
command.AddCommand(executorplugin.NewRootCommand())

client.AddKubectlFlagsToCmd(command)
client.AddAPIClientFlagsToCmd(command)
Expand Down
Loading