Skip to content

Commit

Permalink
Merge pull request vmware-tanzu#47 from kensimon/extract-tar-within-tar
Browse files Browse the repository at this point in the history
Implicitly extract tar.gz files from plugins that upload them
Signed-off-by: Jesse Hamilton jesse.hamilton@heptio.com

Signed-off-by: Jesse Hamilton jesse.hamilton@heptio.com
  • Loading branch information
Ken Simon authored Aug 10, 2017
2 parents 258e776 + 732dce8 commit f81c791
Show file tree
Hide file tree
Showing 10 changed files with 239 additions and 49 deletions.
48 changes: 39 additions & 9 deletions pkg/plugin/aggregation/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/golang/glog"
"github.com/heptio/sonobuoy/pkg/plugin"
"github.com/viniciuschiele/tarx"
)

// Aggregator is responsible for taking results from an HTTP server (configured
Expand Down Expand Up @@ -196,26 +197,55 @@ func (a *Aggregator) handleResult(result *plugin.Result) error {
// Create the output directory for the result. Will be of the
// form .../plugins/:results_type/:node.json (for DaemonSet plugins) or
// .../plugins/:results_type.json (for Job plugins)
resultsFile := path.Join(a.OutputDir, result.Path()+result.Extension())
resultsFile := path.Join(a.OutputDir, result.Path()+result.Extension)
resultsDir := path.Dir(resultsFile)
glog.Infof("Creating directory %v", resultsDir)
if err := os.MkdirAll(resultsDir, 0755); err != nil {
glog.Errorf("Could not make directory %v: %v", resultsDir, err)

return err
}

// Open the results file for writing
f, err := os.Create(resultsFile)
// Write the results file out and close it
err := func() error {
f, err := os.Create(resultsFile)
if err != nil {
glog.Errorf("Could not open output file %v for writing: %v", resultsFile, err)
return err
}
defer f.Close()

// Copy the request body into the file
_, err = io.Copy(f, result.Body)
if err != nil {
glog.Errorf("Error writing plugin result: %v", err)
return err
}

return nil
}()
if err != nil {
glog.Errorf("Could not open output file %v for writing: %v", resultsFile, err)
return err
}
defer f.Close()

// Copy the request body into the file
io.Copy(f, result.Body)
glog.Infof("wrote results to %v\n", resultsFile)
// If it's a tarball, extract it
if result.Extension == ".tar.gz" {
resultsDir := path.Join(a.OutputDir, result.Path())

err = tarx.Extract(resultsFile, resultsDir, &tarx.ExtractOptions{})
if err != nil {
glog.Errorf("Could not extract tar file %v: %v", resultsFile, err)
return err
}

err = os.Remove(resultsFile)
if err != nil {
return err
}

glog.Infof("extracted results tarball into %v", resultsDir)
} else {
glog.Infof("wrote results to %v", resultsFile)
}

return nil
}
135 changes: 127 additions & 8 deletions pkg/plugin/aggregation/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ limitations under the License.
package aggregation

import (
"bytes"
"io/ioutil"
"os"
"os/exec"
"path"
"strconv"
"testing"

"github.com/heptio/sonobuoy/pkg/plugin"
pluginutils "github.com/heptio/sonobuoy/pkg/plugin/driver/utils"
"github.com/viniciuschiele/tarx"
)

func TestAggregation(t *testing.T) {
Expand All @@ -32,7 +36,7 @@ func TestAggregation(t *testing.T) {
}
// Happy path
withAggregator(t, expected, func(agg *Aggregator) {
resp := doRequest(t, "PUT", "/api/v1/results/by-node/node1/systemd_logs", "foo")
resp := doRequest(t, "PUT", "/api/v1/results/by-node/node1/systemd_logs.json", []byte("foo"))
if resp.StatusCode != 200 {
body, _ := ioutil.ReadAll(resp.Body)
t.Errorf("Got (%v) response from server: %v", resp.StatusCode, string(body))
Expand All @@ -44,7 +48,59 @@ func TestAggregation(t *testing.T) {
t.Errorf("results for node1 incorrect (got %v): %v", string(bytes), err)
}
} else {
t.Errorf("AggregationServer didn't record a result for node1")
t.Errorf("AggregationServer didn't record a result for node1. Got: %+v", agg.Results)
}
})
}

func TestAggregation_noExtension(t *testing.T) {
expected := []plugin.ExpectedResult{
plugin.ExpectedResult{NodeName: "node1", ResultType: "systemd_logs"},
}

withAggregator(t, expected, func(agg *Aggregator) {
resp := doRequest(t, "PUT", "/api/v1/results/by-node/node1/systemd_logs", []byte("foo"))
if resp.StatusCode != 200 {
body, _ := ioutil.ReadAll(resp.Body)
t.Errorf("Got (%v) response from server: %v", resp.StatusCode, string(body))
}

if result, ok := agg.Results["systemd_logs/node1"]; ok {
bytes, err := ioutil.ReadFile(path.Join(agg.OutputDir, result.Path()))
if string(bytes) != "foo" {
t.Errorf("results for node1 incorrect (got %v): %v", string(bytes), err)
}
} else {
t.Errorf("AggregationServer didn't record a result for node1. Got: %+v", agg.Results)
}
})
}

