Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TLS, fine-grain Authorization, and List subscriptions #390

Merged
merged 8 commits into from
Aug 29, 2024
Merged
2 changes: 1 addition & 1 deletion .github/workflows/ci-workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ env:
jobs:
build:
name: "Build"
runs-on: ubuntu-20.04
runs-on: ubuntu-22.04
steps:
- name: "Checkout"
uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion allocator/allocator_key_space.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
ItemsPrefix = "/items/"
// MembersPrefix prefixes Member keys, eg "root/members/zone#suffix"
MembersPrefix = "/members/"
// AssignmentsPrefix prefixes Assignment keys, eg "prefix/assign/item-id#zone#member-suffix#slot"
// AssignmentsPrefix prefixes Assignment keys, eg "root/assign/item-id#zone#member-suffix#slot"
AssignmentsPrefix = "/assign/"
// '#' is selected as separator, because it's the first visual ASCII character
// which is not interpreted by shells (preceding visual characters are " and !).
Expand Down
166 changes: 166 additions & 0 deletions auth/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package auth

import (
"context"
"encoding/base64"
"errors"
"fmt"
"strconv"
"strings"
"time"

"github.com/golang-jwt/jwt/v5"
pb "go.gazette.dev/core/broker/protocol"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

// NewKeyedAuth returns a KeyedAuth that implements Authorizer and Verifier using
// the given pre-shared secret keys, which are base64 encoded and separate by
// whitespace and/or commas.
//
// The first key is used for signing Authorizations, and any key may verify
// a presented Authorization.
//
// The special value `AA==` (the base64 encoding of a single zero byte)
// will allow requests missing an authorization header to proceed, and should
// only be used temporarily for rollout of authorization in an existing cluster.
func NewKeyedAuth(base64Keys string) (*KeyedAuth, error) {
var keys jwt.VerificationKeySet
var allowMissing bool

for i, key := range strings.Fields(strings.ReplaceAll(base64Keys, ",", " ")) {
if key == "AA==" {
allowMissing = true
} else if b, err := base64.StdEncoding.DecodeString(key); err != nil {
return nil, fmt.Errorf("failed to decode key at index %d: %w", i, err)
} else {
keys.Keys = append(keys.Keys, b)
}
}
if len(keys.Keys) == 0 {
return nil, fmt.Errorf("at least one key must be provided")
}
return &KeyedAuth{keys, allowMissing}, nil
}

// KeyedAuth implements the pb.Authorizer and pb.Verifier
// interfaces using symmetric, pre-shared keys.
type KeyedAuth struct {
jwt.VerificationKeySet
allowMissing bool
}

func (k *KeyedAuth) Authorize(ctx context.Context, claims pb.Claims, exp time.Duration) (context.Context, error) {
var now = time.Now()
claims.IssuedAt = &jwt.NumericDate{Time: now}
claims.ExpiresAt = &jwt.NumericDate{Time: now.Add(exp)}
var token, err = jwt.NewWithClaims(jwt.SigningMethodHS256, claims).SignedString(k.Keys[0])

if err != nil {
return nil, err
}
return metadata.AppendToOutgoingContext(ctx, "authorization", fmt.Sprintf("Bearer %s", token)), nil
}

func (k *KeyedAuth) Verify(ctx context.Context, require pb.Capability) (context.Context, context.CancelFunc, pb.Claims, error) {
if claims, err := verifyWithKeys(ctx, require, k.VerificationKeySet, k.allowMissing); err != nil {
return nil, func() {}, claims, status.Error(codes.Unauthenticated, err.Error())
} else {
ctx, cancel := context.WithDeadline(ctx, claims.ExpiresAt.Time)
return ctx, cancel, claims, nil
}
}

// NewNoopAuth returns an Authorizer and Verifier which does nothing.
func NewNoopAuth() interface {
pb.Authorizer
pb.Verifier
} {
return &noop{}
}

type noop struct{}

func (k *noop) Authorize(ctx context.Context, claims pb.Claims, exp time.Duration) (context.Context, error) {
return ctx, nil
}
func (v *noop) Verify(ctx context.Context, require pb.Capability) (context.Context, context.CancelFunc, pb.Claims, error) {
return ctx, func() {}, pb.Claims{Capability: require}, nil
}

func verifyWithKeys(ctx context.Context, require pb.Capability, keys jwt.VerificationKeySet, allowMissing bool) (pb.Claims, error) {
var md, _ = metadata.FromIncomingContext(ctx)
var auth = md.Get("authorization")

if len(auth) == 0 {
if allowMissing {
return pb.Claims{
Capability: require,
RegisteredClaims: jwt.RegisteredClaims{
ExpiresAt: jwt.NewNumericDate(time.Now().Add(time.Hour)),
},
}, nil
}
return errClaims, ErrMissingAuth
} else if !strings.HasPrefix(auth[0], "Bearer ") {
return errClaims, ErrNotBearer
}
var bearer = strings.TrimPrefix(auth[0], "Bearer ")
var claims pb.Claims

if token, err := jwt.ParseWithClaims(bearer, &claims,
func(token *jwt.Token) (interface{}, error) { return keys, nil },
jwt.WithExpirationRequired(),
jwt.WithIssuedAt(),
jwt.WithLeeway(time.Second*5),
jwt.WithValidMethods([]string{"HS256", "HS384"}),
); err != nil {
return errClaims, fmt.Errorf("verifying Authorization: %w", err)
} else if !token.Valid {
panic("token.Valid must be true")
} else if err = verifyCapability(claims.Capability, require); err != nil {
return errClaims, err
} else {
return claims, nil
}
}

func verifyCapability(actual, require pb.Capability) error {
if actual&require == require {
return nil
}

// Nicer messages for common capabilities.
for _, i := range []struct {
cap pb.Capability
name string
}{
{pb.Capability_LIST, "LIST"},
{pb.Capability_APPLY, "APPLY"},
{pb.Capability_READ, "READ"},
{pb.Capability_APPEND, "APPEND"},
{pb.Capability_REPLICATE, "REPLICATE"},
} {
if require&i.cap != 0 && actual&i.cap == 0 {
return fmt.Errorf("authorization is missing required %s capability", i.name)
}
}

return fmt.Errorf("authorization is missing required capability (have %s, but require %s)",
strconv.FormatUint(uint64(actual), 2), strconv.FormatUint(uint64(require), 2))
}

var (
ErrMissingAuth = errors.New("missing or empty Authorization token")
ErrNotBearer = errors.New("invalid or unsupported Authorization header (expected 'Bearer')")

// errClaims is a defense-in-depth sentinel LabelSelector that won't match anything,
// just in case a caller fails to properly error-check a verification result.
errClaims = pb.Claims{
Selector: pb.LabelSelector{
Include: pb.MustLabelSet("this-label-will", "never-match"),
},
}
)
54 changes: 54 additions & 0 deletions auth/auth_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package auth_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.gazette.dev/core/auth"
pb "go.gazette.dev/core/broker/protocol"
"google.golang.org/grpc/metadata"
)

