Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error running 1000s of tasks: "etcdserver: request is too large" #1186 #1264

Merged
merged 16 commits into from
Mar 18, 2019
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ cmd/**/debug
hack/**/debug
debug.test
*.iml
examples/k8s-jobs1.yaml
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this under .gitignore?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry these files by mistake got it in PR I will revert it

24 changes: 22 additions & 2 deletions cmd/argo/commands/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (
"strings"
"text/tabwriter"

"github.com/argoproj/argo/errors"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/util/file"
"github.com/argoproj/pkg/humanize"
"github.com/ghodss/yaml"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

const onExitSuffix = "onExit"
Expand All @@ -36,6 +37,10 @@ func NewGetCommand() *cobra.Command {
if err != nil {
log.Fatal(err)
}
err = CheckAndDecompress(wf)
if err != nil {
log.Fatal(err)
}
printWorkflow(wf, output)
},
}
Expand All @@ -45,6 +50,21 @@ func NewGetCommand() *cobra.Command {
return command
}

func CheckAndDecompress(wf *wfv1.Workflow) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The status.nodes field is used if user runs argo list -o=wide: https://github.com/argoproj/argo/blob/master/cmd/argo/commands/list.go#L137

Also nodes field is accessed in argo logs: https://github.com/argoproj/argo/blob/master/cmd/argo/commands/logs.go#L139

I would suggest to add method GetNodes() to WorkflowStatus structure which does same as CheckAndDecompress and returns nodes and use it everywhere in CLI instead of Status.Nodes

if wf.Status.CompressedNodes != "" {
nodeContent, err := file.DecodeDecompressString(wf.Status.CompressedNodes)
if err != nil {
return errors.InternalWrapError(err)
}
err = json.Unmarshal([]byte(nodeContent), &wf.Status.Nodes)
if err != nil {
log.Fatal(err)
}
wf.Status.CompressedNodes = ""
}
return nil
}

