Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): add version header + warning on client-server mismatch. Fixes #9212 #13635

Merged
merged 9 commits into from
Sep 30, 2024
4 changes: 4 additions & 0 deletions cmd/argo/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/argoproj/argo-workflows/v3/cmd/argo/commands/executorplugin"
"github.com/argoproj/argo-workflows/v3/cmd/argo/commands/template"
cmdutil "github.com/argoproj/argo-workflows/v3/util/cmd"
grpcutil "github.com/argoproj/argo-workflows/v3/util/grpc"
)

const (
Expand Down Expand Up @@ -125,6 +126,9 @@ If your server is behind an ingress with a path (running "argo server --base-hre
var logLevel string
var glogLevel int
var verbose bool
command.PersistentPostRun = func(cmd *cobra.Command, args []string) {
cmdutil.PrintVersionMismatchWarning(argo.GetVersion(), grpcutil.LastSeenServerVersion)
}
command.PersistentPreRun = func(cmd *cobra.Command, args []string) {
if verbose {
logLevel = "debug"
Expand Down
7 changes: 6 additions & 1 deletion pkg/apiclient/argo-server-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
workflowpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflow"
workflowarchivepkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflowarchive"
workflowtemplatepkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflowtemplate"
grpcutil "github.com/argoproj/argo-workflows/v3/util/grpc"
)

const (
Expand Down Expand Up @@ -65,7 +66,11 @@ func newClientConn(opts ArgoServerOpts) (*grpc.ClientConn, error) {
if opts.Secure {
creds = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: opts.InsecureSkipVerify}))
}
conn, err := grpc.Dial(opts.URL, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MaxClientGRPCMessageSize)), creds)
conn, err := grpc.Dial(opts.URL,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MaxClientGRPCMessageSize)),
creds,
grpc.WithUnaryInterceptor(grpcutil.GetVersionHeaderClientUnaryInterceptor),
)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, workfl
grpcutil.ErrorTranslationUnaryServerInterceptor,
as.gatekeeper.UnaryServerInterceptor(),
grpcutil.RatelimitUnaryServerInterceptor(as.apiRateLimiter),
grpcutil.SetVersionHeaderUnaryServerInterceptor(argo.GetVersion()),
)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_prometheus.StreamServerInterceptor,
Expand All @@ -313,6 +314,7 @@ func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, workfl
grpcutil.ErrorTranslationStreamServerInterceptor,
as.gatekeeper.StreamServerInterceptor(),
grpcutil.RatelimitStreamServerInterceptor(as.apiRateLimiter),
grpcutil.SetVersionHeaderStreamServerInterceptor(argo.GetVersion()),
)),
}

Expand Down
19 changes: 12 additions & 7 deletions test/e2e/argo_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,11 @@ func (s *ArgoServerSuite) TestInfo() {

func (s *ArgoServerSuite) TestVersion() {
s.Run("Version", func() {
s.e().GET("/api/v1/version").
resp := s.e().GET("/api/v1/version").
Expect().
Status(200).
JSON().
Path("$.version").
NotNull()
Status(200)
resp.JSON().Path("$.version").NotNull()
resp.Header("Grpc-Metadata-Argo-Version").NotEmpty()
})
}

Expand Down Expand Up @@ -343,14 +342,20 @@ func (s *ArgoServerSuite) TestUnauthorized() {
defer func() { s.bearerToken = token }()
s.e().GET("/api/v1/workflows/argo").
Expect().
Status(401)
Status(401).
// Version header shouldn't be set on 401s for security, since that could be used by attackers to find vulnerable servers
Header("Grpc-Metadata-Argo-Version").
IsEmpty()
})
s.Run("Basic", func() {
s.username = "garbage"
defer func() { s.username = "" }()
s.e().GET("/api/v1/workflows/argo").
Expect().
Status(401)
Status(401).
// Version header shouldn't be set on 401s for security, since that could be used by attackers to find vulnerable servers
Header("Grpc-Metadata-Argo-Version").
IsEmpty()
})
}

Expand Down
7 changes: 7 additions & 0 deletions util/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ func PrintVersion(cliName string, version wfv1.Version, short bool) {
fmt.Printf(" Platform: %s\n", version.Platform)
}