func TestAggregation_tarfile(t *testing.T) {
expected := []plugin.ExpectedResult{
plugin.ExpectedResult{ResultType: "e2e"},
}

fileBytes := []byte("foo")
tarBytes := makeTarWithContents(t, "inside_tar.txt", fileBytes)

withAggregator(t, expected, func(agg *Aggregator) {
resp := doRequest(t, "PUT", "/api/v1/results/global/e2e.tar.gz", tarBytes)
if resp.StatusCode != 200 {
body, _ := ioutil.ReadAll(resp.Body)
t.Errorf("Got (%v) response from server: %v", resp.StatusCode, string(body))
}

if result, ok := agg.Results["e2e"]; ok {
realBytes, err := ioutil.ReadFile(path.Join(agg.OutputDir, result.Path(), "inside_tar.txt"))
if bytes.Compare(realBytes, fileBytes) != 0 || err != nil {
t.Logf("results e2e tests incorrect (got %v, expected %v): %v", string(realBytes), string(fileBytes), err)
output, _ := exec.Command("ls", "-lR", agg.OutputDir).CombinedOutput()
t.Log(string(output))
t.Fail()
}
} else {
t.Errorf("AggregationServer didn't record a result for e2e tests. Got: %+v", agg.Results)
}
})
}
Expand All @@ -55,14 +111,13 @@ func TestAggregation_wrongnodes(t *testing.T) {
}

withAggregator(t, expected, func(agg *Aggregator) {
resp := doRequest(t, "PUT", "/api/v1/results/by-node/node10/systemd_logs", "foo")
resp := doRequest(t, "PUT", "/api/v1/results/by-node/node10/systemd_logs.json", []byte("foo"))
if resp.StatusCode != 403 {
t.Errorf("Expected a 403 forbidden for checking in an unexpected node, got %v", resp.StatusCode)
}

if _, ok := agg.Results["node10"]; ok {
if _, ok := agg.Results["systemd_logs/node10"]; ok {
t.Fatal("Aggregator accepted a result from an unexpected host")
t.Fail()
}
})
}
Expand All @@ -74,20 +129,43 @@ func TestAggregation_duplicates(t *testing.T) {
}
withAggregator(t, expected, func(agg *Aggregator) {
// Check in a node
resp := doRequest(t, "PUT", "/api/v1/results/by-node/node1/systemd_logs", "foo")
resp := doRequest(t, "PUT", "/api/v1/results/by-node/node1/systemd_logs.json", []byte("foo"))
if resp.StatusCode != 200 {
t.Errorf("Got non-200 response from server: %v", resp.StatusCode)
}

// Check in the same node again, should conflict
resp = doRequest(t, "PUT", "/api/v1/results/by-node/node1/systemd_logs", "foo")
resp = doRequest(t, "PUT", "/api/v1/results/by-node/node1/systemd_logs.json", []byte("foo"))
if resp.StatusCode != 409 {
t.Errorf("Expected a 409 conflict for checking in a duplicate node, got %v", resp.StatusCode)
}

if _, ok := agg.Results["node10"]; ok {
t.Fatal("Aggregator accepted a result from an unexpected host")
t.Fail()
}
})
}

func TestAggregation_errors(t *testing.T) {
expected := []plugin.ExpectedResult{
plugin.ExpectedResult{ResultType: "e2e"},
}

withAggregator(t, expected, func(agg *Aggregator) {
resultsCh := make(chan *plugin.Result)
go agg.IngestResults(resultsCh)

// Send an error
resultsCh <- pluginutils.MakeErrorResult("e2e", map[string]interface{}{"error": "foo"}, "")
agg.Wait(make(chan bool))

if result, ok := agg.Results["e2e"]; ok {
bytes, err := ioutil.ReadFile(path.Join(agg.OutputDir, result.Path()) + ".json")
if err != nil || string(bytes) != `{"error":"foo"}` {
t.Errorf("results for e2e plugin incorrect (got %v): %v", string(bytes), err)
}
} else {
t.Errorf("Aggregator didn't record error result from e2e plugin, got %v", agg.Results)
}
})
}
Expand Down Expand Up @@ -117,3 +195,44 @@ func withAggregator(t *testing.T, expected []plugin.ExpectedResult, callback fun
srv.WaitUntilReady()
callback(agg)
}

