Skip to content

Commit

Permalink
injecitng sidecar and parsing logs to extract results
Browse files Browse the repository at this point in the history
  • Loading branch information
chitrangpatel committed Dec 6, 2022
1 parent d53dc1b commit 2828651
Show file tree
Hide file tree
Showing 13 changed files with 650 additions and 36 deletions.
96 changes: 96 additions & 0 deletions internal/sidecarlogresults/sidecarlogresults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
Copyright 2019 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sidecarlogresults

import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

// ErrorReasonMaxResultSizeExceeded indicates that the result exceeded its maximum allowed size
// var ErrorReasonMaxResultSizeExceeded = fmt.Errorf("%s", "MaxResultSizeExceeded")
var ErrorReasonMaxResultSizeExceeded = "MaxResultSizeExceeded"

// SidecarLogResult holds fields for storing extracted results
type SidecarLogResult struct {
Name string
Value string
}

// GetResultsFromSidecarLogs extracts results from the logs of the results sidecar
func GetResultsFromSidecarLogs(ctx context.Context, clientset kubernetes.Interface, namespace string, name string, container string) ([]v1beta1.PipelineResourceResult, error) {
sidecarLogResults := []v1beta1.PipelineResourceResult{}
p, _ := clientset.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
if p.Status.Phase == corev1.PodPending {
return sidecarLogResults, nil
}
podLogOpts := corev1.PodLogOptions{Container: container}
req := clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
sidecarLogs, err := req.Stream(ctx)
if err != nil {
return sidecarLogResults, err
}
defer sidecarLogs.Close()
maxResultLimit := config.FromContextOrDefaults(ctx).FeatureFlags.MaxResultSize
return extractResultsFromLogs(sidecarLogs, sidecarLogResults, maxResultLimit)
}

func extractResultsFromLogs(logs io.Reader, sidecarLogResults []v1beta1.PipelineResourceResult, maxResultLimit int) ([]v1beta1.PipelineResourceResult, error) {
scanner := bufio.NewScanner(logs)
buf := make([]byte, maxResultLimit)
scanner.Buffer(buf, maxResultLimit)
for scanner.Scan() {
result, err := parseResults(scanner.Bytes(), maxResultLimit)
if err != nil {
return nil, err
}
sidecarLogResults = append(sidecarLogResults, result)
}

if err := scanner.Err(); err != nil {
if errors.Is(err, bufio.ErrTooLong) {
return sidecarLogResults, fmt.Errorf("%s", ErrorReasonMaxResultSizeExceeded)
}
return nil, err
}
return sidecarLogResults, nil
}

func parseResults(resultBytes []byte, maxResultLimit int) (v1beta1.PipelineResourceResult, error) {
result := v1beta1.PipelineResourceResult{}
if len(resultBytes) > maxResultLimit {
return result, fmt.Errorf("%s", ErrorReasonMaxResultSizeExceeded)
}

var res SidecarLogResult
if err := json.Unmarshal(resultBytes, &res); err != nil {
return result, fmt.Errorf("Invalid result %w", err)
}
result = v1beta1.PipelineResourceResult{
Key: res.Name,
Value: res.Value,
ResultType: v1beta1.TaskRunResultType,
}
return result, nil
}
195 changes: 195 additions & 0 deletions internal/sidecarlogresults/sidecarlogresults_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package sidecarlogresults

import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/test/diff"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
fakekubeclientset "k8s.io/client-go/kubernetes/fake"
)

func TestExtractResultsFromLogs(t *testing.T) {
inputResults := []SidecarLogResult{
{
Name: "result1",
Value: "foo",
}, {
Name: "result2",
Value: "bar",
},
}
podLogs := ""
for _, r := range inputResults {
res, _ := json.Marshal(&r)
podLogs = fmt.Sprintf("%s%s\n", podLogs, string(res))
}
logs := strings.NewReader(podLogs)

results, err := extractResultsFromLogs(logs, []v1beta1.PipelineResourceResult{}, 4096)
if err != nil {
t.Error(err)
}
want := []v1beta1.PipelineResourceResult{
{
Key: "result1",
Value: "foo",
ResultType: v1beta1.TaskRunResultType,
}, {
Key: "result2",
Value: "bar",
ResultType: v1beta1.TaskRunResultType,
},
}
if d := cmp.Diff(want, results); d != "" {
t.Fatal(diff.PrintWantGot(d))
}
}

func TestExtractResultsFromLogs_Failure(t *testing.T) {
inputResults := []SidecarLogResult{
{
Name: "result1",
Value: strings.Repeat("v", 4098),
},
}
podLogs := ""
for _, r := range inputResults {
res, _ := json.Marshal(&r)
podLogs = fmt.Sprintf("%s%s\n", podLogs, string(res))
}
logs := strings.NewReader(podLogs)

_, err := extractResultsFromLogs(logs, []v1beta1.PipelineResourceResult{}, 4096)
if err.Error() != ErrorReasonMaxResultSizeExceeded {
t.Fatal(fmt.Sprintf("Expexted error %v but got %v", ErrorReasonMaxResultSizeExceeded, err))
}
}

