From 732dce8b4af187a163b63e5a6fc70e289063d6ce Mon Sep 17 00:00:00 2001 From: Ken Simon Date: Wed, 9 Aug 2017 17:56:15 -0700 Subject: [PATCH] Implicitly extract tar.gz files from plugins that upload them This makes it so plugins upload to a path like: ``` /api/v1/results/global/e2e.tar.gz ``` And it will show up in: ``` /results/plugins/e2e/results/ ``` (For single-node plugins, this would map to:) ``` /results/plugins/systemd_logs/node1/ ``` This needs to be documented along with the snapshot documentation that is already under review. Since the upload path is now expected to have a file extension in it, In case for some reason the sonobuoy master launches with an old version of the sonobuoy worker (or for plugins not using `sonobuoy worker` that upload their own contents), if the path has no file extension, it is guessed to be `.json` or `.tar.gz`, same as before. Signed-off-by: Ken Simon Signed-off-by: Jesse Hamilton jesse.hamilton@heptio.com Signed-off-by: Jesse Hamilton jesse.hamilton@heptio.com --- pkg/plugin/aggregation/aggregator.go | 48 ++++++-- pkg/plugin/aggregation/aggregator_test.go | 135 ++++++++++++++++++++-- pkg/plugin/aggregation/server.go | 22 +++- pkg/plugin/aggregation/server_test.go | 8 +- pkg/plugin/driver/daemonset/daemonset.go | 4 +- pkg/plugin/driver/job/job.go | 4 +- pkg/plugin/driver/utils/utils.go | 5 +- pkg/plugin/interface.go | 11 +- pkg/worker/worker.go | 7 ++ pkg/worker/worker_test.go | 44 +++++-- 10 files changed, 239 insertions(+), 49 deletions(-) diff --git a/pkg/plugin/aggregation/aggregator.go b/pkg/plugin/aggregation/aggregator.go index 0f5a62674..a67fcc6cc 100644 --- a/pkg/plugin/aggregation/aggregator.go +++ b/pkg/plugin/aggregation/aggregator.go @@ -30,6 +30,7 @@ import ( "github.com/golang/glog" "github.com/heptio/sonobuoy/pkg/plugin" + "github.com/viniciuschiele/tarx" ) // Aggregator is responsible for taking results from an HTTP server (configured @@ -196,26 +197,55 @@ func (a *Aggregator) handleResult(result *plugin.Result) error { // Create the output directory for the result. Will be of the // form .../plugins/:results_type/:node.json (for DaemonSet plugins) or // .../plugins/:results_type.json (for Job plugins) - resultsFile := path.Join(a.OutputDir, result.Path()+result.Extension()) + resultsFile := path.Join(a.OutputDir, result.Path()+result.Extension) resultsDir := path.Dir(resultsFile) glog.Infof("Creating directory %v", resultsDir) if err := os.MkdirAll(resultsDir, 0755); err != nil { glog.Errorf("Could not make directory %v: %v", resultsDir, err) - return err } - // Open the results file for writing - f, err := os.Create(resultsFile) + // Write the results file out and close it + err := func() error { + f, err := os.Create(resultsFile) + if err != nil { + glog.Errorf("Could not open output file %v for writing: %v", resultsFile, err) + return err + } + defer f.Close() + + // Copy the request body into the file + _, err = io.Copy(f, result.Body) + if err != nil { + glog.Errorf("Error writing plugin result: %v", err) + return err + } + + return nil + }() if err != nil { - glog.Errorf("Could not open output file %v for writing: %v", resultsFile, err) return err } - defer f.Close() - // Copy the request body into the file - io.Copy(f, result.Body) - glog.Infof("wrote results to %v\n", resultsFile) + // If it's a tarball, extract it + if result.Extension == ".tar.gz" { + resultsDir := path.Join(a.OutputDir, result.Path()) + + err = tarx.Extract(resultsFile, resultsDir, &tarx.ExtractOptions{}) + if err != nil { + glog.Errorf("Could not extract tar file %v: %v", resultsFile, err) + return err + } + + err = os.Remove(resultsFile) + if err != nil { + return err + } + + glog.Infof("extracted results tarball into %v", resultsDir) + } else { + glog.Infof("wrote results to %v", resultsFile) + } return nil } diff --git a/pkg/plugin/aggregation/aggregator_test.go b/pkg/plugin/aggregation/aggregator_test.go index 62afc5450..d8cfe2485 100644 --- a/pkg/plugin/aggregation/aggregator_test.go +++ b/pkg/plugin/aggregation/aggregator_test.go @@ -17,13 +17,17 @@ limitations under the License. package aggregation import ( + "bytes" "io/ioutil" "os" + "os/exec" "path" "strconv" "testing" "github.com/heptio/sonobuoy/pkg/plugin" + pluginutils "github.com/heptio/sonobuoy/pkg/plugin/driver/utils" + "github.com/viniciuschiele/tarx" ) func TestAggregation(t *testing.T) { @@ -32,7 +36,7 @@ func TestAggregation(t *testing.T) { } // Happy path withAggregator(t, expected, func(agg *Aggregator) { - resp := doRequest(t, "PUT", "/api/v1/results/by-node/node1/systemd_logs", "foo") + resp := doRequest(t, "PUT", "/api/v1/results/by-node/node1/systemd_logs.json", []byte("foo")) if resp.StatusCode != 200 { body, _ := ioutil.ReadAll(resp.Body) t.Errorf("Got (%v) response from server: %v", resp.StatusCode, string(body)) @@ -44,7 +48,59 @@ func TestAggregation(t *testing.T) { t.Errorf("results for node1 incorrect (got %v): %v", string(bytes), err) } } else { - t.Errorf("AggregationServer didn't record a result for node1") + t.Errorf("AggregationServer didn't record a result for node1. Got: %+v", agg.Results) + } + }) +} + +func TestAggregation_noExtension(t *testing.T) { + expected := []plugin.ExpectedResult{ + plugin.ExpectedResult{NodeName: "node1", ResultType: "systemd_logs"}, + } + + withAggregator(t, expected, func(agg *Aggregator) { + resp := doRequest(t, "PUT", "/api/v1/results/by-node/node1/systemd_logs", []byte("foo")) + if resp.StatusCode != 200 { + body, _ := ioutil.ReadAll(resp.Body) + t.Errorf("Got (%v) response from server: %v", resp.StatusCode, string(body)) + } + + if result, ok := agg.Results["systemd_logs/node1"]; ok { + bytes, err := ioutil.ReadFile(path.Join(agg.OutputDir, result.Path())) + if string(bytes) != "foo" { + t.Errorf("results for node1 incorrect (got %v): %v", string(bytes), err) + } + } else { + t.Errorf("AggregationServer didn't record a result for node1. Got: %+v", agg.Results) + } + }) +} + +func TestAggregation_tarfile(t *testing.T) { + expected := []plugin.ExpectedResult{ + plugin.ExpectedResult{ResultType: "e2e"}, + } + + fileBytes := []byte("foo") + tarBytes := makeTarWithContents(t, "inside_tar.txt", fileBytes) + + withAggregator(t, expected, func(agg *Aggregator) { + resp := doRequest(t, "PUT", "/api/v1/results/global/e2e.tar.gz", tarBytes) + if resp.StatusCode != 200 { + body, _ := ioutil.ReadAll(resp.Body) + t.Errorf("Got (%v) response from server: %v", resp.StatusCode, string(body)) + } + + if result, ok := agg.Results["e2e"]; ok { + realBytes, err := ioutil.ReadFile(path.Join(agg.OutputDir, result.Path(), "inside_tar.txt")) + if bytes.Compare(realBytes, fileBytes) != 0 || err != nil { + t.Logf("results e2e tests incorrect (got %v, expected %v): %v", string(realBytes), string(fileBytes), err) + output, _ := exec.Command("ls", "-lR", agg.OutputDir).CombinedOutput() + t.Log(string(output)) + t.Fail() + } + } else { + t.Errorf("AggregationServer didn't record a result for e2e tests. Got: %+v", agg.Results) } }) } @@ -55,14 +111,13 @@ func TestAggregation_wrongnodes(t *testing.T) { } withAggregator(t, expected, func(agg *Aggregator) { - resp := doRequest(t, "PUT", "/api/v1/results/by-node/node10/systemd_logs", "foo") + resp := doRequest(t, "PUT", "/api/v1/results/by-node/node10/systemd_logs.json", []byte("foo")) if resp.StatusCode != 403 { t.Errorf("Expected a 403 forbidden for checking in an unexpected node, got %v", resp.StatusCode) } - if _, ok := agg.Results["node10"]; ok { + if _, ok := agg.Results["systemd_logs/node10"]; ok { t.Fatal("Aggregator accepted a result from an unexpected host") - t.Fail() } }) } @@ -74,20 +129,43 @@ func TestAggregation_duplicates(t *testing.T) { } withAggregator(t, expected, func(agg *Aggregator) { // Check in a node - resp := doRequest(t, "PUT", "/api/v1/results/by-node/node1/systemd_logs", "foo") + resp := doRequest(t, "PUT", "/api/v1/results/by-node/node1/systemd_logs.json", []byte("foo")) if resp.StatusCode != 200 { t.Errorf("Got non-200 response from server: %v", resp.StatusCode) } // Check in the same node again, should conflict - resp = doRequest(t, "PUT", "/api/v1/results/by-node/node1/systemd_logs", "foo") + resp = doRequest(t, "PUT", "/api/v1/results/by-node/node1/systemd_logs.json", []byte("foo")) if resp.StatusCode != 409 { t.Errorf("Expected a 409 conflict for checking in a duplicate node, got %v", resp.StatusCode) } if _, ok := agg.Results["node10"]; ok { t.Fatal("Aggregator accepted a result from an unexpected host") - t.Fail() + } + }) +} + +func TestAggregation_errors(t *testing.T) { + expected := []plugin.ExpectedResult{ + plugin.ExpectedResult{ResultType: "e2e"}, + } + + withAggregator(t, expected, func(agg *Aggregator) { + resultsCh := make(chan *plugin.Result) + go agg.IngestResults(resultsCh) + + // Send an error + resultsCh <- pluginutils.MakeErrorResult("e2e", map[string]interface{}{"error": "foo"}, "") + agg.Wait(make(chan bool)) + + if result, ok := agg.Results["e2e"]; ok { + bytes, err := ioutil.ReadFile(path.Join(agg.OutputDir, result.Path()) + ".json") + if err != nil || string(bytes) != `{"error":"foo"}` { + t.Errorf("results for e2e plugin incorrect (got %v): %v", string(bytes), err) + } + } else { + t.Errorf("Aggregator didn't record error result from e2e plugin, got %v", agg.Results) } }) } @@ -117,3 +195,44 @@ func withAggregator(t *testing.T, expected []plugin.ExpectedResult, callback fun srv.WaitUntilReady() callback(agg) } + +// Create a gzipped tar file with the given filename (and contents) inside it, +// return the raw bytes for that tar file. +func makeTarWithContents(t *testing.T, filename string, fileContents []byte) (tarbytes []byte) { + dir, err := ioutil.TempDir("", "sonobuoy_server_test") + if err != nil { + t.Fatalf("Could not create temp directory: %v", err) + return + } + defer os.RemoveAll(dir) + + tardir := path.Join(dir, "results") + err = os.Mkdir(tardir, 0755) + if err != nil { + t.Fatal("Could not create results directory %v: %v", tardir, err) + return + } + + filepath := path.Join(tardir, filename) + tarfile := path.Join(dir, "results.tar.gz") + + err = ioutil.WriteFile(filepath, fileContents, 0644) + if err != nil { + t.Fatalf("Could not write to temp file %v: %v", filepath, err) + return + } + + err = tarx.Compress(tarfile, tardir, &tarx.CompressOptions{Compression: tarx.Gzip}) + if err != nil { + t.Fatalf("Could not create tar file %v: %v", tarfile, err) + return + } + + tarbytes, err = ioutil.ReadFile(tarfile) + if err != nil { + t.Fatalf("Could not read created tar file %v: %v", tarfile, err) + return + } + + return tarbytes +} diff --git a/pkg/plugin/aggregation/server.go b/pkg/plugin/aggregation/server.go index 1856cf7d3..08416426d 100644 --- a/pkg/plugin/aggregation/server.go +++ b/pkg/plugin/aggregation/server.go @@ -153,13 +153,15 @@ func (s *Server) nodeResultsHandler(w http.ResponseWriter, r *http.Request) { return } - // Parse the path into the node name and the type - node, resultType := parts[0], parts[1] + // Parse the path into the node name, result type, and extension + node, file := parts[0], parts[1] + resultType, extension := parseFileName(file) glog.Infof("got %v result from %v\n", resultType, node) result := &plugin.Result{ ResultType: resultType, + Extension: extension, NodeName: node, Body: r.Body, } @@ -183,7 +185,6 @@ func (s *Server) globalResultsHandler(w http.ResponseWriter, r *http.Request) { http.NotFound(w, r) return } - resultType := parts[0] // We accept PUT because the client is specifying the resource identifier via // the HTTP path. (As opposed to POST, where typically the clients would post @@ -198,11 +199,13 @@ func (s *Server) globalResultsHandler(w http.ResponseWriter, r *http.Request) { return } + resultType, extension := parseFileName(parts[0]) glog.Infof("got %v result\n", resultType) result := &plugin.Result{ NodeName: "", ResultType: resultType, + Extension: extension, Body: r.Body, } @@ -212,3 +215,16 @@ func (s *Server) globalResultsHandler(w http.ResponseWriter, r *http.Request) { s.ResultsCallback(result, w) r.Body.Close() } + +// given an uploaded filename, parse it into its base name and extension. If +// there are no "." characters, the extension will be blank and the name will +// be set to the filename as-is +func parseFileName(file string) (name string, extension string) { + filenameParts := strings.SplitN(file, ".", 2) + + if len(filenameParts) == 2 { + return filenameParts[0], "." + filenameParts[1] + } + + return file, "" +} diff --git a/pkg/plugin/aggregation/server_test.go b/pkg/plugin/aggregation/server_test.go index aab5ecd90..ef2762ceb 100644 --- a/pkg/plugin/aggregation/server_test.go +++ b/pkg/plugin/aggregation/server_test.go @@ -17,12 +17,12 @@ limitations under the License. package aggregation import ( + "bytes" "io/ioutil" "net/http" "net/url" "os" "strconv" - "strings" "testing" "github.com/heptio/sonobuoy/pkg/plugin" @@ -32,7 +32,7 @@ func TestStart(t *testing.T) { checkins := make(map[string]*plugin.Result, 0) expectedResult := "systemd_logs/results/testnode" - expectedJSON := `{"some": "json"}` + expectedJSON := []byte(`{"some": "json"}`) tmpdir, err := ioutil.TempDir("", "sonobuoy_server_test") if err != nil { @@ -100,7 +100,7 @@ func TestStart(t *testing.T) { var testPort = 8099 -func doRequest(t *testing.T, method, path, body string) *http.Response { +func doRequest(t *testing.T, method, path string, body []byte) *http.Response { // Make a new HTTP transport for every request, this avoids issues where HTTP // connection keep-alive leaves connections running to old server instances. // (We can take the performance hit since it's just tests.) @@ -115,7 +115,7 @@ func doRequest(t *testing.T, method, path, body string) *http.Response { req, err := http.NewRequest( method, resultsURL.String(), - strings.NewReader(body), + bytes.NewReader(body), ) if err != nil { t.Fatalf("error constructing request: %v", err) diff --git a/pkg/plugin/driver/daemonset/daemonset.go b/pkg/plugin/driver/daemonset/daemonset.go index dcc64624d..573e79685 100644 --- a/pkg/plugin/driver/daemonset/daemonset.go +++ b/pkg/plugin/driver/daemonset/daemonset.go @@ -213,7 +213,7 @@ func (p *Plugin) Monitor(kubeclient kubernetes.Interface, availableNodes []v1.No if isFailing, reason := utils.IsPodFailing(&pod); isFailing { podsReported[nodeName] = true - resultsCh <- utils.MakeErrorResult(p, map[string]interface{}{ + resultsCh <- utils.MakeErrorResult(p.GetResultType(), map[string]interface{}{ "error": reason, "pod": pod, }, nodeName) @@ -228,7 +228,7 @@ func (p *Plugin) Monitor(kubeclient kubernetes.Interface, availableNodes []v1.No for _, node := range availableNodes { if !podsFound[node.Name] && !podsReported[node.Name] { podsReported[node.Name] = true - resultsCh <- utils.MakeErrorResult(p, map[string]interface{}{ + resultsCh <- utils.MakeErrorResult(p.GetResultType(), map[string]interface{}{ "error": fmt.Sprintf( "No pod was scheduled on node %v within %v. Check tolerations for plugin %v", node.Name, diff --git a/pkg/plugin/driver/job/job.go b/pkg/plugin/driver/job/job.go index 4989caef9..87d2f91ff 100644 --- a/pkg/plugin/driver/job/job.go +++ b/pkg/plugin/driver/job/job.go @@ -121,13 +121,13 @@ func (p *Plugin) Monitor(kubeclient kubernetes.Interface, _ []v1.Node, resultsCh // Make sure there's a pod pod, err := p.findPod(kubeclient) if err != nil { - resultsCh <- utils.MakeErrorResult(p, map[string]interface{}{"error": err.Error()}, "") + resultsCh <- utils.MakeErrorResult(p.GetResultType(), map[string]interface{}{"error": err.Error()}, "") break } // Make sure the pod isn't failing if isFailing, reason := utils.IsPodFailing(pod); isFailing { - resultsCh <- utils.MakeErrorResult(p, map[string]interface{}{ + resultsCh <- utils.MakeErrorResult(p.GetResultType(), map[string]interface{}{ "error": reason, "pod": pod, }, "") diff --git a/pkg/plugin/driver/utils/utils.go b/pkg/plugin/driver/utils/utils.go index c12e1175b..a738f1b3d 100644 --- a/pkg/plugin/driver/utils/utils.go +++ b/pkg/plugin/driver/utils/utils.go @@ -62,7 +62,7 @@ func IsPodFailing(pod *v1.Pod) (bool, string) { // for this plugin as a JSON file, so it's what users will see for why the // plugin failed. If errdata["error"] is not set, it will be filled in with an // "Unknown error" string. -func MakeErrorResult(p plugin.Interface, errdata map[string]interface{}, nodeName string) *plugin.Result { +func MakeErrorResult(resultType string, errdata map[string]interface{}, nodeName string) *plugin.Result { errJSON, _ := json.Marshal(errdata) errstr := "Unknown error" @@ -73,8 +73,9 @@ func MakeErrorResult(p plugin.Interface, errdata map[string]interface{}, nodeNam return &plugin.Result{ Body: bytes.NewReader(errJSON), Error: errstr, - ResultType: p.GetResultType(), + ResultType: resultType, NodeName: nodeName, + Extension: ".json", } } diff --git a/pkg/plugin/interface.go b/pkg/plugin/interface.go index be6482fb1..e32f53c76 100644 --- a/pkg/plugin/interface.go +++ b/pkg/plugin/interface.go @@ -77,6 +77,7 @@ type ExpectedResult struct { type Result struct { NodeName string ResultType string + Extension string Body io.Reader Error string } @@ -144,13 +145,3 @@ func (r *Result) ExpectedResultID() string { return r.ResultType + "/" + r.NodeName } - -// Extension returns the results extension for different plugins -// TODO: We should load this for the plugin. -func (r *Result) Extension() string { - if r.ResultType == "e2e" { - return ".tar.gz" - } - - return ".json" -} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 820203201..160a93bd0 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -20,6 +20,7 @@ import ( "io" "io/ioutil" "os" + "strings" "time" "github.com/golang/glog" @@ -52,6 +53,12 @@ func GatherResults(waitfile string, url string) error { s := string(inputFileName) glog.Infof("Detected done file, transmitting: (%v)", s) + // Append a file extension, if there is one + filenameParts := strings.SplitN(s, ".", 2) + if len(filenameParts) == 2 { + url += "." + filenameParts[1] + } + // transmit back the results file. return DoRequest(url, func() (io.Reader, error) { outfile, err := os.Open(s) diff --git a/pkg/worker/worker_test.go b/pkg/worker/worker_test.go index 27ba19c00..d629f5f99 100644 --- a/pkg/worker/worker_test.go +++ b/pkg/worker/worker_test.go @@ -20,6 +20,7 @@ import ( "io/ioutil" "net/http" "os" + "os/exec" "path" "strconv" "testing" @@ -42,7 +43,7 @@ func TestRun(t *testing.T) { withAggregator(t, expectedResults, func(aggr *aggregation.Aggregator) { for _, h := range hosts { - url := "http://:" + strconv.Itoa(aggregatorPort) + "/api/v1/results/by-node/" + h + "/systemd_logs" + url := "http://:" + strconv.Itoa(aggregatorPort) + "/api/v1/results/by-node/" + h + "/systemd_logs.json" withTempDir(t, func(tmpdir string) { ioutil.WriteFile(tmpdir+"/systemd_logs", []byte("{}"), 0755) @@ -52,10 +53,7 @@ func TestRun(t *testing.T) { t.Fatalf("Got error running agent: %v", err) } - logsPath := path.Join(aggr.OutputDir, "systemd_logs", "results", "node1.json") - if _, err := os.Stat(logsPath); err != nil && os.IsNotExist(err) { - t.Errorf("Systemd logs agent ran, but couldn't find expected results at %v", logsPath) - } + ensureExists(t, path.Join(aggr.OutputDir, "systemd_logs", "results", "node1.json")) }) } }) @@ -69,6 +67,28 @@ func TestRunGlobal(t *testing.T) { plugin.ExpectedResult{ResultType: "systemd_logs"}, } + withAggregator(t, expectedResults, func(aggr *aggregation.Aggregator) { + withTempDir(t, func(tmpdir string) { + ioutil.WriteFile(tmpdir+"/systemd_logs.json", []byte("{}"), 0755) + ioutil.WriteFile(tmpdir+"/done", []byte(tmpdir+"/systemd_logs.json"), 0755) + err := GatherResults(tmpdir+"/done", url) + if err != nil { + t.Fatalf("Got error running agent: %v", err) + } + + ensureExists(t, path.Join(aggr.OutputDir, "systemd_logs", "results.json")) + }) + }) +} + +func TestRunGlobal_noExtension(t *testing.T) { + url := "http://:" + strconv.Itoa(aggregatorPort) + "/api/v1/results/global/systemd_logs" + + // Create an expectedResults array + expectedResults := []plugin.ExpectedResult{ + plugin.ExpectedResult{ResultType: "systemd_logs"}, + } + withAggregator(t, expectedResults, func(aggr *aggregation.Aggregator) { withTempDir(t, func(tmpdir string) { ioutil.WriteFile(tmpdir+"/systemd_logs", []byte("{}"), 0755) @@ -78,16 +98,22 @@ func TestRunGlobal(t *testing.T) { t.Fatalf("Got error running agent: %v", err) } - logsPath := path.Join(aggr.OutputDir, "systemd_logs", "results.json") - if _, err := os.Stat(logsPath); err != nil && os.IsNotExist(err) { - t.Errorf("Systemd logs agent ran, but couldn't find expected results at %v", logsPath) - } + ensureExists(t, path.Join(aggr.OutputDir, "systemd_logs", "results")) }) }) } const aggregatorPort = 8090 +func ensureExists(t *testing.T, filepath string) { + if _, err := os.Stat(filepath); err != nil && os.IsNotExist(err) { + t.Logf("Plugin agent ran, but couldn't find expected results at %v:", filepath) + output, _ := exec.Command("ls", "-l", path.Dir(filepath)).CombinedOutput() + t.Log(string(output)) + t.Fail() + } +} + func withTempDir(t *testing.T, callback func(tmpdir string)) { // Create a temporary directory for results gathering tmpdir, err := ioutil.TempDir("", "sonobuoy_test")