// PrintVersionMismatchWarning detects if there's a mismatch between the client and server versions and prints a warning if so
func PrintVersionMismatchWarning(clientVersion wfv1.Version, serverVersion string) {
if serverVersion != "" && clientVersion.GitTag != "" && serverVersion != clientVersion.Version {
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
log.Warnf("CLI version (%s) does not match server version (%s). This can lead to unexpected behavior.", clientVersion.Version, serverVersion)
}
}

// MustIsDir returns whether or not the given filePath is a directory. Exits if path does not exist
func MustIsDir(filePath string) bool {
fileInfo, err := os.Stat(filePath)
Expand Down
53 changes: 53 additions & 0 deletions util/cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ package cmd
import (
"reflect"
"testing"

log "github.com/sirupsen/logrus"
logtest "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

func TestMakeParseLabels(t *testing.T) {
Expand Down Expand Up @@ -103,3 +109,50 @@ func TestIsURL(t *testing.T) {
})
}
}

func TestPrintVersionMismatchWarning(t *testing.T) {
tests := []struct {
name string
clientVersion *wfv1.Version
serverVersion string
expectedLog string
}{
{
name: "server version not set",
clientVersion: &wfv1.Version{
Version: "v3.1.0",
GitTag: "v3.1.0",
},
serverVersion: "",
},
{
name: "client version is untagged",
clientVersion: &wfv1.Version{
Version: "v3.1.0",
},
serverVersion: "v3.1.1",
},
{
name: "version mismatch",
clientVersion: &wfv1.Version{
Version: "v3.1.0",
GitTag: "v3.1.0",
},
serverVersion: "v3.1.1",
expectedLog: "CLI version (v3.1.0) does not match server version (v3.1.1). This can lead to unexpected behavior.",
},
}
hook := &logtest.Hook{}
log.AddHook(hook)
defer log.StandardLogger().ReplaceHooks(nil)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
PrintVersionMismatchWarning(*tt.clientVersion, tt.serverVersion)
if tt.expectedLog != "" {
assert.Equal(t, tt.expectedLog, hook.LastEntry().Message)
} else {
assert.Nil(t, hook.LastEntry())
}
})
}
}
48 changes: 48 additions & 0 deletions util/grpc/interceptor.go
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import (
"runtime/debug"
"strings"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"

log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -40,7 +43,12 @@ func PanicLoggerStreamServerInterceptor(log *log.Entry) grpc.StreamServerInterce
}
}

const (
ArgoVersionHeader = "argo-version"
)

var (
LastSeenServerVersion string
ErrorTranslationUnaryServerInterceptor = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
resp, err = handler(ctx, req)
return resp, TranslateError(err)
Expand All @@ -50,6 +58,46 @@ var (
}
)

// SetVersionHeaderUnaryServerInterceptor returns a new unary server interceptor that sets the argo-version header
func SetVersionHeaderUnaryServerInterceptor(version wfv1.Version) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
m, origErr := handler(ctx, req)
if origErr == nil {
// Don't set header if there was an error because attackers could use it to find vulnerable Argo servers
err := grpc.SetHeader(ctx, metadata.Pairs(ArgoVersionHeader, version.Version))
if err != nil {
log.Warnf("Failed to set header '%s': %s", ArgoVersionHeader, err)
}
}
return m, origErr
}
}

// SetVersionHeaderStreamServerInterceptor returns a new stream server interceptor that sets the argo-version header
func SetVersionHeaderStreamServerInterceptor(version wfv1.Version) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
origErr := handler(srv, ss)
if origErr == nil {
// Don't set header if there was an error because attackers could use it to find vulnerable Argo servers
err := ss.SetHeader(metadata.Pairs(ArgoVersionHeader, version.Version))
if err != nil {
log.Warnf("Failed to set header '%s': %s", ArgoVersionHeader, err)
}
}
return origErr
}
}

// GetVersionHeaderClientUnaryInterceptor returns a new unary client interceptor that extracts the argo-version from the response and sets the global variable LastSeenServerVersion
func GetVersionHeaderClientUnaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
var headers metadata.MD
err := invoker(ctx, method, req, reply, cc, append(opts, grpc.Header(&headers))...)
if err == nil && headers != nil && headers.Get(ArgoVersionHeader) != nil {
LastSeenServerVersion = headers.Get(ArgoVersionHeader)[0]
}
return err
}

// RatelimitUnaryServerInterceptor returns a new unary server interceptor that performs request rate limiting.
func RatelimitUnaryServerInterceptor(ratelimiter limiter.Store) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
Expand Down
136 changes: 136 additions & 0 deletions util/grpc/interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package grpc