// Create a gzipped tar file with the given filename (and contents) inside it,
// return the raw bytes for that tar file.
func makeTarWithContents(t *testing.T, filename string, fileContents []byte) (tarbytes []byte) {
dir, err := ioutil.TempDir("", "sonobuoy_server_test")
if err != nil {
t.Fatalf("Could not create temp directory: %v", err)
return
}
defer os.RemoveAll(dir)

tardir := path.Join(dir, "results")
err = os.Mkdir(tardir, 0755)
if err != nil {
t.Fatal("Could not create results directory %v: %v", tardir, err)
return
}

filepath := path.Join(tardir, filename)
tarfile := path.Join(dir, "results.tar.gz")

err = ioutil.WriteFile(filepath, fileContents, 0644)
if err != nil {
t.Fatalf("Could not write to temp file %v: %v", filepath, err)
return
}

err = tarx.Compress(tarfile, tardir, &tarx.CompressOptions{Compression: tarx.Gzip})
if err != nil {
t.Fatalf("Could not create tar file %v: %v", tarfile, err)
return
}

tarbytes, err = ioutil.ReadFile(tarfile)
if err != nil {
t.Fatalf("Could not read created tar file %v: %v", tarfile, err)
return
}

return tarbytes
}
22 changes: 19 additions & 3 deletions pkg/plugin/aggregation/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,15 @@ func (s *Server) nodeResultsHandler(w http.ResponseWriter, r *http.Request) {
return
}

// Parse the path into the node name and the type
node, resultType := parts[0], parts[1]
// Parse the path into the node name, result type, and extension
node, file := parts[0], parts[1]
resultType, extension := parseFileName(file)

glog.Infof("got %v result from %v\n", resultType, node)

result := &plugin.Result{
ResultType: resultType,
Extension: extension,
NodeName: node,
Body: r.Body,
}
Expand All @@ -183,7 +185,6 @@ func (s *Server) globalResultsHandler(w http.ResponseWriter, r *http.Request) {
http.NotFound(w, r)
return
}
resultType := parts[0]

// We accept PUT because the client is specifying the resource identifier via
// the HTTP path. (As opposed to POST, where typically the clients would post
Expand All @@ -198,11 +199,13 @@ func (s *Server) globalResultsHandler(w http.ResponseWriter, r *http.Request) {
return
}

resultType, extension := parseFileName(parts[0])
glog.Infof("got %v result\n", resultType)

result := &plugin.Result{
NodeName: "",
ResultType: resultType,
Extension: extension,
Body: r.Body,
}

Expand All @@ -212,3 +215,16 @@ func (s *Server) globalResultsHandler(w http.ResponseWriter, r *http.Request) {
s.ResultsCallback(result, w)
r.Body.Close()
}

// given an uploaded filename, parse it into its base name and extension. If
// there are no "." characters, the extension will be blank and the name will
// be set to the filename as-is
func parseFileName(file string) (name string, extension string) {
filenameParts := strings.SplitN(file, ".", 2)

if len(filenameParts) == 2 {
return filenameParts[0], "." + filenameParts[1]
}

return file, ""
}
8 changes: 4 additions & 4 deletions pkg/plugin/aggregation/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ limitations under the License.
package aggregation

import (
"bytes"
"io/ioutil"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"testing"

"github.com/heptio/sonobuoy/pkg/plugin"
Expand All @@ -32,7 +32,7 @@ func TestStart(t *testing.T) {
checkins := make(map[string]*plugin.Result, 0)

expectedResult := "systemd_logs/results/testnode"
expectedJSON := `{"some": "json"}`
expectedJSON := []byte(`{"some": "json"}`)

tmpdir, err := ioutil.TempDir("", "sonobuoy_server_test")
if err != nil {
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestStart(t *testing.T) {

var testPort = 8099

func doRequest(t *testing.T, method, path, body string) *http.Response {
func doRequest(t *testing.T, method, path string, body []byte) *http.Response {
// Make a new HTTP transport for every request, this avoids issues where HTTP
// connection keep-alive leaves connections running to old server instances.
// (We can take the performance hit since it's just tests.)
Expand All @@ -115,7 +115,7 @@ func doRequest(t *testing.T, method, path, body string) *http.Response {
req, err := http.NewRequest(
method,
resultsURL.String(),
strings.NewReader(body),
bytes.NewReader(body),
)
if err != nil {
t.Fatalf("error constructing request: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/plugin/driver/daemonset/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (p *Plugin) Monitor(kubeclient kubernetes.Interface, availableNodes []v1.No
if isFailing, reason := utils.IsPodFailing(&pod); isFailing {
podsReported[nodeName] = true

resultsCh <- utils.MakeErrorResult(p, map[string]interface{}{
resultsCh <- utils.MakeErrorResult(p.GetResultType(), map[string]interface{}{
"error": reason,
"pod": pod,
}, nodeName)
Expand All @@ -228,7 +228,7 @@ func (p *Plugin) Monitor(kubeclient kubernetes.Interface, availableNodes []v1.No
for _, node := range availableNodes {
if !podsFound[node.Name] && !podsReported[node.Name] {
podsReported[node.Name] = true
resultsCh <- utils.MakeErrorResult(p, map[string]interface{}{
resultsCh <- utils.MakeErrorResult(p.GetResultType(), map[string]interface{}{
"error": fmt.Sprintf(
"No pod was scheduled on node %v within %v. Check tolerations for plugin %v",
node.Name,
Expand Down
Loading

0 comments on commit f81c791

Please sign in to comment.