Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEP-0127: Larger results using sidecar logs - parsing sidecar logs to extract results #5840

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions internal/sidecarlogresults/sidecarlogresults.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,25 @@ limitations under the License.
package sidecarlogresults
chitrangpatel marked this conversation as resolved.
Show resolved Hide resolved

import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
)

// ErrSizeExceeded indicates that the result exceeded its maximum allowed size
var ErrSizeExceeded = errors.New("results size exceeds configured limit")

// SidecarLogResult holds fields for storing extracted results
type SidecarLogResult struct {
Name string
Expand Down Expand Up @@ -125,3 +135,59 @@ func LookForResults(w io.Writer, runDir string, resultsDir string, resultNames [
}
return nil
}

// GetResultsFromSidecarLogs extracts results from the logs of the results sidecar
func GetResultsFromSidecarLogs(ctx context.Context, clientset kubernetes.Interface, namespace string, name string, container string, podPhase corev1.PodPhase) ([]v1beta1.PipelineResourceResult, error) {
sidecarLogResults := []v1beta1.PipelineResourceResult{}
if podPhase == corev1.PodPending {
return sidecarLogResults, nil
}
podLogOpts := corev1.PodLogOptions{Container: container}
req := clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
sidecarLogs, err := req.Stream(ctx)
if err != nil {
return sidecarLogResults, err
}
defer sidecarLogs.Close()
maxResultLimit := config.FromContextOrDefaults(ctx).FeatureFlags.MaxResultSize
return extractResultsFromLogs(sidecarLogs, sidecarLogResults, maxResultLimit)
}

func extractResultsFromLogs(logs io.Reader, sidecarLogResults []v1beta1.PipelineResourceResult, maxResultLimit int) ([]v1beta1.PipelineResourceResult, error) {
scanner := bufio.NewScanner(logs)
buf := make([]byte, maxResultLimit)
scanner.Buffer(buf, maxResultLimit)
for scanner.Scan() {
result, err := parseResults(scanner.Bytes(), maxResultLimit)
if err != nil {
return nil, err
}
sidecarLogResults = append(sidecarLogResults, result)
}

if scanner.Err() != nil {
if errors.Is(scanner.Err(), bufio.ErrTooLong) {
return sidecarLogResults, ErrSizeExceeded
}
return nil, scanner.Err()
}
return sidecarLogResults, nil
}

func parseResults(resultBytes []byte, maxResultLimit int) (v1beta1.PipelineResourceResult, error) {
result := v1beta1.PipelineResourceResult{}
if len(resultBytes) > maxResultLimit {
return result, ErrSizeExceeded
}

var res SidecarLogResult
if err := json.Unmarshal(resultBytes, &res); err != nil {
return result, fmt.Errorf("Invalid result %w", err)
}
result = v1beta1.PipelineResourceResult{
Key: res.Name,
Value: res.Value,
ResultType: v1beta1.TaskRunResultType,
}
return result, nil
}
186 changes: 186 additions & 0 deletions internal/sidecarlogresults/sidecarlogresults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@ package sidecarlogresults

import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/test/diff"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
fakekubeclientset "k8s.io/client-go/kubernetes/fake"
)

func TestLookForResults_FanOutAndWait(t *testing.T) {
Expand Down Expand Up @@ -121,6 +129,184 @@ func TestLookForResults(t *testing.T) {
}
}

func TestExtractResultsFromLogs(t *testing.T) {
inputResults := []SidecarLogResult{
{
Name: "result1",
Value: "foo",
}, {
Name: "result2",
Value: "bar",
},
}
podLogs := ""
for _, r := range inputResults {
res, _ := json.Marshal(&r)
podLogs = fmt.Sprintf("%s%s\n", podLogs, string(res))
}
logs := strings.NewReader(podLogs)

results, err := extractResultsFromLogs(logs, []v1beta1.PipelineResourceResult{}, 4096)
if err != nil {
t.Error(err)
}
want := []v1beta1.PipelineResourceResult{
{
Key: "result1",
Value: "foo",
ResultType: v1beta1.TaskRunResultType,
}, {
Key: "result2",
Value: "bar",
ResultType: v1beta1.TaskRunResultType,
},
}
if d := cmp.Diff(want, results); d != "" {
t.Fatal(diff.PrintWantGot(d))
}
}

func TestExtractResultsFromLogs_Failure(t *testing.T) {
inputResults := []SidecarLogResult{
{
Name: "result1",
Value: strings.Repeat("v", 4098),
},
}
podLogs := ""
for _, r := range inputResults {
res, _ := json.Marshal(&r)
podLogs = fmt.Sprintf("%s%s\n", podLogs, string(res))
}
logs := strings.NewReader(podLogs)

_, err := extractResultsFromLogs(logs, []v1beta1.PipelineResourceResult{}, 4096)
if err != ErrSizeExceeded {
t.Fatalf("Expected error %v but got %v", ErrSizeExceeded, err)
}
}

