
Error running 1000s of tasks: "etcdserver: request is too large" #1186 #1264

Merged · 16 commits · Mar 18, 2019
23 changes: 23 additions & 0 deletions cmd/argo/commands/get.go
@@ -8,6 +8,9 @@ import (
"strings"
"text/tabwriter"

"github.com/argoproj/argo/errors"
"github.com/argoproj/argo/workflow/util/file"

Contributor: Nitpick: unnecessary blank line

Member Author: I think the file watcher or the IntelliJ Go code formatter is doing this; it is grouping the imports.

"github.com/argoproj/pkg/humanize"
"github.com/ghodss/yaml"
"github.com/spf13/cobra"
@@ -36,6 +39,10 @@ func NewGetCommand() *cobra.Command {
if err != nil {
log.Fatal(err)
}
err = CheckAndDecompress(wf)
if err != nil {
log.Fatal(err)
}
printWorkflow(wf, output)
},
}
@@ -45,6 +52,22 @@ func NewGetCommand() *cobra.Command {
return command
}

func CheckAndDecompress(wf *wfv1.Workflow) error {
Contributor: The status.nodes field is used if the user runs argo list -o=wide: https://github.com/argoproj/argo/blob/master/cmd/argo/commands/list.go#L137

The nodes field is also accessed in argo logs: https://github.com/argoproj/argo/blob/master/cmd/argo/commands/logs.go#L139

I would suggest adding a GetNodes() method to the WorkflowStatus structure which does the same as CheckAndDecompress and returns the nodes, and using it everywhere in the CLI instead of Status.Nodes.
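
For illustration, a minimal sketch of the suggested helper (hypothetical: the method name, placement, and error handling follow this comment's proposal, not code from the PR, and it assumes the v1alpha1 package can import encoding/json and the PR's file util):

	// GetNodes returns the node status map, transparently decoding and
	// decompressing CompressedNodes when the plain map has been cleared.
	func (ws *WorkflowStatus) GetNodes() (map[string]NodeStatus, error) {
		if ws.CompressedNodes == "" {
			return ws.Nodes, nil
		}
		content, err := file.DecodeDecompressString(ws.CompressedNodes)
		if err != nil {
			return nil, err
		}
		nodes := make(map[string]NodeStatus)
		if err := json.Unmarshal([]byte(content), &nodes); err != nil {
			return nil, err
		}
		return nodes, nil
	}

CLI call sites would then read nodes, err := wf.Status.GetNodes() instead of touching wf.Status.Nodes directly.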

if wf.Status.CompressedNodes != "" {
nodeContent, err := file.DecodeDecompressString(wf.Status.CompressedNodes)
if err != nil {
return errors.InternalWrapError(err)
}
err = json.Unmarshal([]byte(nodeContent), &wf.Status.Nodes)
if err != nil {
log.Fatal(err)
return err
}
wf.Status.CompressedNodes = ""
}
return nil
}

func printWorkflow(wf *wfv1.Workflow, outFmt string) {
switch outFmt {
case "name":
2 changes: 2 additions & 0 deletions cmd/argo/commands/watch.go
@@ -45,6 +45,8 @@ func watchWorkflow(name string) {
select {
case next := <-watchIf.ResultChan():
wf, _ = next.Object.(*wfv1.Workflow)
err := CheckAndDecompress(wf)
errors.CheckError(err)
case <-ticker.C:
}
if wf == nil {
2 changes: 2 additions & 0 deletions pkg/apis/workflow/v1alpha1/types.go
@@ -477,6 +477,8 @@ type WorkflowStatus struct {
// A human readable message indicating details about why the workflow is in this condition.
Message string `json:"message,omitempty"`

CompressedNodes string `json:"compressedNodes,omitempty"`
Contributor: Please add a comment.

Member: Please add spaces after all // comments.


// Nodes is a mapping between a node ID and the node's status.
Nodes map[string]NodeStatus `json:"nodes,omitempty"`

48 changes: 34 additions & 14 deletions test/e2e/stress/pod-limits.yaml
@@ -1,4 +1,4 @@
# Stress test to test upper bounds of concurrent pods
Stress test to test upper bounds of concurrent pods
Contributor: Looks like # is missing

apiVersion: argoproj.io/v1alpha1
Contributor: Is this example workflow intended to reproduce the compression? Instead of running 1000 pods, can you please add an example which has 10 script steps, where each step produces very large output?

kind: Workflow
metadata:
@@ -7,19 +7,39 @@ spec:
entrypoint: pod-limits
arguments:
parameters:
- name: limit
value: 1000
- name: limit
value: 1000
Member: Can we revert this and stick to k8s YAML formatting (without extra spaces for lists)? All our other examples are this way.

Member Author: fixed


templates:
- name: pod-limits
steps:
- - name: run-pod
template: run-pod
withSequence:
count: "{{workflow.parameters.limit}}"
- name: pod-limits
inputs:
parameters:
- name: limit
steps:
- - name: gen-number-list
template: gen-number-list
arguments:
parameters:
- name: count
value: "{{inputs.parameters.limit}}"
- - name: run-pod
template: run-pod
withParam: "{{steps.gen-number-list.outputs.result}}"

- name: run-pod
container:
image: "alpine:3.7"
command: [sh, -c]
args: ["echo sleeping 1s; sleep 1"]
- name: gen-number-list
inputs:
parameters:
- name: count
script:
image: python:alpine3.6
command: [python]
source: |
import json
import sys
json.dump([i for i in range(1, {{inputs.parameters.count}}+1)], sys.stdout)
withSequence:
Contributor: The withSequence should be part of the workflow step which executes this template.

count: "{{workflow.parameters.limit}}"

- name: run-pod
container:
image: "alpine:3.7"
10 changes: 10 additions & 0 deletions workflow/controller/controller.go
@@ -243,6 +243,16 @@ func (wfc *WorkflowController) processNextItem() bool {
}

woc := newWorkflowOperationCtx(wf, wfc)
//Decompress the node if it is compressed

err = woc.checkAndDecompress()
if err != nil {
log.Warnf("Failed to decompress '%s' to workflow object: %v", key, err)
woc.markWorkflowFailed(fmt.Sprintf("invalid spec: %s", err.Error()))
woc.persistUpdates()
wfc.throttler.Remove(key)
return true
}
woc.operate()
if woc.wf.Status.Completed() {
wfc.throttler.Remove(key)
103 changes: 101 additions & 2 deletions workflow/controller/operator.go
@@ -11,6 +11,8 @@ import (
"strings"
"time"

"github.com/argoproj/argo/workflow/util/file"
Member: import blocks should follow this convention:

  1. standard packages
  2. third party packages
  3. project packages

argokubeerr "github.com/argoproj/pkg/kube/errors"
"github.com/argoproj/pkg/strftime"
jsonpatch "github.com/evanphx/json-patch"
@@ -72,6 +74,9 @@ var (
// for before requeuing the workflow onto the workqueue.
const maxOperationTime time.Duration = 10 * time.Second

//maxWorkflowSize is the maximum size for workflow.yaml
const maxWorkflowSize int = 1024 * 1024

// newWorkflowOperationCtx creates and initializes a new wfOperationCtx object.
func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOperationCtx {
// NEVER modify objects from the store. It's a read-only, local cache.
@@ -275,14 +280,24 @@ func (woc *wfOperationCtx) persistUpdates() {
return
}
wfClient := woc.controller.wfclientset.ArgoprojV1alpha1().Workflows(woc.wf.ObjectMeta.Namespace)
woc.log.Info("Final size", woc.getSize())
if woc.wf.Status.CompressedNodes != "" {
Member: Maybe we should remove this line, like this:

	err := woc.checkAndCompress()
	if err != nil {
		woc.log.Warnf("Error compressing workflow: %v", err)
	}
	if woc.wf.Status.CompressedNodes != "" {
		woc.clearNodeStatusMap()
	}

Member Author: Thanks, updated


err := woc.checkAndCompress()
if err != nil {
woc.log.Warnf("Error compressing workflow: %v", err)
}
woc.clearNodeStatusMap()
}

_, err := wfClient.Update(woc.wf)
if err != nil {
woc.log.Warnf("Error updating workflow: %v", err)
woc.log.Warnf("Error updating workflow: %v %s", err, apierr.ReasonForError(err))
if argokubeerr.IsRequestEntityTooLargeErr(err) {
woc.persistWorkflowSizeLimitErr(wfClient, err)
return
}
if !apierr.IsConflict(err) {
if err != nil && !apierr.IsConflict(err) {
return
}
woc.log.Info("Re-appying updates on latest version and retrying update")
@@ -450,11 +465,32 @@ func (woc *wfOperationCtx) podReconciliation() error {
}

for _, pod := range podList.Items {
origNodeStatus := *woc.wf.Status.DeepCopy()
performAssessment(&pod)
err = woc.applyExecutionControl(&pod)
if err != nil {
woc.log.Warnf("Failed to apply execution control to pod %s", pod.Name)
}
err = woc.checkAndCompress()
if err != nil {
woc.wf.Status = origNodeStatus
nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName]
nodeID := woc.wf.NodeID(nodeNameForPod)
node := woc.wf.Status.Nodes[nodeID]
node.Message = fmt.Sprintf("%v", err)
woc.log.Warn(node.Message)
node.Outputs = nil
node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
node.Phase = wfv1.NodeError
Member: There is a helper for all of this: woc.markNodePhase(). Can you use that instead?

Member Author: fixed
node.Completed()
Member: The call to node.Completed() is not useful.

Member Author: fixed

woc.wf.Status.Nodes[nodeID] = node
woc.updated = true
err = woc.checkAndCompress()
if err != nil {
woc.markWorkflowError(err, true)
}
}

}

// Now check for deleted pods. Iterate our nodes. If any one of our nodes does not show up in
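
As a rough sketch, the error path in the hunk above collapsed onto the suggested helper (hypothetical; the markNodePhase signature is assumed from the review comment, not quoted from the codebase):

	woc.wf.Status = origNodeStatus
	nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName]
	// Assumed helper: markNodePhase(nodeName string, phase wfv1.NodePhase,
	// message ...string) — one call records the phase, message, and finish
	// time instead of mutating the NodeStatus fields by hand and writing
	// the copy back into the map.
	woc.markNodePhase(nodeNameForPod, wfv1.NodeError, fmt.Sprintf("%v", err))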
@@ -1576,3 +1612,66 @@ func expandSequence(seq *wfv1.Sequence) ([]wfv1.Item, error) {
}
return items, nil
}

// getSize return the entire workflow json string size
func (woc *wfOperationCtx) getSize() int {
nodeContent, err := json.Marshal(woc.wf)
if err != nil {
return -1
}

compressNodeSize := len(woc.wf.Status.CompressedNodes)

if compressNodeSize > 0 {
nodeStatus, err := json.Marshal(woc.wf.Status.Nodes)
if err != nil {
return -1
}
return len(nodeContent) - len(nodeStatus)
}
return len(nodeContent)
}

//checkAndCompress will check the workflow size and compress node status if total workflow size is more than maxWorkflowSize.
//The compressed content will be assign to compressedNodes element and clear the nodestatus map.
func (woc *wfOperationCtx) checkAndCompress() error {

if woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize) {
Contributor: Can we simplify this logic a little bit? As I understand it, getSize is needed because it assumes that a workflow might have the Nodes and CompressedNodes fields set at the same time, and that the Nodes field will be removed before saving, right?

Instead, please change the logic to ensure these two fields are never set at the same time: checkAndDecompress should set the Nodes field and immediately remove CompressedNodes; checkAndCompress should immediately remove Nodes after compressing.

Member Author: This function gets called in two places. One is the final workflow save in persistUpdates(). The other is in podReconciliation, during workflow execution, to check that the output of each node fits into the size limit. In that scenario Nodes and CompressedNodes will both co-exist.

nodeContent, err := json.Marshal(woc.wf.Status.Nodes)
if err != nil {
return errors.InternalWrapError(err)
}
buff := string(nodeContent)
woc.wf.Status.CompressedNodes = file.CompressEncodeString(buff)

}
if woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize {
return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.getSize()))
}
return nil
}
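
For comparison, a sketch of the invariant the contributor proposes above — Nodes and CompressedNodes never both set (a hypothetical refactor; as the author explains, the PR deliberately lets both co-exist during reconciliation):

	// compressNodes compresses the node map and immediately drops it, so
	// the two representations never co-exist on the object.
	func (woc *wfOperationCtx) compressNodes() error {
		content, err := json.Marshal(woc.wf.Status.Nodes)
		if err != nil {
			return errors.InternalWrapError(err)
		}
		woc.wf.Status.CompressedNodes = file.CompressEncodeString(string(content))
		woc.wf.Status.Nodes = nil
		return nil
	}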

func (woc *wfOperationCtx) clearNodeStatusMap() {
Contributor: I think this is a more efficient way to do it:

	status := woc.wf.Status
	status.Nodes = nil
	woc.wf.Status = status

Member Author: updated

Member: wait, why isn't this just woc.wf.Status.Nodes = nil?

Member: Didn't see this question answered in the latest version. Am I missing something, or can this be simplified further to woc.wf.Status.Nodes = nil?

Member Author: updated

for k := range woc.wf.Status.Nodes {
delete(woc.wf.Status.Nodes, k)
}
}

//checkAndDecompress will decompress the compressednode and assign to workflow.status.nodes map.
func (woc *wfOperationCtx) checkAndDecompress() error {
if woc.wf.Status.CompressedNodes != "" {
nodeContent, err := file.DecodeDecompressString(woc.wf.Status.CompressedNodes)
if err != nil {
return errors.InternalWrapError(err)
}
var tempNodes map[string]wfv1.NodeStatus

err = json.Unmarshal([]byte(nodeContent), &tempNodes)
if err != nil {
woc.log.Warn(err)
Contributor: Did you mean return err here?

Member Author: updated
}
woc.wf.Status.Nodes = tempNodes
}
return nil
}
64 changes: 61 additions & 3 deletions workflow/util/file/fileutil.go
@@ -2,8 +2,11 @@ package file

import (
"archive/tar"
"bytes"
Member: I didn't catch this in the other review, but can we move fileutil.go under argoproj/argo/util/file instead of argoproj/argo/workflow/util/file? Since the package has nothing to do with workflows.
"compress/gzip"
"encoding/base64"
"io"
"io/ioutil"
"os"
"strings"

@@ -18,7 +21,7 @@ func IsFileOrDirExistInGZip(sourcePath string, gzipFilePath string) bool {
if os.IsNotExist(err) {
return false
}
defer closeFile(fi)
defer close(fi)

fz, err := gzip.NewReader(fi)
if err != nil {
Expand All @@ -44,9 +47,64 @@ func IsFileOrDirExistInGZip(sourcePath string, gzipFilePath string) bool {
return false
}

func closeFile(f *os.File) {
//Close the file
func close(f io.Closer) {
err := f.Close()
if err != nil {
log.Warn("Failed to close the file. v%", err)
log.Warn("Failed to close the file/writer/reader. ", err)
}
}

//EncodeContent will encode using base64
func EncodeContent(content []byte) string {
encoder := base64.StdEncoding
return encoder.EncodeToString(content)

}

//DecodeContent will decode using base64
func DecodeContent(content string) ([]byte, error) {
encoder := base64.StdEncoding
return encoder.DecodeString(content)
}

//CompressEncodeString will return the compressed string with base64 encoded
func CompressEncodeString(content string) string {
return EncodeContent(CompressContent([]byte(content)))
}

//DecodeDecompressString will return decode and decompress the
func DecodeDecompressString(content string) (string, error) {

buf, err := DecodeContent(content)
if err != nil {
return "", err
}
dBuf, err := DecompressContent(buf)
if err != nil {
return "", err
}
return string(dBuf), nil
}
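
A quick round-trip showing how these two helpers compose, the way the controller compresses node status and the CLI restores it (illustrative usage only):

	package main

	import (
		"fmt"

		"github.com/argoproj/argo/workflow/util/file"
	)

	func main() {
		status := `{"node-1":{"phase":"Succeeded"}}`
		// Gzip-compress then base64-encode; decoding reverses both steps.
		encoded := file.CompressEncodeString(status)
		decoded, err := file.DecodeDecompressString(encoded)
		if err != nil {
			panic(err)
		}
		fmt.Println(decoded == status) // true
	}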

//CompressContent will compress the byte array using zip writer
func CompressContent(content []byte) []byte {
var buf bytes.Buffer
zipWriter := gzip.NewWriter(&buf)

_, err := zipWriter.Write(content)
if err != nil {
log.Warn("Error in compressing. v%", err)
Member: This should be log.Warnf; also it should be %v, not v%.

Member Author: fixed
}
close(zipWriter)
return buf.Bytes()
}

//D
Member: Unfinished comment

Member Author: fixed
func DecompressContent(content []byte) ([]byte, error) {

buf := bytes.NewReader(content)
gZipReader, _ := gzip.NewReader(buf)
defer close(gZipReader)
return ioutil.ReadAll(gZipReader)
}