diff --git a/.gitignore b/.gitignore
index bd94127d7fcd..bfb7f0584df3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -34,3 +34,4 @@ git-ask-pass.sh
 /.brew_home
 /go-diagrams/
 /.run/
+pprof
diff --git a/Makefile b/Makefile
index 9a8e693a84f1..041ecd0c9be4 100644
--- a/Makefile
+++ b/Makefile
@@ -27,6 +27,7 @@ KUBE_NAMESPACE ?= argo
 VERSION := latest
 DEV_IMAGE := true
+DOCKER_PUSH := false
 # VERSION is the version to be used for files in manifests and should always be latest unless we are releasing
 # we assume HEAD means you are on a tag
@@ -73,6 +74,9 @@ NAMESPACED := true
 ifeq ($(PROFILE),prometheus)
 RUN_MODE := kubernetes
 endif
+ifeq ($(PROFILE),stress)
+RUN_MODE := kubernetes
+endif
 ALWAYS_OFFLOAD_NODE_STATUS := false
 ifeq ($(PROFILE),mysql)
@@ -144,6 +148,7 @@ define docker_build
 	docker build --progress plain -t $(IMAGE_NAMESPACE)/$(1):$(VERSION) --target $(1) -f $(DOCKERFILE) --build-arg IMAGE_OS=$(OUTPUT_IMAGE_OS) --build-arg IMAGE_ARCH=$(OUTPUT_IMAGE_ARCH) .
 	if [ $(DEV_IMAGE) = true ]; then mv $(2) dist/$(2)-$(OUTPUT_IMAGE_OS)-$(OUTPUT_IMAGE_ARCH); fi
 	if [ $(K3D) = true ]; then k3d image import $(IMAGE_NAMESPACE)/$(1):$(VERSION); fi
+	if [ $(DOCKER_PUSH) = true ] && [ $(IMAGE_NAMESPACE) != argoproj ] ; then docker push $(IMAGE_NAMESPACE)/$(1):$(VERSION) ; fi
 	touch $(3)
 endef
 define docker_pull
@@ -390,15 +395,21 @@ endif
 test: server/static/files.go
 	env KUBECONFIG=/dev/null $(GOTEST) ./...
-dist/$(PROFILE).yaml: $(MANIFESTS) $(E2E_MANIFESTS) /usr/local/bin/kustomize
-	mkdir -p dist
-	kustomize build --load_restrictor=none test/e2e/manifests/$(PROFILE) | sed 's/:latest/:$(VERSION)/' | sed 's/pns/$(E2E_EXECUTOR)/' > dist/$(PROFILE).yaml
-
 .PHONY: install
-install: dist/$(PROFILE).yaml
-	cat test/e2e/manifests/argo-ns.yaml | sed 's/argo/$(KUBE_NAMESPACE)/' > dist/argo-ns.yaml
-	kubectl apply -f dist/argo-ns.yaml
-	kubectl -n $(KUBE_NAMESPACE) apply -l app.kubernetes.io/part-of=argo --prune --force -f dist/$(PROFILE).yaml
+install: $(MANIFESTS) $(E2E_MANIFESTS) /usr/local/bin/kustomize
+	kubectl get ns $(KUBE_NAMESPACE) || kubectl create ns $(KUBE_NAMESPACE)
+	kubectl config set-context --current --namespace=$(KUBE_NAMESPACE)
+	@echo "installing PROFILE=$(PROFILE) VERSION=$(VERSION), E2E_EXECUTOR=$(E2E_EXECUTOR)"
+	kustomize build --load_restrictor=none test/e2e/manifests/$(PROFILE) | sed 's/image: argoproj/image: $(IMAGE_NAMESPACE)/' | sed 's/:latest/:$(VERSION)/' | sed 's/pns/$(E2E_EXECUTOR)/' | kubectl -n $(KUBE_NAMESPACE) apply -f -
+	kubectl -n $(KUBE_NAMESPACE) apply -f test/stress/massive-workflow.yaml
+	kubectl -n $(KUBE_NAMESPACE) rollout restart deploy workflow-controller
+	kubectl -n $(KUBE_NAMESPACE) rollout restart deploy argo-server
+	kubectl -n $(KUBE_NAMESPACE) rollout restart deploy minio
+ifeq ($(RUN_MODE),kubernetes)
+	# scale to 2 replicas so we touch upon leader election
+	kubectl -n $(KUBE_NAMESPACE) scale deploy/workflow-controller --replicas 2
+	kubectl -n $(KUBE_NAMESPACE) scale deploy/argo-server --replicas 1
+endif
 .PHONY: pull-build-images
 pull-build-images:
@@ -421,29 +432,25 @@ test-images:
 	$(call docker_pull,argoproj/argosay:v2)
 	$(call docker_pull,python:alpine3.6)
-.PHONY: stop
-stop:
-	killall argo workflow-controller kubectl || true
-
 $(GOPATH)/bin/goreman:
 	go get github.com/mattn/goreman
 .PHONY: start
-start: stop install controller cli executor-image $(GOPATH)/bin/goreman
-	kubectl config set-context --current --namespace=$(KUBE_NAMESPACE)
 ifeq ($(RUN_MODE),kubernetes)
-	$(MAKE) controller-image cli-image
-	kubectl -n $(KUBE_NAMESPACE) scale 
deploy/workflow-controller --replicas 1 - kubectl -n $(KUBE_NAMESPACE) scale deploy/argo-server --replicas 1 +start: controller-image cli-image install executor-image +else +start: install controller cli executor-image $(GOPATH)/bin/goreman endif ifeq ($(RUN_MODE),kubernetes) - kubectl -n $(KUBE_NAMESPACE) wait --for=condition=Ready pod -l app=argo-server --timeout 1m - kubectl -n $(KUBE_NAMESPACE) wait --for=condition=Ready pod -l app=workflow-controller --timeout 1m + kubectl -n $(KUBE_NAMESPACE) wait --for=condition=Available deploy argo-server + kubectl -n $(KUBE_NAMESPACE) wait --for=condition=Available deploy workflow-controller endif ifeq ($(PROFILE),prometheus) - kubectl -n $(KUBE_NAMESPACE) wait --for=condition=Ready pod -l app=prometheus --timeout 1m + kubectl -n $(KUBE_NAMESPACE) wait --for=condition=Available deploy prometheus +endif +ifeq ($(PROFILE),stress) + kubectl -n $(KUBE_NAMESPACE) wait --for=condition=Available deploy prometheus endif - ./hack/port-forward.sh # Check dex, minio, postgres and mysql are in hosts file ifeq ($(AUTH_MODE),sso) grep '127.0.0.1[[:blank:]]*dex' /etc/hosts @@ -451,6 +458,9 @@ endif grep '127.0.0.1[[:blank:]]*minio' /etc/hosts grep '127.0.0.1[[:blank:]]*postgres' /etc/hosts grep '127.0.0.1[[:blank:]]*mysql' /etc/hosts + # allow time for pods to terminate + sleep 10s + ./hack/port-forward.sh ifeq ($(RUN_MODE),local) env DEFAULT_REQUEUE_TIME=$(DEFAULT_REQUEUE_TIME) SECURE=$(SECURE) ALWAYS_OFFLOAD_NODE_STATUS=$(ALWAYS_OFFLOAD_NODE_STATUS) LOG_LEVEL=$(LOG_LEVEL) UPPERIO_DB_DEBUG=$(UPPERIO_DB_DEBUG) VERSION=$(VERSION) AUTH_MODE=$(AUTH_MODE) NAMESPACED=$(NAMESPACED) NAMESPACE=$(KUBE_NAMESPACE) $(GOPATH)/bin/goreman -set-ports=false -logtime=false start endif diff --git a/cmd/workflow-controller/main.go b/cmd/workflow-controller/main.go index 5e8d5aed1dba..354714bb53c6 100644 --- a/cmd/workflow-controller/main.go +++ b/cmd/workflow-controller/main.go @@ -3,6 +3,8 @@ package main import ( "context" "fmt" + "net/http" + _ "net/http/pprof" "os" "time" @@ -21,6 +23,7 @@ import ( wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned" cmdutil "github.com/argoproj/argo/util/cmd" "github.com/argoproj/argo/workflow/controller" + "github.com/argoproj/argo/workflow/metrics" ) const ( @@ -64,6 +67,8 @@ func NewRootCommand() *cobra.Command { config.Burst = burst config.QPS = qps + metrics.AddMetricsTransportWrapper(config) + namespace, _, err := clientConfig.Namespace() if err != nil { return err @@ -89,6 +94,10 @@ func NewRootCommand() *cobra.Command { go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podWorkers, podCleanupWorkers) + go func() { + log.Println(http.ListenAndServe("localhost:6060", nil)) + }() + // Wait forever select {} }, diff --git a/docs/metrics.md b/docs/metrics.md index 2d233d31869b..d9c2e1058cab 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -61,15 +61,65 @@ a way to view and analyze historical data, consider the [workflow archive](workf ### Default Controller Metrics -There are several controller-level metrics. 
These include:
-
-* `workflows_processed_count`: a count of all Workflow updates processed by the controller
-* `count`: a count of all workflows currently accessible by the controller by status
-* `operation_duration_seconds`: a histogram of durations of operations
-* `error_count`: a count of certain errors incurred by the controller
-* `queue_depth_count`: the depth of the queue of workflows or cron workflows to be processed by the controller
-* `queue_adds_count`: the number of adds to the queue of workflows or cron workflows
-* `queue_latency`: the time workflows or cron workflows spend in the queue waiting to be processed
+Metrics for the Four Golden Signals are:
+
+* Latency: `argo_workflows_queue_latency`
+* Traffic: `argo_workflows_count` and `argo_workflows_queue_depth_count`
+* Errors: `argo_workflows_count` and `argo_workflows_error_count`
+* Saturation: `argo_workflows_workers_busy` and `argo_workflows_workflow_condition`
+
+#### argo_pod_missing
+
+Incidents of pods not being seen, e.g. because they were deleted by Kubernetes. You should only see this under high load.
+
+!!! NOTE
+    This metric's name starts with `argo_`, not `argo_workflows_`.
+
+#### argo_workflows_count
+
+Number of workflows in each phase. The `Running` count does not mean that a workflow's pods are running, just that the controller has scheduled them. A workflow can be stuck in `Running` with pending pods for a long time.
+
+#### argo_workflows_error_count
+
+A count of certain errors incurred by the controller.
+
+#### argo_workflows_k8s_request_total
+
+Number of requests sent to the Kubernetes API.
+
+#### argo_workflows_operation_duration_seconds
+
+A histogram of the durations of operations.
+
+#### argo_workflows_pods_count
+
+It is possible for a workflow to start but for none of its pods to be running (e.g. because the cluster is too busy to run them). This metric sheds light on the actual work being done.
+
+#### argo_workflows_queue_adds_count
+
+The number of additions to the queue of workflows or cron workflows.
+
+#### argo_workflows_queue_depth_count
+
+The depth of the queue of workflows or cron workflows to be processed by the controller.
+
+#### argo_workflows_queue_latency
+
+The time workflows or cron workflows spend in the queue waiting to be processed.
+
+#### argo_workflows_workers_busy
+
+The number of workers that are busy.
+
+#### argo_workflows_workflow_condition
+
+The number of workflows with each condition. This will tell you the number of workflows with running pods.
+
+#### argo_workflows_workflows_processed_count
+
+A count of all Workflow updates processed by the controller.
 
 ### Metric types
diff --git a/docs/stress-testing.md b/docs/stress-testing.md
new file mode 100644
index 000000000000..a3b0fb01f92e
--- /dev/null
+++ b/docs/stress-testing.md
@@ -0,0 +1,32 @@
+# Stress Testing
+
+Create a cluster in the [`jesse-sb` project](https://console.cloud.google.com/access/iam?cloudshell=false&project=jesse-sb).
+
+Install the `gcloud` binary.
+
+Log in to GCP: `gcloud auth login`
+
+Connect to your new cluster.
+
+Make sure you've logged in to Docker Hub: `docker login`
+
+Run `make start PROFILE=stress IMAGE_NAMESPACE=alexcollinsintuit DOCKER_PUSH=true`.
+
+If this fails, just try running it again.
+
+Open http://localhost:2746 and check that you can run a workflow.
+
+Open `test/stress/tool/main.go` and run it with a small number of workflows (e.g. 10) and make sure they complete.
+
+If you get `ImagePullBackOff`, make sure the image is `argoproj/argosay:v2` in `kubectl -n argo edit workflowtemplate massive`.
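+
+For example (a sketch; this assumes the template name `massive` from `test/stress/massive-workflow.yaml`), you can print the image the `sleep` template will use with:
+
+    kubectl -n argo get workflowtemplate massive \
+      -o jsonpath='{.spec.templates[?(@.name=="sleep")].container.image}'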
+
+Open http://localhost:9091/graph.
+
+You can use [this Tab Auto Refresh Chrome extension](https://chrome.google.com/webstore/detail/tab-auto-refresh/oomoeacogjkolheacgdkkkhbjipaomkn) to auto-refresh the page.
+
+Open `test/stress/tool/main.go` and run it with a large number (e.g. 10000).
+
+Use Prometheus to analyze the results.
+
+Finally, you can capture pprof profiles using `./hack/capture-pprof.sh`.
diff --git a/hack/capture-pprof.sh b/hack/capture-pprof.sh
new file mode 100755
index 000000000000..dc0d66bd5ebb
--- /dev/null
+++ b/hack/capture-pprof.sh
@@ -0,0 +1,14 @@
+#!/usr/bin/env bash
+set -eu -o pipefail
+
+echo "https://blog.golang.org/pprof"
+
+cd $(dirname $0)/..
+
+n=$(date +%s)
+
+go tool pprof -png -output dist/heap-$n.png http://localhost:6060/debug/pprof/heap
+go tool pprof -png -output dist/allocs-$n.png http://localhost:6060/debug/pprof/allocs
+go tool pprof -png -output dist/block-$n.png http://localhost:6060/debug/pprof/block
+go tool pprof -png -output dist/mutex-$n.png http://localhost:6060/debug/pprof/mutex
+go tool pprof -png -output dist/profile-$n.png http://localhost:6060/debug/pprof/profile?seconds=30
\ No newline at end of file
diff --git a/hack/port-forward.sh b/hack/port-forward.sh
index e6d37500e417..d200c154b688 100755
--- a/hack/port-forward.sh
+++ b/hack/port-forward.sh
@@ -21,7 +21,11 @@ info() {
     echo '[INFO] ' "$@"
 }
 
-pf MinIO pod/minio 9000
+killall kubectl || true
+
+if [[ "$(kubectl -n argo get pod -l app=minio -o name)" != "" ]]; then
+  pf MinIO deploy/minio 9000
+fi
 
 dex=$(kubectl -n argo get pod -l app=dex -o name)
 if [[ "$dex" != "" ]]; then
@@ -39,13 +43,15 @@ if [[ "$mysql" != "" ]]; then
 fi
 
 if [[ "$(kubectl -n argo get pod -l app=argo-server -o name)" != "" ]]; then
-  pf "Argo Server" deploy/argo-server 2746
+  pf "Argo Server" svc/argo-server 2746
 fi
 
 if [[ "$(kubectl -n argo get pod -l app=workflow-controller -o name)" != "" ]]; then
-  pf "Workflow Controller" deploy/workflow-controller 9090
+  pf "Workflow Controller Metrics" svc/workflow-controller-metrics 9090
+  pf "Workflow Controller PProf" svc/workflow-controller-pprof 6060
 fi
 
 if [[ "$(kubectl -n argo get pod -l app=prometheus -o name)" != "" ]]; then
-  pf "Prometheus Server" deploy/prometheus 9091 9090
+  pf "Prometheus Server" svc/prometheus 9091 9090
 fi
+
diff --git a/manifests/quick-start-minimal.yaml b/manifests/quick-start-minimal.yaml
index 67e3819c8b1c..b3711714b8ec 100644
--- a/manifests/quick-start-minimal.yaml
+++ b/manifests/quick-start-minimal.yaml
@@ -502,6 +502,11 @@ data:
       secretKeySecret:
         name: my-minio-cred
         key: secretkey
+  executor: |
+    resources:
+      requests:
+        cpu: 10m
+        memory: 64Mi
   links: |
     - name: Example Workflow Link
       scope: workflow
@@ -645,6 +650,55 @@ spec:
 ---
 apiVersion: apps/v1
 kind: Deployment
+metadata:
+  labels:
+    app: minio
+  name: minio
+spec:
+  selector:
+    matchLabels:
+      app: minio
+  template:
+    metadata:
+      labels:
+        app: minio
+    spec:
+      containers:
+      - command:
+        - minio
+        - server
+        - /data
+        env:
+        - name: MINIO_ACCESS_KEY
+          value: admin
+        - name: MINIO_SECRET_KEY
+          value: password
+        image: minio/minio:RELEASE.2019-12-17T23-16-33Z
+        lifecycle:
+          postStart:
+            exec:
+              command:
+              - mkdir
+              - -p
+              - /data/my-bucket
+        livenessProbe:
+          httpGet:
+            path: /minio/health/live
+            port: 9000
+          initialDelaySeconds: 5
+          periodSeconds: 10
+        name: main
+        ports:
+        - containerPort: 9000
+        readinessProbe:
+          httpGet:
+            path: /minio/health/ready
+            port: 9000
+          initialDelaySeconds: 5
+          periodSeconds: 10
+---
+apiVersion: apps/v1
+kind: Deployment
 metadata:
   name: workflow-controller
 spec:
@@ 
-691,44 +745,3 @@ spec: securityContext: runAsNonRoot: true serviceAccountName: argo ---- -apiVersion: v1 -kind: Pod -metadata: - labels: - app: minio - name: minio -spec: - containers: - - command: - - minio - - server - - /data - env: - - name: MINIO_ACCESS_KEY - value: admin - - name: MINIO_SECRET_KEY - value: password - image: minio/minio:RELEASE.2019-12-17T23-16-33Z - lifecycle: - postStart: - exec: - command: - - mkdir - - -p - - /data/my-bucket - livenessProbe: - httpGet: - path: /minio/health/live - port: 9000 - initialDelaySeconds: 5 - periodSeconds: 10 - name: main - ports: - - containerPort: 9000 - readinessProbe: - httpGet: - path: /minio/health/ready - port: 9000 - initialDelaySeconds: 5 - periodSeconds: 10 diff --git a/manifests/quick-start-mysql.yaml b/manifests/quick-start-mysql.yaml index 56ee0dfdc13e..2fde5fb7d991 100644 --- a/manifests/quick-start-mysql.yaml +++ b/manifests/quick-start-mysql.yaml @@ -502,6 +502,11 @@ data: secretKeySecret: name: my-minio-cred key: secretkey + executor: | + resources: + requests: + cpu: 10m + memory: 64Mi links: | - name: Example Workflow Link scope: workflow @@ -689,6 +694,55 @@ spec: --- apiVersion: apps/v1 kind: Deployment +metadata: + labels: + app: minio + name: minio +spec: + selector: + matchLabels: + app: minio + template: + metadata: + labels: + app: minio + spec: + containers: + - command: + - minio + - server + - /data + env: + - name: MINIO_ACCESS_KEY + value: admin + - name: MINIO_SECRET_KEY + value: password + image: minio/minio:RELEASE.2019-12-17T23-16-33Z + lifecycle: + postStart: + exec: + command: + - mkdir + - -p + - /data/my-bucket + livenessProbe: + httpGet: + path: /minio/health/live + port: 9000 + initialDelaySeconds: 5 + periodSeconds: 10 + name: main + ports: + - containerPort: 9000 + readinessProbe: + httpGet: + path: /minio/health/ready + port: 9000 + initialDelaySeconds: 5 + periodSeconds: 10 +--- +apiVersion: apps/v1 +kind: Deployment metadata: labels: app: mysql @@ -780,44 +834,3 @@ spec: securityContext: runAsNonRoot: true serviceAccountName: argo ---- -apiVersion: v1 -kind: Pod -metadata: - labels: - app: minio - name: minio -spec: - containers: - - command: - - minio - - server - - /data - env: - - name: MINIO_ACCESS_KEY - value: admin - - name: MINIO_SECRET_KEY - value: password - image: minio/minio:RELEASE.2019-12-17T23-16-33Z - lifecycle: - postStart: - exec: - command: - - mkdir - - -p - - /data/my-bucket - livenessProbe: - httpGet: - path: /minio/health/live - port: 9000 - initialDelaySeconds: 5 - periodSeconds: 10 - name: main - ports: - - containerPort: 9000 - readinessProbe: - httpGet: - path: /minio/health/ready - port: 9000 - initialDelaySeconds: 5 - periodSeconds: 10 diff --git a/manifests/quick-start-postgres.yaml b/manifests/quick-start-postgres.yaml index f7d900baaee3..c6c1e16d030f 100644 --- a/manifests/quick-start-postgres.yaml +++ b/manifests/quick-start-postgres.yaml @@ -502,6 +502,11 @@ data: secretKeySecret: name: my-minio-cred key: secretkey + executor: | + resources: + requests: + cpu: 10m + memory: 64Mi links: | - name: Example Workflow Link scope: workflow @@ -689,6 +694,55 @@ spec: --- apiVersion: apps/v1 kind: Deployment +metadata: + labels: + app: minio + name: minio +spec: + selector: + matchLabels: + app: minio + template: + metadata: + labels: + app: minio + spec: + containers: + - command: + - minio + - server + - /data + env: + - name: MINIO_ACCESS_KEY + value: admin + - name: MINIO_SECRET_KEY + value: password + image: minio/minio:RELEASE.2019-12-17T23-16-33Z + lifecycle: + 
postStart: + exec: + command: + - mkdir + - -p + - /data/my-bucket + livenessProbe: + httpGet: + path: /minio/health/live + port: 9000 + initialDelaySeconds: 5 + periodSeconds: 10 + name: main + ports: + - containerPort: 9000 + readinessProbe: + httpGet: + path: /minio/health/ready + port: 9000 + initialDelaySeconds: 5 + periodSeconds: 10 +--- +apiVersion: apps/v1 +kind: Deployment metadata: labels: app: postgres @@ -772,44 +826,3 @@ spec: securityContext: runAsNonRoot: true serviceAccountName: argo ---- -apiVersion: v1 -kind: Pod -metadata: - labels: - app: minio - name: minio -spec: - containers: - - command: - - minio - - server - - /data - env: - - name: MINIO_ACCESS_KEY - value: admin - - name: MINIO_SECRET_KEY - value: password - image: minio/minio:RELEASE.2019-12-17T23-16-33Z - lifecycle: - postStart: - exec: - command: - - mkdir - - -p - - /data/my-bucket - livenessProbe: - httpGet: - path: /minio/health/live - port: 9000 - initialDelaySeconds: 5 - periodSeconds: 10 - name: main - ports: - - containerPort: 9000 - readinessProbe: - httpGet: - path: /minio/health/ready - port: 9000 - initialDelaySeconds: 5 - periodSeconds: 10 diff --git a/manifests/quick-start/base/minio/kustomization.yaml b/manifests/quick-start/base/minio/kustomization.yaml index dccbebf6547c..2d81059de416 100644 --- a/manifests/quick-start/base/minio/kustomization.yaml +++ b/manifests/quick-start/base/minio/kustomization.yaml @@ -2,6 +2,6 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization resources: - - minio-pod.yaml + - minio-deploy.yaml - minio-service.yaml - my-minio-cred-secret.yaml \ No newline at end of file diff --git a/manifests/quick-start/base/minio/minio-deploy.yaml b/manifests/quick-start/base/minio/minio-deploy.yaml new file mode 100644 index 000000000000..967b917413da --- /dev/null +++ b/manifests/quick-start/base/minio/minio-deploy.yaml @@ -0,0 +1,42 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: minio + labels: + app: minio +spec: + selector: + matchLabels: + app: minio + template: + metadata: + labels: + app: minio + spec: + containers: + - name: main + image: minio/minio:RELEASE.2019-12-17T23-16-33Z + env: + - name: MINIO_ACCESS_KEY + value: admin + - name: MINIO_SECRET_KEY + value: password + ports: + - containerPort: 9000 + command: [minio, server, /data] + lifecycle: + postStart: + exec: + command: [mkdir, -p, /data/my-bucket] + readinessProbe: + httpGet: + path: /minio/health/ready + port: 9000 + initialDelaySeconds: 5 + periodSeconds: 10 + livenessProbe: + httpGet: + path: /minio/health/live + port: 9000 + initialDelaySeconds: 5 + periodSeconds: 10 \ No newline at end of file diff --git a/manifests/quick-start/base/overlays/workflow-controller-configmap.yaml b/manifests/quick-start/base/overlays/workflow-controller-configmap.yaml index 4a118d21dc98..2c2bb72792f8 100644 --- a/manifests/quick-start/base/overlays/workflow-controller-configmap.yaml +++ b/manifests/quick-start/base/overlays/workflow-controller-configmap.yaml @@ -1,5 +1,10 @@ apiVersion: v1 data: + executor: | + resources: + requests: + cpu: 10m + memory: 64Mi artifactRepository: | archiveLogs: true s3: diff --git a/manifests/quick-start/base/prometheus/kustomization.yaml b/manifests/quick-start/base/prometheus/kustomization.yaml index 5d4cf2f4c949..4fa48f13b706 100644 --- a/manifests/quick-start/base/prometheus/kustomization.yaml +++ b/manifests/quick-start/base/prometheus/kustomization.yaml @@ -4,3 +4,4 @@ kind: Kustomization resources: - prometheus-deployment.yaml - prometheus-config-cluster.yaml 
+ - prometheus-service.yaml
diff --git a/manifests/quick-start/base/prometheus/prometheus-service.yaml b/manifests/quick-start/base/prometheus/prometheus-service.yaml
new file mode 100644
index 000000000000..4f07a56d0fe3
--- /dev/null
+++ b/manifests/quick-start/base/prometheus/prometheus-service.yaml
@@ -0,0 +1,10 @@
+apiVersion: v1
+kind: Service
+metadata:
+  name: prometheus
+spec:
+  selector:
+    app: prometheus
+  ports:
+    - name: metrics
+      port: 9090
diff --git a/mkdocs.yml b/mkdocs.yml
index 2e5756416b8d..1fd5822e31a5 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -144,6 +144,7 @@ nav:
     - versioning.md
     - public-api.md
    - static-code-analysis.md
+    - stress-testing.md
     - releasing.md
     - Releases ⧉: https://github.com/argoproj/argo/releases
     - Roadmap: roadmap.md
diff --git a/pkg/apis/workflow/v1alpha1/workflow_types.go b/pkg/apis/workflow/v1alpha1/workflow_types.go
index e749538c765c..1cb51bb2faee 100644
--- a/pkg/apis/workflow/v1alpha1/workflow_types.go
+++ b/pkg/apis/workflow/v1alpha1/workflow_types.go
@@ -1475,6 +1475,8 @@ type ConditionType string
 const (
 	// ConditionTypeCompleted is a signifies the workflow has completed
 	ConditionTypeCompleted ConditionType = "Completed"
+	// ConditionTypePodRunning indicates that any of the workflow's pods are currently running
+	ConditionTypePodRunning ConditionType = "PodRunning"
 	// ConditionTypeSpecWarning is a warning on the current application spec
 	ConditionTypeSpecWarning ConditionType = "SpecWarning"
 	// ConditionTypeSpecWarning is an error on the current application spec
diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go
index 72ec08e7a596..c86fd33dbed4 100644
--- a/test/e2e/functional_test.go
+++ b/test/e2e/functional_test.go
@@ -1005,7 +1005,7 @@ spec:
 `).
 		When().
 		SubmitWorkflow().
-		Wait(10 * time.Second).
+		WaitForWorkflow().
 		Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { node := status.Nodes.FindByDisplayName("print-hello") diff --git a/test/e2e/manifests/stress/kustomization.yaml b/test/e2e/manifests/stress/kustomization.yaml new file mode 100644 index 000000000000..0aaeb5afe185 --- /dev/null +++ b/test/e2e/manifests/stress/kustomization.yaml @@ -0,0 +1,13 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +resources: + - ../../../../manifests/quick-start/minimal + - ../../../../manifests/quick-start/base/prometheus + - workflow-controller-pprof-service.yaml + +patchesStrategicMerge: + - workflow-controller-deployment.yaml + +commonLabels: + "app.kubernetes.io/part-of": "argo" diff --git a/test/e2e/manifests/stress/workflow-controller-deployment.yaml b/test/e2e/manifests/stress/workflow-controller-deployment.yaml new file mode 100644 index 000000000000..15000b213588 --- /dev/null +++ b/test/e2e/manifests/stress/workflow-controller-deployment.yaml @@ -0,0 +1,17 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: workflow-controller +spec: + template: + spec: + containers: + - name: workflow-controller + args: + - --loglevel=warn + - --configmap=workflow-controller-configmap + - --executor-image=argoproj/argoexec:latest + - --namespaced + - --burst=2048 + - --qps=512 + - --workflow-workers=128 diff --git a/test/e2e/manifests/stress/workflow-controller-pprof-service.yaml b/test/e2e/manifests/stress/workflow-controller-pprof-service.yaml new file mode 100644 index 000000000000..f3807c7e6a72 --- /dev/null +++ b/test/e2e/manifests/stress/workflow-controller-pprof-service.yaml @@ -0,0 +1,10 @@ +apiVersion: v1 +kind: Service +metadata: + name: workflow-controller-pprof +spec: + selector: + app: workflow-controller + ports: + - name: metrics + port: 6060 diff --git a/test/e2e/metrics_test.go b/test/e2e/metrics_test.go index e46f18679998..40725ba0bbf3 100644 --- a/test/e2e/metrics_test.go +++ b/test/e2e/metrics_test.go @@ -41,6 +41,15 @@ func (s *MetricsSuite) TestMetricsEndpoint() { Expect(). Status(200). Body(). + Contains(`HELP argo_workflows_count`). + Contains(`HELP argo_workflows_k8s_request_total`). + Contains(`argo_workflows_k8s_request_total{kind="leases",status_code="200",verb="Get"}`). + Contains(`argo_workflows_k8s_request_total{kind="workflowtemplates",status_code="200",verb="List"}`). + Contains(`argo_workflows_k8s_request_total{kind="workflowtemplates",status_code="200",verb="Watch"}`). + Contains(`HELP argo_workflows_pods_count`). + Contains(`HELP argo_workflows_workers_busy`). + Contains(`HELP argo_workflows_workflow_condition`). + Contains(`HELP argo_workflows_workflows_processed_count`). Contains(`log_messages{level="info"}`). Contains(`log_messages{level="warning"}`). 
Contains(`log_messages{level="error"}`) diff --git a/test/e2e/stress/many-massive-workflows.yaml b/test/e2e/stress/many-massive-workflows.yaml deleted file mode 100644 index b650f65d5385..000000000000 --- a/test/e2e/stress/many-massive-workflows.yaml +++ /dev/null @@ -1,96 +0,0 @@ ---- -apiVersion: argoproj.io/v1alpha1 -kind: WorkflowTemplate -metadata: - name: massive-workflow - labels: - stress: "true" -spec: - entrypoint: main - arguments: - parameters: - - name: "nodes" - value: "1" - - name: "sleep" - value: "1s" - artifactRepositoryRef: - key: empty - ttlStrategy: - secondsAfterCompletion: 60 - podGC: - strategy: OnPodCompletion - templates: - - name: main - dag: - tasks: - - name: sleep - template: sleep - withSequence: - count: "{{workflow.parameters.nodes}}" - - name: sleep - container: - image: argoproj/argosay:v2 - args: - - sleep - - "{{workflow.parameters.sleep}}" ---- -apiVersion: argoproj.io/v1alpha1 -kind: WorkflowTemplate -metadata: - name: many-massive-workflows - labels: - stress: "true" -spec: - artifactRepositoryRef: - key: empty - ttlStrategy: - secondsAfterCompletion: 60 - podGC: - strategy: OnPodCompletion - entrypoint: main - arguments: - parameters: - - name: "workflows" - value: "1" - - name: "nodes" - value: "1" - - name: "sleep" - value: "1s" - workflowMetadata: - labels: - stress: "true" - templates: - - name: main - dag: - tasks: - - name: create-workflow - template: create-workflow - arguments: - parameters: - - name: i - value: "{{item}}" - withSequence: - count: "{{workflow.parameters.workflows}}" - - name: create-workflow - inputs: - parameters: - - name: i - resource: - action: create - setOwnerReference: true - manifest: | - apiVersion: argoproj.io/v1alpha1 - kind: Workflow - metadata: - name: "large-workflow-{{workflow.parameters.workflows}}x{{workflow.parameters.nodes}}-{{inputs.parameters.i}}" - labels: - stress: "true" - spec: - arguments: - parameters: - - name: "nodes" - value: "{{workflow.parameters.nodes}}" - - name: "sleep" - value: "{{workflow.parameters.sleep}}" - workflowTemplateRef: - name: massive-workflow diff --git a/test/stress/massive-workflow.yaml b/test/stress/massive-workflow.yaml new file mode 100644 index 000000000000..3fb0bcd78adf --- /dev/null +++ b/test/stress/massive-workflow.yaml @@ -0,0 +1,41 @@ +--- +apiVersion: argoproj.io/v1alpha1 +kind: WorkflowTemplate +metadata: + name: massive + labels: + stress: "true" +spec: + entrypoint: main + arguments: + parameters: + - name: "nodes" + value: "1" + - name: "sleep" + value: "1s" + ttlStrategy: + secondsAfterSuccess: 60 + podGC: + strategy: OnPodSuccess + templates: + - name: main + dag: + tasks: + - name: sleep + template: sleep + withSequence: + count: "{{workflow.parameters.nodes}}" + - name: sleep + metadata: + labels: + stress: "true" + container: + image: argoproj/argosay:v2 + imagePullPolicy: IfNotPresent + resources: + requests: + memory: 2Mi + cpu: 10m + args: + - sleep + - "{{workflow.parameters.sleep}}" diff --git a/test/e2e/stress/pod-limits.yaml b/test/stress/pod-limits.yaml similarity index 100% rename from test/e2e/stress/pod-limits.yaml rename to test/stress/pod-limits.yaml diff --git a/test/stress/tool/main.go b/test/stress/tool/main.go new file mode 100644 index 000000000000..2748736d7211 --- /dev/null +++ b/test/stress/tool/main.go @@ -0,0 +1,55 @@ +package main + +import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/clientcmd" + "sigs.k8s.io/yaml" + + wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + 
"github.com/argoproj/argo/pkg/client/clientset/versioned" +) + +func main() { + + loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() + configOverrides := &clientcmd.ConfigOverrides{} + kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, configOverrides) + config, err := kubeConfig.ClientConfig() + if err != nil { + panic(err) + } + config.QPS = 512 + namespace, _, _ := kubeConfig.Namespace() + w := versioned.NewForConfigOrDie(config).ArgoprojV1alpha1().Workflows(namespace) + + wf := &wfv1.Workflow{} + err = yaml.Unmarshal([]byte(` +metadata: + generateName: stress- + labels: + stress: "true" +spec: + arguments: + parameters: + - name: nodes + value: "2" + - name: sleep + value: "30s" + workflowTemplateRef: + name: massive +`), wf) + if err != nil { + panic(err) + } + + ctx := context.Background() + for i := 0; i < 100; i++ { + _, err := w.Create(ctx, wf, metav1.CreateOptions{}) + if err != nil { + panic(err) + } + print(i, " ") + } +} diff --git a/util/unstructured/workflow/conditions.go b/util/unstructured/workflow/conditions.go new file mode 100644 index 000000000000..873805041ab2 --- /dev/null +++ b/util/unstructured/workflow/conditions.go @@ -0,0 +1,28 @@ +package workflow + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" +) + +// GetConditions returns the conditions, excluding the `message` field. +func GetConditions(un *unstructured.Unstructured) wfv1.Conditions { + if un == nil { + return nil + } + items, _, _ := unstructured.NestedSlice(un.Object, "status", "conditions") + var x wfv1.Conditions + for _, item := range items { + m, ok := item.(map[string]interface{}) + if !ok { + return nil + } + x = append(x, wfv1.Condition{ + Type: wfv1.ConditionType(m["type"].(string)), + Status: metav1.ConditionStatus(m["status"].(string)), + }) + } + return x +} diff --git a/util/unstructured/workflow/conditions_test.go b/util/unstructured/workflow/conditions_test.go new file mode 100644 index 000000000000..0044583a3a41 --- /dev/null +++ b/util/unstructured/workflow/conditions_test.go @@ -0,0 +1,29 @@ +package workflow + +import ( + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo/workflow/util" +) + +func TestGetConditions(t *testing.T) { + t.Run("Nil", func(t *testing.T) { + un := &unstructured.Unstructured{Object: map[string]interface{}{}} + + assert.Nil(t, GetConditions(un)) + }) + t.Run("Some", func(t *testing.T) { + un, _ := util.ToUnstructured(&wfv1.Workflow{ + Status: wfv1.WorkflowStatus{ + Conditions: wfv1.Conditions{{Type: wfv1.ConditionTypeCompleted, Status: corev1.ConditionTrue}}, + }, + }) + + assert.Equal(t, wfv1.Conditions{{Type: wfv1.ConditionTypeCompleted, Status: corev1.ConditionTrue}}, GetConditions(un)) + }) +} diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 27e019dee439..67fa259236e7 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -171,6 +171,7 @@ var indexers = cache.Indexers{ indexes.WorkflowTemplateIndex: indexes.MetaNamespaceLabelIndexFunc(common.LabelKeyWorkflowTemplate), indexes.SemaphoreConfigIndexName: indexes.WorkflowSemaphoreKeysIndexFunc(), indexes.WorkflowPhaseIndex: indexes.MetaWorkflowPhaseIndexFunc(), + 
indexes.ConditionsIndex: indexes.ConditionsIndexFunc, } // Run starts an Workflow resource controller @@ -1002,18 +1003,23 @@ func (wfc *WorkflowController) isArchivable(wf *wfv1.Workflow) bool { func (wfc *WorkflowController) syncWorkflowPhaseMetrics() { for _, phase := range []wfv1.NodePhase{wfv1.NodePending, wfv1.NodeRunning, wfv1.NodeSucceeded, wfv1.NodeFailed, wfv1.NodeError} { - objs, err := wfc.wfInformer.GetIndexer().ByIndex(indexes.WorkflowPhaseIndex, string(phase)) - if err != nil { - log.WithError(err).Errorf("failed to list workflows by '%s'", phase) - continue - } - wfc.metrics.SetWorkflowPhaseGauge(phase, len(objs)) + keys, err := wfc.wfInformer.GetIndexer().IndexKeys(indexes.WorkflowPhaseIndex, string(phase)) + errors.CheckError(err) + wfc.metrics.SetWorkflowPhaseGauge(phase, len(keys)) + } + for _, x := range []wfv1.Condition{ + {Type: wfv1.ConditionTypePodRunning, Status: metav1.ConditionTrue}, + {Type: wfv1.ConditionTypePodRunning, Status: metav1.ConditionFalse}, + } { + keys, err := wfc.wfInformer.GetIndexer().IndexKeys(indexes.ConditionsIndex, indexes.ConditionValue(x)) + errors.CheckError(err) + metrics.WorkflowConditionMetric.WithLabelValues(string(x.Type), string(x.Status)).Set(float64(len(keys))) } } func (wfc *WorkflowController) syncPodPhaseMetrics() { for _, phase := range []apiv1.PodPhase{apiv1.PodRunning, apiv1.PodPending} { - objs, err := wfc.podInformer.GetIndexer().ByIndex(indexes.PodPhaseIndex, string(phase)) + objs, err := wfc.podInformer.GetIndexer().IndexKeys(indexes.PodPhaseIndex, string(phase)) if err != nil { log.WithError(err).Error("failed to list active pods") return diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 5db53092ddd8..5f08ec2216f3 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -280,7 +280,7 @@ func makePodsPhase(ctx context.Context, woc *wfOperationCtx, phase apiv1.PodPhas panic(err) } for _, pod := range pods.Items { - if pod.Status.Phase == "" { + if pod.Status.Phase != phase { pod.Status.Phase = phase if phase == apiv1.PodFailed { pod.Status.Message = "Pod failed" diff --git a/workflow/controller/indexes/conditions_index.go b/workflow/controller/indexes/conditions_index.go new file mode 100644 index 000000000000..8d257701d138 --- /dev/null +++ b/workflow/controller/indexes/conditions_index.go @@ -0,0 +1,22 @@ +package indexes + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + unwf "github.com/argoproj/argo/util/unstructured/workflow" +) + +func ConditionsIndexFunc(obj interface{}) ([]string, error) { + var values []string + for _, x := range unwf.GetConditions(obj.(*unstructured.Unstructured)) { + values = append(values, ConditionValue(x)) + } + return values, nil +} + +func ConditionValue(x wfv1.Condition) string { + return fmt.Sprintf("%s/%s", x.Type, x.Status) +} diff --git a/workflow/controller/indexes/conditions_index_test.go b/workflow/controller/indexes/conditions_index_test.go new file mode 100644 index 000000000000..ca9ce6b11936 --- /dev/null +++ b/workflow/controller/indexes/conditions_index_test.go @@ -0,0 +1,30 @@ +package indexes + +import ( + "testing" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo/workflow/util" +) + +func TestConditionsIndexFunc(t *testing.T) { + t.Run("Nil", func(t *testing.T) { + 
un, _ := util.ToUnstructured(&wfv1.Workflow{}) + strings, _ := ConditionsIndexFunc(un) + assert.Nil(t, strings) + }) + t.Run("Some", func(t *testing.T) { + un, _ := util.ToUnstructured(&wfv1.Workflow{Status: wfv1.WorkflowStatus{ + Conditions: wfv1.Conditions{{ + Type: wfv1.ConditionTypePodRunning, + Status: metav1.ConditionTrue, + Message: "ignored", + }}, + }}) + strings, _ := ConditionsIndexFunc(un) + assert.Equal(t, []string{"PodRunning/True"}, strings) + }) +} diff --git a/workflow/controller/indexes/indexes.go b/workflow/controller/indexes/indexes.go index 71ba7756a20b..98758e6661b8 100644 --- a/workflow/controller/indexes/indexes.go +++ b/workflow/controller/indexes/indexes.go @@ -13,5 +13,6 @@ const ( WorkflowTemplateIndex = "workflowtemplate" WorkflowPhaseIndex = "workflow.phase" PodPhaseIndex = "pod.phase" + ConditionsIndex = "status.conditions" SemaphoreConfigIndexName = "bySemaphoreConfigMap" ) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 86359277bf1b..fa9f0f5c4216 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -524,7 +524,7 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) { // Release all acquired lock for completed workflow if woc.wf.Status.Synchronization != nil && woc.wf.Status.Fulfilled() { if woc.controller.syncManager.ReleaseAll(woc.wf) { - log.WithFields(log.Fields{"key": woc.wf.Name}).Info("Released all acquired locks") + woc.log.WithFields(log.Fields{"key": woc.wf.Name}).Info("Released all acquired locks") } } @@ -849,7 +849,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error { seenPods := make(map[string]*apiv1.Pod) seenPodLock := &sync.Mutex{} wfNodesLock := &sync.RWMutex{} - + podRunningCondition := wfv1.Condition{Type: wfv1.ConditionTypePodRunning, Status: metav1.ConditionFalse} performAssessment := func(pod *apiv1.Pod) { if pod == nil { return @@ -874,6 +874,9 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error { node.Phase = wfv1.NodeError } } + if node.Phase == wfv1.NodeRunning { + podRunningCondition.Status = metav1.ConditionTrue + } woc.updated = true } node := woc.wf.Status.Nodes[pod.ObjectMeta.Name] @@ -916,6 +919,8 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error { wg.Wait() + woc.wf.Status.Conditions.UpsertCondition(podRunningCondition) + // Now check for deleted pods. Iterate our nodes. If any one of our nodes does not show up in // the seen list it implies that the pod was deleted without the controller seeing the event. // It is now impossible to infer pod status. 
We can do at this point is to mark the node with Error, or @@ -929,7 +934,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error { // grace-period to allow informer sync recentlyStarted := recentlyStarted(node) - woc.log.WithFields(log.Fields{"nodeName": node.Name, "nodePhase": node.Phase, "recentlyStarted": recentlyStarted}).Info("Workflow pod is missing") + woc.log.WithFields(log.Fields{"podName": node.Name, "nodePhase": node.Phase, "recentlyStarted": recentlyStarted}).Info("Workflow pod is missing") metrics.PodMissingMetric.WithLabelValues(strconv.FormatBool(recentlyStarted), string(node.Phase)).Inc() // If the node is pending and the pod does not exist, it could be the case that we want to try to submit it @@ -1088,7 +1093,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatu } else { newPhase, message = inferFailedReason(pod) woc.log.WithField("displayName", node.DisplayName).WithField("templateName", node.TemplateName). - WithField("pod", pod.Name).Infof("Pod failed") + WithField("pod", pod.Name).Infof("Pod failed: %s", message) } newDaemonStatus = pointer.BoolPtr(false) case apiv1.PodRunning: @@ -1102,13 +1107,13 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatu newPhase = wfv1.NodeRunning tmplStr, ok := pod.Annotations[common.AnnotationKeyTemplate] if !ok { - log.WithField("pod", pod.ObjectMeta.Name).Warn("missing template annotation") + woc.log.WithField("pod", pod.ObjectMeta.Name).Warn("missing template annotation") return nil } var tmpl wfv1.Template err := json.Unmarshal([]byte(tmplStr), &tmpl) if err != nil { - log.WithError(err).WithField("pod", pod.ObjectMeta.Name).Warn("template annotation unreadable") + woc.log.WithError(err).WithField("pod", pod.ObjectMeta.Name).Warn("template annotation unreadable") return nil } if tmpl.Daemon != nil && *tmpl.Daemon { @@ -1121,7 +1126,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatu // proceed to mark node status as running (and daemoned) newPhase = wfv1.NodeRunning newDaemonStatus = pointer.BoolPtr(true) - log.Infof("Processing ready daemon pod: %v", pod.ObjectMeta.SelfLink) + woc.log.Infof("Processing ready daemon pod: %v", pod.ObjectMeta.SelfLink) } } default: @@ -1138,12 +1143,12 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatu newDaemonStatus = nil } if (newDaemonStatus != nil && node.Daemoned == nil) || (newDaemonStatus == nil && node.Daemoned != nil) { - log.Infof("Setting node %v daemoned: %v -> %v", node.ID, node.Daemoned, newDaemonStatus) + woc.log.Infof("Setting node %v daemoned: %v -> %v", node.ID, node.Daemoned, newDaemonStatus) node.Daemoned = newDaemonStatus updated = true if pod.Status.PodIP != "" && pod.Status.PodIP != node.PodIP { // only update Pod IP for daemoned nodes to reduce number of updates - log.Infof("Updating daemon node %s IP %s -> %s", node.ID, node.PodIP, pod.Status.PodIP) + woc.log.Infof("Updating daemon node %s IP %s -> %s", node.ID, node.PodIP, pod.Status.PodIP) node.PodIP = pod.Status.PodIP } } @@ -1151,7 +1156,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatu outputStr, ok := pod.Annotations[common.AnnotationKeyOutputs] if ok && node.Outputs == nil { updated = true - log.Infof("Setting node %v outputs", node.ID) + woc.log.Infof("Setting node %v outputs", node.ID) var outputs wfv1.Outputs err := json.Unmarshal([]byte(outputStr), &outputs) if err != nil { @@ -1163,7 +1168,7 @@ func (woc *wfOperationCtx) 
assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatu } } if node.Phase != newPhase { - log.Infof("Updating node %s status %s -> %s", node.ID, node.Phase, newPhase) + woc.log.Infof("Updating node %s status %s -> %s", node.ID, node.Phase, newPhase) // if we are transitioning from Pending to a different state, clear out pending message if node.Phase == wfv1.NodePending { node.Message = "" @@ -1172,7 +1177,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatu node.Phase = newPhase } if message != "" && node.Message != message { - log.Infof("Updating node %s message: %s", node.ID, message) + woc.log.Infof("Updating node %s message: %s", node.ID, message) updated = true node.Message = message } diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index c24535e13b23..721d49201ac5 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -4175,8 +4175,6 @@ func TestValidReferenceMode(t *testing.T) { } var workflowStatusMetric = ` -apiVersion: argoproj.io/v1alpha1 -kind: Workflow metadata: name: retry-to-completion-rngcr spec: @@ -4281,8 +4279,54 @@ func TestWorkflowStatusMetric(t *testing.T) { wf := unmarshalWF(workflowStatusMetric) woc := newWoc(*wf) woc.operate(ctx) - // Must only be one (completed: true) - assert.Len(t, woc.wf.Status.Conditions, 1) + // Must only be two (completed: true), (podRunning: true) + assert.Len(t, woc.wf.Status.Conditions, 2) +} + +func TestWorkflowConditions(t *testing.T) { + ctx := context.Background() + wf := unmarshalWF(` +metadata: + name: my-wf + namespace: my-ns +spec: + entrypoint: main + templates: + - container: + image: whalesay + name: main +`) + cancel, controller := newController(wf) + defer cancel() + + woc := newWorkflowOperationCtx(wf, controller) + woc.operate(ctx) + + assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Phase) + assert.Nil(t, woc.wf.Status.Conditions, "zero conditions on first reconciliation") + makePodsPhase(ctx, woc, apiv1.PodPending) + woc = newWorkflowOperationCtx(woc.wf, controller) + woc.operate(ctx) + + assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Phase) + assert.Equal(t, wfv1.Conditions{{Type: wfv1.ConditionTypePodRunning, Status: metav1.ConditionFalse}}, woc.wf.Status.Conditions) + + makePodsPhase(ctx, woc, apiv1.PodRunning) + woc = newWorkflowOperationCtx(woc.wf, controller) + woc.operate(ctx) + + assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Phase) + assert.Equal(t, wfv1.Conditions{{Type: wfv1.ConditionTypePodRunning, Status: metav1.ConditionTrue}}, woc.wf.Status.Conditions) + + makePodsPhase(ctx, woc, apiv1.PodSucceeded) + woc = newWorkflowOperationCtx(woc.wf, controller) + woc.operate(ctx) + + assert.Equal(t, wfv1.NodeSucceeded, woc.wf.Status.Phase) + assert.Equal(t, wfv1.Conditions{ + {Type: wfv1.ConditionTypePodRunning, Status: metav1.ConditionFalse}, + {Type: wfv1.ConditionTypeCompleted, Status: metav1.ConditionTrue}, + }, woc.wf.Status.Conditions) } var workflowCached = ` diff --git a/workflow/metrics/k8s_request_total_metric.go b/workflow/metrics/k8s_request_total_metric.go new file mode 100644 index 000000000000..0875f6400b09 --- /dev/null +++ b/workflow/metrics/k8s_request_total_metric.go @@ -0,0 +1,78 @@ +package metrics + +import ( + "net/http" + "strconv" + "strings" + + "github.com/prometheus/client_golang/prometheus" + "k8s.io/client-go/rest" +) + +var ( + K8sRequestTotalMetric = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: argoNamespace, + Subsystem: workflowsSubsystem, + Name: 
"k8s_request_total", + Help: "Number of kubernetes requests executed. https://argoproj.github.io/argo/metrics/#argo_workflows_k8s_request_total", + }, + []string{"kind", "verb", "status_code"}, + ) +) + +type metricsRoundTripper struct { + roundTripper http.RoundTripper +} + +func (m metricsRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + x, err := m.roundTripper.RoundTrip(r) + if x != nil { + verb, kind := parseRequest(r) + K8sRequestTotalMetric.WithLabelValues(kind, verb, strconv.Itoa(x.StatusCode)).Inc() + } + return x, err +} + +func parseRequest(r *http.Request) (verb string, kind string) { + i := strings.Index(r.URL.Path, "/v") + 1 + path := strings.Split(r.URL.Path[i:], "/") + n := len(path) + + verb = map[string]string{ + http.MethodGet: "List", + http.MethodPost: "Create", + http.MethodDelete: "Delete", + http.MethodPatch: "Patch", + http.MethodPut: "Update", + }[r.Method] + + if r.URL.Query().Get("watch") != "" { + verb = "Watch" + } else if verb == "List" && n%2 == 1 { + verb = "Get" + } else if verb == "Delete" && n%2 == 0 { + verb = "DeleteCollection" + } + + kind = "Unknown" + switch verb { + case "List", "Watch", "Create", "DeleteCollection": + kind = path[n-1] + case "Get", "Delete", "Patch", "Update": + kind = path[n-2] + } + + return verb, kind +} + +func AddMetricsTransportWrapper(config *rest.Config) *rest.Config { + wrap := config.WrapTransport + config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper { + if wrap != nil { + rt = wrap(rt) + } + return &metricsRoundTripper{roundTripper: rt} + } + return config +} diff --git a/workflow/metrics/k8s_request_total_metric_test.go b/workflow/metrics/k8s_request_total_metric_test.go new file mode 100644 index 000000000000..19aa6d778182 --- /dev/null +++ b/workflow/metrics/k8s_request_total_metric_test.go @@ -0,0 +1,34 @@ +package metrics + +import ( + "net/http" + "net/url" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_parseRequest(t *testing.T) { + for _, tt := range []struct { + name string + method string + url string + wantVerb string + wantKind string + }{ + {"create", "POST", "https://0.0.0.0:65009/apis/coordination.k8s.io/v1/namespaces/argo/leases", "Create", "leases"}, + {"list", "GET", "https://0.0.0.0:65009/apis/coordination.k8s.io/v1/namespaces/argo/leases", "List", "leases"}, + {"watch", "GET", "https://0.0.0.0:65009/apis/coordination.k8s.io/v1/namespaces/argo/leases?watch=true", "Watch", "leases"}, + {"get", "GET", "https://0.0.0.0:65009/apis/coordination.k8s.io/v1/namespaces/argo/leases/my-lease", "Get", "leases"}, + {"update", "PUT", "https://0.0.0.0:65009/apis/coordination.k8s.io/v1/namespaces/argo/leases/my-lease", "Update", "leases"}, + {"delete", "DELETE", "https://0.0.0.0:65009/apis/coordination.k8s.io/v1/namespaces/argo/leases/my-lease", "Delete", "leases"}, + {"deletecollection", "DELETE", "https://0.0.0.0:65009/apis/coordination.k8s.io/v1/namespaces/argo/leases", "DeleteCollection", "leases"}, + } { + t.Run(tt.name, func(t *testing.T) { + x, _ := url.Parse(tt.url) + verb, kind := parseRequest(&http.Request{Method: tt.method, URL: x}) + assert.Equal(t, tt.wantVerb, verb) + assert.Equal(t, tt.wantKind, kind) + }) + } +} diff --git a/workflow/metrics/pod_missing_metric.go b/workflow/metrics/pod_missing_metric.go index 0ba2bd20cda4..8629046d0b1e 100644 --- a/workflow/metrics/pod_missing_metric.go +++ b/workflow/metrics/pod_missing_metric.go @@ -7,7 +7,7 @@ var ( prometheus.GaugeOpts{ Namespace: argoNamespace, Name: "pod_missing", - Help: "Incidents of pod 
missing", + Help: "Incidents of pod missing. https://argoproj.github.io/argo/metrics/#argo_pod_missing", }, []string{"recently_started", "node_phase"}, ) diff --git a/workflow/metrics/server.go b/workflow/metrics/server.go index d6223fb9c06a..5d81b23d5340 100644 --- a/workflow/metrics/server.go +++ b/workflow/metrics/server.go @@ -70,7 +70,9 @@ func (m *Metrics) Describe(ch chan<- *prometheus.Desc) { ch <- metric.Desc() } m.logMetric.Describe(ch) + K8sRequestTotalMetric.Describe(ch) PodMissingMetric.Describe(ch) + WorkflowConditionMetric.Describe(ch) } func (m *Metrics) Collect(ch chan<- prometheus.Metric) { @@ -78,7 +80,9 @@ func (m *Metrics) Collect(ch chan<- prometheus.Metric) { ch <- metric } m.logMetric.Collect(ch) + K8sRequestTotalMetric.Collect(ch) PodMissingMetric.Collect(ch) + WorkflowConditionMetric.Collect(ch) } func (m *Metrics) garbageCollector(ctx context.Context) { diff --git a/workflow/metrics/workflow_condition_metric.go b/workflow/metrics/workflow_condition_metric.go new file mode 100644 index 000000000000..cecde7a7bdb7 --- /dev/null +++ b/workflow/metrics/workflow_condition_metric.go @@ -0,0 +1,17 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + WorkflowConditionMetric = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: argoNamespace, + Subsystem: workflowsSubsystem, + Name: "workflow_condition", + Help: "Workflow condition. https://argoproj.github.io/argo/metrics/#argo_workflows_workflow_condition", + }, + []string{"type", "status"}, + ) +)