func TestParseResults(t *testing.T) {
results := []SidecarLogResult{
{
Name: "result1",
Value: "foo",
}, {
Name: "result2",
Value: `{"IMAGE_URL":"ar.com", "IMAGE_DIGEST":"sha234"}`,
}, {
Name: "result3",
Value: `["hello","world"]`,
},
}
podLogs := []string{}
for _, r := range results {
res, _ := json.Marshal(&r)
podLogs = append(podLogs, string(res))
}
want := []v1beta1.PipelineResourceResult{{
Key: "result1",
Value: "foo",
ResultType: v1beta1.TaskRunResultType,
}, {
Key: "result2",
Value: `{"IMAGE_URL":"ar.com", "IMAGE_DIGEST":"sha234"}`,
ResultType: v1beta1.TaskRunResultType,
}, {
Key: "result3",
Value: `["hello","world"]`,
ResultType: v1beta1.TaskRunResultType,
}}
stepResults := []v1beta1.PipelineResourceResult{}
for _, plog := range podLogs {
res, err := parseResults([]byte(plog), 4096)
if err != nil {
t.Error(err)
}
stepResults = append(stepResults, res)
}
if d := cmp.Diff(want, stepResults); d != "" {
t.Fatal(diff.PrintWantGot(d))
}
}

func TestParseResults_Failure(t *testing.T) {
result := SidecarLogResult{
Name: "result2",
Value: strings.Repeat("k", 4098),
}
res1, _ := json.Marshal("result1 v1")
res2, _ := json.Marshal(&result)
podLogs := []string{string(res1), string(res2)}
want := []string{
"Invalid result json: cannot unmarshal string into Go value of type sidecarlogresults.SidecarLogResult",
ErrSizeExceeded.Error(),
}
got := []string{}
for _, plog := range podLogs {
_, err := parseResults([]byte(plog), 4096)
got = append(got, err.Error())
}
if d := cmp.Diff(want, got); d != "" {
t.Fatal(diff.PrintWantGot(d))
}
}

func TestGetResultsFromSidecarLogs(t *testing.T) {
for _, c := range []struct {
desc string
podPhase v1.PodPhase
wantError bool
}{{
desc: "pod pending to start",
podPhase: corev1.PodPending,
wantError: false,
}, {
desc: "pod running extract logs",
podPhase: corev1.PodRunning,
wantError: true,
}} {
t.Run(c.desc, func(t *testing.T) {
ctx := context.Background()
clientset := fakekubeclientset.NewSimpleClientset()
pod := &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "pod",
Namespace: "foo",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "container",
Image: "image",
},
},
},
Status: v1.PodStatus{
Phase: c.podPhase,
},
}
pod, err := clientset.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
if err != nil {
t.Errorf("Error occurred while creating pod %s: %s", pod.Name, err.Error())
}

// Fake logs are not formatted properly so there will be an error
_, err = GetResultsFromSidecarLogs(ctx, clientset, "foo", "pod", "container", pod.Status.Phase)
if err != nil && !c.wantError {
t.Fatalf("did not expect an error but got: %v", err)
}
if c.wantError && err == nil {
t.Fatal("expected to get an error but did not")
}
})
}
}

func createResult(t *testing.T, dir string, resultName string, resultValue string) {
t.Helper()
resultFile := filepath.Join(dir, resultName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package sidecarlogsvalidation
package pipeline

const (
// ReservedResultsSidecarName is the name of the results sidecar that outputs the results to stdout
// when the results-from feature-flag is set to "sidecar-logs".
ReservedResultsSidecarName = "tekton-log-results"

// ReservedResultsSidecarContainerName is the name of the results sidecar container that is injected
// by the reconciler.
ReservedResultsSidecarContainerName = "sidecar-tekton-log-results"
)
2 changes: 2 additions & 0 deletions pkg/apis/pipeline/v1beta1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ const (
TaskRunReasonsResultsVerificationFailed TaskRunReason = "TaskRunResultsVerificationFailed"
// AwaitingTaskRunResults is the reason set when waiting upon `TaskRun` results and signatures to verify
AwaitingTaskRunResults TaskRunReason = "AwaitingTaskRunResults"
// TaskRunReasonResultLargerThanAllowedLimit is the reason set when one of the results exceeds its maximum allowed limit of 1 KB
TaskRunReasonResultLargerThanAllowedLimit TaskRunReason = "TaskRunResultLargerThanAllowedLimit"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @chitrangpatel !

I was wondering if there is any reason this is kept in v1beta1 only? If not I can open up a PR to keep v1 and v1beta1 synced for the reconciler changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think thats because the reconciler currently uses v1beta1. This reason is only called there. Its ok to keep them both I sync I suppose. That way, when we update the reconciler to point to v1, it works.

)

func (t TaskRunReason) String() string {
Expand Down
7 changes: 7 additions & 0 deletions pkg/pod/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strconv"
"strings"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"gomodules.xyz/jsonpatch/v2"
Expand Down Expand Up @@ -252,6 +253,12 @@ func StopSidecars(ctx context.Context, nopImage string, kubeclient kubernetes.In
updated := false
if newPod.Status.Phase == corev1.PodRunning {
for _, s := range newPod.Status.ContainerStatuses {
// If the results-from is set to sidecar logs,
// a sidecar container with name `sidecar-log-results` is injected by the reconiler.
// Do not kill this sidecar. Let it exit gracefully.
dibyom marked this conversation as resolved.
Show resolved Hide resolved
if config.FromContextOrDefaults(ctx).FeatureFlags.ResultExtractionMethod == config.ResultExtractionMethodSidecarLogs && s.Name == pipeline.ReservedResultsSidecarContainerName {
continue
}
// Stop any running container that isn't a step.
// An injected sidecar container might not have the
// "sidecar-" prefix, so we can't just look for that
Expand Down
Loading