func TestKeyedAuthCases(t *testing.T) {
ka1, err := auth.NewKeyedAuth("c2VjcmV0,b3RoZXI=")
require.NoError(t, err)
ka2, err := auth.NewKeyedAuth("b3RoZXI=,c2VjcmV0")
require.NoError(t, err)
kaM, err := auth.NewKeyedAuth("YXNkZg==,AA==")
require.NoError(t, err)

// Authorize with one KeyedAuth...
ctx, err := ka1.Authorize(context.Background(),
pb.Claims{
Capability: pb.Capability_APPEND | pb.Capability_LIST,
Selector: pb.MustLabelSelector("hi=there"),
}, time.Hour)
require.NoError(t, err)

var md, _ = metadata.FromOutgoingContext(ctx)
ctx = metadata.NewIncomingContext(ctx, md)

// ...and verify with the other.
_, cancel, claims, err := ka2.Verify(ctx, pb.Capability_APPEND)
require.NoError(t, err)
cancel()
require.Equal(t, pb.MustLabelSelector("hi=there"), claims.Selector)

// Unless the capability doesn't match.
_, _, _, err = ka2.Verify(ctx, pb.Capability_REPLICATE)
require.EqualError(t, err,
"rpc error: code = Unauthenticated desc = authorization is missing required REPLICATE capability")

// A KeyedAuth with a diferent key rejects it.
_, _, _, err = kaM.Verify(ctx, pb.Capability_APPEND)
require.EqualError(t, err,
"rpc error: code = Unauthenticated desc = verifying Authorization: token signature is invalid: signature is invalid")

// A KeyedAuth that allows pass-through will accept a request without a token.
_, cancel, claims, err = kaM.Verify(context.Background(), pb.Capability_READ)
require.NoError(t, err)
cancel()
require.Equal(t, pb.MustLabelSelector(""), claims.Selector)
}
19 changes: 14 additions & 5 deletions broker/append_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