func printWorkflow(wf *wfv1.Workflow, outFmt string) {
switch outFmt {
case "name":
Expand Down
4 changes: 4 additions & 0 deletions cmd/argo/commands/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ func countPendingRunningCompleted(wf *wfv1.Workflow) (int, int, int) {
pending := 0
running := 0
completed := 0
err := CheckAndDecompress(wf)
if err != nil {
log.Fatal(err)
}
for _, node := range wf.Status.Nodes {
tmpl := wf.GetTemplate(node.TemplateName)
if tmpl == nil || !tmpl.IsPodType() {
Expand Down
12 changes: 11 additions & 1 deletion cmd/argo/commands/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
)

type logEntry struct {
Expand Down Expand Up @@ -136,6 +136,11 @@ func (p *logPrinter) PrintPodLogs(podName string) error {
// Prints logs for workflow pod steps and return most recent log timestamp per pod name
func (p *logPrinter) printRecentWorkflowLogs(wf *v1alpha1.Workflow) map[string]*time.Time {
var podNodes []v1alpha1.NodeStatus
err := CheckAndDecompress(wf)
if err != nil {
log.Warn(err)
return nil
}
for _, node := range wf.Status.Nodes {
if node.Type == v1alpha1.NodeTypePod && node.Phase != v1alpha1.NodeError {
podNodes = append(podNodes, node)
Expand Down Expand Up @@ -193,6 +198,11 @@ func (p *logPrinter) printLiveWorkflowLogs(workflowName string, wfClient workflo
defer cancel()

processPods := func(wf *v1alpha1.Workflow) {
err := CheckAndDecompress(wf)
if err != nil {
log.Warn(err)
return
}
for id := range wf.Status.Nodes {
node := wf.Status.Nodes[id]
if node.Type == v1alpha1.NodeTypePod && node.Phase != v1alpha1.NodeError && streamedPods[node.ID] == false {
Expand Down
2 changes: 2 additions & 0 deletions cmd/argo/commands/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func watchWorkflow(name string) {
select {
case next := <-watchIf.ResultChan():
wf, _ = next.Object.(*wfv1.Workflow)
err := CheckAndDecompress(wf)
errors.CheckError(err)
case <-ticker.C:
}
if wf == nil {
Expand Down
26 changes: 26 additions & 0 deletions examples/compressed_workflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Stress test probing the upper bound of concurrent pods
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: pod-limits-
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make the generate name the name of the file. It makes it difficult to debug during e2e testing which workflow failed. Also, since this is a test and not an example, it probably belongs under test/e2e/functional

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry these files by mistake got it in PR I will revert it

spec:
entrypoint: pod-limits
parallelism: 5
arguments:
parameters:
- name: limit
value: 16

templates:
- name: pod-limits
steps:
- - name: run-pod
template: run-pod
withSequence:
count: "{{workflow.parameters.limit}}"

- name: run-pod
script:
image: alpine:latest
command: [sh]
source: head /dev/urandom | tr -dc A-Za-z0-9 | head -c 2048 ; echo ''
Binary file added logs/workflowwithexecutor
Binary file not shown.
3 changes: 3 additions & 0 deletions pkg/apis/workflow/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,9 @@ type WorkflowStatus struct {
// A human readable message indicating details about why the workflow is in this condition.
Message string `json:"message,omitempty"`

// Compressed and base64 encoded Nodes map
CompressedNodes string `json:"compressedNodes,omitempty"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comment

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add spaces after all // comments


// Nodes is a mapping between a node ID and the node's status.
Nodes map[string]NodeStatus `json:"nodes,omitempty"`

Expand Down
97 changes: 97 additions & 0 deletions util/file/fileutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package file

import (
"archive/tar"
"bytes"
"compress/gzip"
"encoding/base64"
"io"
"io/ioutil"
"os"
"strings"

log "github.com/sirupsen/logrus"
)

// IsFileOrDirExistInGZip return true if file or directory exists in GZip file
func IsFileOrDirExistInGZip(sourcePath string, gzipFilePath string) bool {

fi, err := os.Open(gzipFilePath)

if os.IsNotExist(err) {
return false
}
defer close(fi)

fz, err := gzip.NewReader(fi)
if err != nil {
return false
}
tr := tar.NewReader(fz)
for {
hdr, err := tr.Next()
if err == io.EOF {
break
}
if err != nil {

return false
}
if hdr.FileInfo().IsDir() && strings.Contains(strings.Trim(hdr.Name, "/"), strings.Trim(sourcePath, "/")) {
return true
}
if strings.Contains(sourcePath, hdr.Name) && hdr.Size > 0 {
return true
}
}
return false
}

// close closes f and logs (rather than returns) any error, so it is safe to
// use in defer statements.
// NOTE(review): this shadows the builtin close; consider renaming (e.g. closeQuietly).
func close(f io.Closer) {
	if err := f.Close(); err != nil {
		log.Warnf("Failed to close the file/writer/reader. %v", err)
	}
}

// CompressEncodeString gzip-compresses content and returns the result
// base64 (standard alphabet) encoded.
func CompressEncodeString(content string) string {
	compressed := CompressContent([]byte(content))
	return base64.StdEncoding.EncodeToString(compressed)
}

// DecodeDecompressString base64-decodes content and then gzip-decompresses
// the result, returning the original string. It is the inverse of
// CompressEncodeString. An error is returned if either step fails.
func DecodeDecompressString(content string) (string, error) {

	decoded, err := base64.StdEncoding.DecodeString(content)
	if err != nil {
		return "", err
	}
	decompressed, err := DecompressContent(decoded)
	if err != nil {
		return "", err
	}
	return string(decompressed), nil
}

// CompressContent gzip-compresses the given byte array and returns the
// compressed bytes. A write failure is only logged as a warning (writes to an
// in-memory buffer are not expected to fail), so the result may be incomplete
// in that unlikely case.
func CompressContent(content []byte) []byte {
	var compressed bytes.Buffer
	gzipWriter := gzip.NewWriter(&compressed)

	if _, err := gzipWriter.Write(content); err != nil {
		log.Warnf("Error in compressing: %v", err)
	}
	close(gzipWriter)
	return compressed.Bytes()
}

// DecompressContent gzip-decompresses content and returns the uncompressed
// bytes, or an error if content is not a valid gzip stream.
func DecompressContent(content []byte) ([]byte, error) {

	gzipReader, err := gzip.NewReader(bytes.NewReader(content))
	// The reader error must be checked: on invalid input gzipReader is nil
	// and both closing and reading from it would panic.
	if err != nil {
		return nil, err
	}
	// Close error is irrelevant for a fully-drained in-memory reader.
	defer gzipReader.Close()
	return ioutil.ReadAll(gzipReader)
}
21 changes: 21 additions & 0 deletions util/file/fileutil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package file

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestCompressContentString verifies that a string survives a
// compress/encode -> decode/decompress round trip unchanged.
func TestCompressContentString(t *testing.T) {
	content := "{\"pod-limits-rrdm8-591645159\":{\"id\":\"pod-limits-rrdm8-591645159\",\"name\":\"pod-limits-rrdm8[0]." +
		"run-pod(0:0)\",\"displayName\":\"run-pod(0:0)\",\"type\":\"Pod\",\"templateName\":\"run-pod\",\"phase\":" +
		"\"Succeeded\",\"boundaryID\":\"pod-limits-rrdm8\",\"startedAt\":\"2019-03-07T19:14:50Z\",\"finishedAt\":" +
		"\"2019-03-07T19:14:55Z\"}}"

	compString := CompressEncodeString(content)

	// Check the decode error instead of discarding it so a corrupt round
	// trip fails with a clear message rather than a bare mismatch.
	resultString, err := DecodeDecompressString(compString)
	assert.NoError(t, err)
	assert.Equal(t, content, resultString)
}
10 changes: 10 additions & 0 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,16 @@ func (wfc *WorkflowController) processNextItem() bool {
}

woc := newWorkflowOperationCtx(wf, wfc)
// Decompress the nodes if they are compressed

err = woc.checkAndDecompress()
if err != nil {
log.Warnf("Failed to decompress '%s' to workflow object: %v", key, err)
woc.markWorkflowFailed(fmt.Sprintf("invalid spec: %s", err.Error()))
woc.persistUpdates()
wfc.throttler.Remove(key)
return true
}
woc.operate()
if woc.wf.Status.Completed() {
wfc.throttler.Remove(key)
Expand Down
Loading