import (
"context"
"errors"
"testing"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

type mockServerTransportStream struct {
header metadata.MD
isError bool
}

func (mockServerTransportStream) Method() string { return "" }
func (msts *mockServerTransportStream) SetHeader(md metadata.MD) error {
if msts.isError {
return errors.New("simulate error setting header")
}
msts.header = md
return nil
}
func (mockServerTransportStream) SendHeader(md metadata.MD) error { return nil }
func (mockServerTransportStream) SetTrailer(md metadata.MD) error { return nil }

var _ grpc.ServerTransportStream = &mockServerTransportStream{}

func TestSetVersionHeaderUnaryServerInterceptor(t *testing.T) {
version := &wfv1.Version{Version: "v3.1.0"}
mockReturn := "successful return"

t.Run("success", func(t *testing.T) {
handler := func(ctx context.Context, req interface{}) (interface{}, error) { return mockReturn, nil }
msts := &mockServerTransportStream{}
ctx := grpc.NewContextWithServerTransportStream(context.Background(), msts)
interceptor := SetVersionHeaderUnaryServerInterceptor(*version)

m, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{}, handler)

require.NoError(t, err)
assert.Equal(t, mockReturn, m)
assert.Equal(t, metadata.Pairs(ArgoVersionHeader, version.Version), msts.header)
})

t.Run("upstream error handling", func(t *testing.T) {
handler := func(ctx context.Context, req interface{}) (interface{}, error) { return nil, errors.New("error") }
msts := &mockServerTransportStream{}
ctx := grpc.NewContextWithServerTransportStream(context.Background(), msts)
interceptor := SetVersionHeaderUnaryServerInterceptor(*version)

_, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{}, handler)

require.Error(t, err)
assert.Empty(t, msts.header)
})

t.Run("SetHeader error handling", func(t *testing.T) {
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return mockReturn, nil
}
msts := &mockServerTransportStream{isError: true}
ctx := grpc.NewContextWithServerTransportStream(context.Background(), msts)
interceptor := SetVersionHeaderUnaryServerInterceptor(*version)

m, err := interceptor(ctx, nil, &grpc.UnaryServerInfo{}, handler)

require.NoError(t, err)
require.Equal(t, mockReturn, m)
assert.Empty(t, msts.header)
})
}

type mockServerStream struct {
header metadata.MD
isError bool
}

func (msts mockServerStream) SetHeader(md metadata.MD) error {
if msts.isError {
return errors.New("simulate error setting header")
}
msts.header.Set(ArgoVersionHeader, md.Get(ArgoVersionHeader)...)
return nil
}
func (mockServerStream) SendHeader(md metadata.MD) error { return nil }
func (mockServerStream) SetTrailer(md metadata.MD) {}
func (mockServerStream) Context() context.Context { return context.Background() }
func (mockServerStream) SendMsg(m any) error { return nil }
func (mockServerStream) RecvMsg(m any) error { return nil }

var _ grpc.ServerStream = &mockServerStream{}

func TestSetVersionHeaderStreamServerInterceptor(t *testing.T) {
version := &wfv1.Version{Version: "v3.1.0"}

t.Run("success", func(t *testing.T) {
handler := func(srv any, stream grpc.ServerStream) error { return nil }
msts := &mockServerStream{header: metadata.New(nil)}
interceptor := SetVersionHeaderStreamServerInterceptor(*version)

err := interceptor(nil, msts, nil, handler)

require.NoError(t, err)
assert.Equal(t, metadata.Pairs(ArgoVersionHeader, version.Version), msts.header)
})

t.Run("upstream error handling", func(t *testing.T) {
handler := func(srv any, stream grpc.ServerStream) error {
return errors.New("test error")
}
msts := &mockServerStream{header: metadata.New(nil)}
interceptor := SetVersionHeaderStreamServerInterceptor(*version)

err := interceptor(nil, msts, nil, handler)

require.Error(t, err, "test error")
assert.Empty(t, msts.header)
})

t.Run("SetHeader error handling", func(t *testing.T) {
handler := func(srv any, stream grpc.ServerStream) error { return nil }
msts := &mockServerStream{isError: true}
interceptor := SetVersionHeaderStreamServerInterceptor(*version)

err := interceptor(nil, msts, nil, handler)

require.NoError(t, err)
assert.Empty(t, msts.header)
})
}
Loading