// Append dispatches the JournalServer.Append API.
func (svc *Service) Append(stream pb.Journal_AppendServer) (err error) {
func (svc *Service) Append(claims pb.Claims, stream pb.Journal_AppendServer) (err error) {
var (
fsm appendFSM
req *pb.AppendRequest
Expand All @@ -38,9 +38,10 @@ func (svc *Service) Append(stream pb.Journal_AppendServer) (err error) {
}

fsm = appendFSM{
svc: svc,
ctx: stream.Context(),
req: *req,
svc: svc,
ctx: stream.Context(),
claims: claims,
req: *req,
}
fsm.run(stream.Recv)

Expand Down Expand Up @@ -84,7 +85,15 @@ func (svc *Service) Append(stream pb.Journal_AppendServer) (err error) {
// proxyAppend forwards an AppendRequest to a resolved peer broker.
// Pass request by value as we'll later mutate it (via RecvMsg).
func proxyAppend(stream grpc.ServerStream, req pb.AppendRequest, jc pb.JournalClient) error {
var ctx = pb.WithDispatchRoute(stream.Context(), req.Header.Route, req.Header.ProcessId)
// We verified the client's authorization & claims and are running under its context.
// pb.AuthJournalClient will self-sign claims to proxy this journal on the client's behalf.
var ctx = pb.WithClaims(stream.Context(), pb.Claims{
Capability: pb.Capability_APPEND,
Selector: pb.LabelSelector{
Include: pb.MustLabelSet("name", req.Journal.String()),
},
})
ctx = pb.WithDispatchRoute(ctx, req.Header.Route, req.Header.ProcessId)

var client, err = jc.Append(ctx)
if err != nil {
Expand Down
25 changes: 23 additions & 2 deletions broker/append_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
func TestAppendSingle(t *testing.T) {
var ctx, etcd = pb.WithDispatchDefault(context.Background()), etcdtest.TestClient()
defer etcdtest.Cleanup()
ctx = pb.WithClaims(ctx, pb.Claims{Capability: pb.Capability_APPEND})

var broker = newTestBroker(t, etcd, pb.ProcessSpec_ID{Zone: "local", Suffix: "broker"})
setTestJournal(broker, pb.JournalSpec{Name: "a/journal", Replication: 1}, broker.id)
Expand Down Expand Up @@ -52,14 +53,14 @@ func TestAppendSingle(t *testing.T) {
func TestAppendRegisterCheckAndUpdateSequence(t *testing.T) {
var ctx, etcd = pb.WithDispatchDefault(context.Background()), etcdtest.TestClient()
defer etcdtest.Cleanup()
ctx = pb.WithClaims(ctx, pb.Claims{Capability: pb.Capability_APPEND})

var broker = newTestBroker(t, etcd, pb.ProcessSpec_ID{Zone: "local", Suffix: "broker"})
setTestJournal(broker, pb.JournalSpec{Name: "a/journal", Replication: 1}, broker.id)
broker.initialFragmentLoad()

var selector = func(s string) *pb.LabelSelector {
var sel, err = pb.ParseLabelSelector(s)
require.NoError(t, err)
var sel = pb.MustLabelSelector(s)
return &sel
}
// Run a sequence of appends, where each confirms and modifies registers.
Expand Down Expand Up @@ -125,6 +126,7 @@ func TestAppendRegisterCheckAndUpdateSequence(t *testing.T) {
func TestAppendBadlyBehavedClientCases(t *testing.T) {
var ctx, etcd = pb.WithDispatchDefault(context.Background()), etcdtest.TestClient()
defer etcdtest.Cleanup()
ctx = pb.WithClaims(ctx, pb.Claims{Capability: pb.Capability_APPEND})

var broker = newTestBroker(t, etcd, pb.ProcessSpec_ID{Zone: "local", Suffix: "broker"})
setTestJournal(broker, pb.JournalSpec{Name: "a/journal", Replication: 1}, broker.id)
Expand Down Expand Up @@ -192,6 +194,7 @@ func TestAppendBadlyBehavedClientCases(t *testing.T) {
func TestAppendRequestErrorCases(t *testing.T) {
var ctx, etcd = pb.WithDispatchDefault(context.Background()), etcdtest.TestClient()
defer etcdtest.Cleanup()
ctx = pb.WithClaims(ctx, pb.Claims{Capability: pb.Capability_APPEND})

var broker = newTestBroker(t, etcd, pb.ProcessSpec_ID{Zone: "local", Suffix: "broker"})

Expand Down Expand Up @@ -268,12 +271,30 @@ func TestAppendRequestErrorCases(t *testing.T) {
Header: *broker.header("read/only"),
}, resp)

// Case: Insufficient claimed capability.
stream, _ = broker.client().Append(pb.WithClaims(ctx, pb.Claims{Capability: pb.Capability_READ}))
_, err = stream.CloseAndRecv()
require.ErrorContains(t, err, "authorization is missing required APPEND capability")

// Case: Insufficient claimed selector.
stream, _ = broker.client().Append(pb.WithClaims(ctx, pb.Claims{
Capability: pb.Capability_APPEND,
Selector: pb.MustLabelSelector("name=something/else"),
}))
require.NoError(t, stream.Send(&pb.AppendRequest{Journal: "valid/journal", Offset: 1024}))

resp, err = stream.CloseAndRecv()
require.NoError(t, err)
require.Equal(t, pb.Status_JOURNAL_NOT_FOUND, resp.Status) // Journal not visible to these claims.
require.Len(t, resp.Header.Route.Endpoints, 0)

broker.cleanup()
}

func TestAppendProxyCases(t *testing.T) {
var ctx, etcd = pb.WithDispatchDefault(context.Background()), etcdtest.TestClient()
defer etcdtest.Cleanup()
ctx = pb.WithClaims(ctx, pb.Claims{Capability: pb.Capability_APPEND})

var broker = newTestBroker(t, etcd, pb.ProcessSpec_ID{Zone: "local", Suffix: "broker"})
var peer = newMockBroker(t, etcd, pb.ProcessSpec_ID{Zone: "peer", Suffix: "broker"})
Expand Down
Loading
Loading