func TestParseResults(t *testing.T) {
results := []SidecarLogResult{
{
Name: "result1",
Value: "foo",
}, {
Name: "result2",
Value: `{"IMAGE_URL":"ar.com", "IMAGE_DIGEST":"sha234"}`,
}, {
Name: "result3",
Value: `["hello","world"]`,
},
}
podLogs := []string{}
for _, r := range results {
res, _ := json.Marshal(&r)
podLogs = append(podLogs, string(res))
}
want := []v1beta1.PipelineResourceResult{{
Key: "result1",
Value: "foo",
ResultType: 1,
}, {
Key: "result2",
Value: `{"IMAGE_URL":"ar.com", "IMAGE_DIGEST":"sha234"}`,
ResultType: 1,
}, {
Key: "result3",
Value: `["hello","world"]`,
ResultType: 1,
}}
stepResults := []v1beta1.PipelineResourceResult{}
for _, plog := range podLogs {
res, err := parseResults([]byte(plog), 4096)
if err != nil {
t.Error(err)
}
stepResults = append(stepResults, res)
}
if d := cmp.Diff(want, stepResults); d != "" {
t.Fatal(diff.PrintWantGot(d))
}
}

func TestParseResults_Failure(t *testing.T) {
result := SidecarLogResult{
Name: "result2",
Value: strings.Repeat("k", 4098),
}
res1, _ := json.Marshal("result1 v1")
res2, _ := json.Marshal(&result)
podLogs := []string{string(res1), string(res2)}
want := []string{
"Invalid result json: cannot unmarshal string into Go value of type sidecarlogresults.SidecarLogResult",
ErrorReasonMaxResultSizeExceeded,
}
got := []string{}
for _, plog := range podLogs {
_, err := parseResults([]byte(plog), 4096)
got = append(got, err.Error())
}
if d := cmp.Diff(want, got); d != "" {
t.Fatal(diff.PrintWantGot(d))
}
}

func TestGetResultsFromSidecarLogs(t *testing.T) {
for _, c := range []struct {
desc string
podPhase v1.PodPhase
wantError bool
}{{
desc: "pod pending to start",
podPhase: corev1.PodPending,
wantError: false,
}, {
desc: "pod running extract logs",
podPhase: corev1.PodRunning,
wantError: true,
}} {
t.Run(c.desc, func(t *testing.T) {
ctx := context.Background()
clientset := fakekubeclientset.NewSimpleClientset()
pod := &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "pod",
Namespace: "foo",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "container",
Image: "image",
},
},
},
Status: v1.PodStatus{
Phase: c.podPhase,
},
}
pod, err := clientset.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
if err != nil {
t.Errorf("Error occured while creating pod %s: %s", pod.Name, err.Error())
}

// Fake logs are not formatted properly so there will be an error
_, err = GetResultsFromSidecarLogs(ctx, clientset, "foo", "pod", "container")
if err != nil && !c.wantError {
t.Fatalf("did not expect an error but got: %v", err)
}
if c.wantError && err == nil {
t.Fatal("dxpected to get an error but did not")
}
})
}
}
18 changes: 18 additions & 0 deletions pkg/apis/pipeline/reservedsidecar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
Copyright 2019 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pipeline

// ReservedResultsSidecarName is the name of the results sidecar that outputs the results to stdout
// when the results-from feature-flag is set to "sidecar-logs".
const ReservedResultsSidecarName = "tekton-log-results"
2 changes: 2 additions & 0 deletions pkg/apis/pipeline/v1beta1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ const (
TaskRunReasonsResultsVerificationFailed TaskRunReason = "TaskRunResultsVerificationFailed"
// AwaitingTaskRunResults is the reason set when waiting upon `TaskRun` results and signatures to verify
AwaitingTaskRunResults TaskRunReason = "AwaitingTaskRunResults"
// TaskRunReasonResultLargerThanAllowedLimit is the reason set when one of the results exceeds its maximum allowed limit of 1 KB
TaskRunReasonResultLargerThanAllowedLimit TaskRunReason = "TaskRunResultLargerThanAllowedLimit"
)

func (t TaskRunReason) String() string {
Expand Down
5 changes: 4 additions & 1 deletion pkg/entrypoint/entrypointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"strings"
"time"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/pkg/spire"
Expand Down Expand Up @@ -85,6 +86,8 @@ type Entrypointer struct {
SpireWorkloadAPI spire.EntrypointerAPIClient
// ResultsDirectory is the directory to find results, defaults to pipeline.DefaultResultPath
ResultsDirectory string
// ResultExtractionMethod is the method using which the controller extracts the results from the task pod.
ResultExtractionMethod string
}

// Waiter encapsulates waiting for files to exist.
Expand Down Expand Up @@ -231,7 +234,7 @@ func (e Entrypointer) readResultsFromDisk(ctx context.Context, resultDir string)
}

// push output to termination path
if len(output) != 0 {
if e.ResultExtractionMethod == config.ResultExtractionMethodTerminationMessage && len(output) != 0 {
if err := termination.WriteMessage(e.TerminationPath, output); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 2828651

Please sign in to comment.