From ce1df1e439778c6670a88d7fe89f08d098e2a25b Mon Sep 17 00:00:00 2001 From: Or Ozeri Date: Thu, 18 Apr 2024 20:31:20 +0300 Subject: [PATCH] pkg/policyengine: Cleanup This commit moves code from the policyengine package to the authz package. It also includes a re-writing of the load balancer. Signed-off-by: Or Ozeri --- CODEOWNERS | 14 +- cmd/cl-controlplane/app/server.go | 14 +- demos/iperf3/kind/README.md | 8 +- .../policies}/allowAll.json | 0 .../policies}/allowToReviews.json | 0 .../policies}/deny_from_gw1.json | 0 pkg/apis/clusterlink.net/v1alpha1/import.go | 14 +- .../authz}/connectivitypdp/accesspolicy.go | 0 .../connectivitypdp/connectivity_pdp.go | 100 +++--- .../connectivitypdp/connectivity_pdp_test.go | 88 ++--- .../connectivitypdp/test_data/all_layers.yaml | 0 .../mixed_policies_and_other_resources.yaml | 0 .../connectivitypdp/test_data/not_a_yaml | 0 .../test_data/privileged_and_regular.yaml | 0 .../test_data/simple_privileged.yaml | 0 pkg/controlplane/authz/controllers.go | 7 +- pkg/controlplane/authz/loadbalancer.go | 186 +++++++++++ pkg/controlplane/authz/manager.go | 316 +++++++++++------- pkg/controlplane/authz/server.go | 18 +- pkg/controlplane/control/controllers.go | 2 +- pkg/controlplane/control/manager.go | 31 +- pkg/controlplane/control/peer.go | 14 +- pkg/controlplane/rest/accesspolicy.go | 2 +- pkg/controlplane/rest/export.go | 44 ++- pkg/controlplane/rest/import.go | 9 - pkg/controlplane/rest/peer.go | 28 +- pkg/controlplane/store/types.go | 8 +- pkg/policyengine/PolicyDispatcher.go | 207 ------------ pkg/policyengine/PolicyDispatcher_test.go | 280 ---------------- .../connectivitypdp/connection_request.go | 43 --- pkg/policyengine/loadBalancer.go | 160 --------- pkg/policyengine/loadBalancer_test.go | 177 ---------- tests/e2e/k8s/test_basic.go | 37 +- tests/e2e/k8s/test_loadbalancing.go | 208 ++++++++++++ tests/e2e/k8s/test_policy.go | 20 +- tests/e2e/k8s/util/clusterlink.go | 10 + tests/k8s.sh | 2 +- .../content/en/docs/main/concepts/policies.md | 4 +- 38 files changed, 851 insertions(+), 1200 deletions(-) rename {pkg/policyengine/examples => examples/policies}/allowAll.json (100%) rename {pkg/policyengine/examples => examples/policies}/allowToReviews.json (100%) rename {pkg/policyengine/examples => examples/policies}/deny_from_gw1.json (100%) rename pkg/{policyengine => controlplane/authz}/connectivitypdp/accesspolicy.go (100%) rename pkg/{policyengine => controlplane/authz}/connectivitypdp/connectivity_pdp.go (75%) rename pkg/{policyengine => controlplane/authz}/connectivitypdp/connectivity_pdp_test.go (71%) rename pkg/{policyengine => controlplane/authz}/connectivitypdp/test_data/all_layers.yaml (100%) rename pkg/{policyengine => controlplane/authz}/connectivitypdp/test_data/mixed_policies_and_other_resources.yaml (100%) rename pkg/{policyengine => controlplane/authz}/connectivitypdp/test_data/not_a_yaml (100%) rename pkg/{policyengine => controlplane/authz}/connectivitypdp/test_data/privileged_and_regular.yaml (100%) rename pkg/{policyengine => controlplane/authz}/connectivitypdp/test_data/simple_privileged.yaml (100%) create mode 100644 pkg/controlplane/authz/loadbalancer.go delete mode 100644 pkg/policyengine/PolicyDispatcher.go delete mode 100644 pkg/policyengine/PolicyDispatcher_test.go delete mode 100644 pkg/policyengine/connectivitypdp/connection_request.go delete mode 100644 pkg/policyengine/loadBalancer.go delete mode 100644 pkg/policyengine/loadBalancer_test.go create mode 100644 tests/e2e/k8s/test_loadbalancing.go diff --git a/CODEOWNERS b/CODEOWNERS index 027bee1a..600d3bdf 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -2,11 +2,11 @@ # the repo. Unless a later match takes precedence, # Order is important; the last matching pattern takes the most # precedence. -* @elevran -cmd/ @kfirtoledo @orozery -demos/ @kfirtoledo -pkg/ @elevran @kfirtoledo @orozery -pkg/dataplane/go @praveingk @orozery -pkg/policyengine/ @zivnevo -website/ @elevran @michalmalka +* @elevran +cmd/ @kfirtoledo @orozery +demos/ @kfirtoledo +pkg/ @elevran @kfirtoledo @orozery +pkg/dataplane/go @praveingk @orozery +pkg/controlplane/authz/ @zivnevo +website/ @elevran @michalmalka diff --git a/cmd/cl-controlplane/app/server.go b/cmd/cl-controlplane/app/server.go index f4f63832..9f6793df 100644 --- a/cmd/cl-controlplane/app/server.go +++ b/cmd/cl-controlplane/app/server.go @@ -189,7 +189,7 @@ func (o *Options) Run() error { httpServer := utilrest.NewServer("controlplane-http", parsedCertData.ServerConfig()) grpcServer := grpc.NewServer("controlplane-grpc", parsedCertData.ServerConfig()) - authzManager, err := authz.NewManager(parsedCertData) + authzManager, err := authz.NewManager(parsedCertData, mgr.GetClient(), namespace) if err != nil { return fmt.Errorf("cannot create authorization manager: %w", err) } @@ -240,10 +240,16 @@ func (o *Options) Run() error { cprest.RegisterHandlers(restManager, httpServer) - controlManager.SetGetMergeImportListCallback(restManager.GetMergeImportList) + authzManager.SetGetImportCallback(restManager.GetK8sImport) + authzManager.SetGetExportCallback(restManager.GetK8sExport) + authzManager.SetGetPeerCallback(restManager.GetK8sPeer) controlManager.SetGetImportCallback(restManager.GetK8sImport) - controlManager.SetStatusCallback(func(pr *v1alpha1.Peer) { - authzManager.AddPeer(pr) + controlManager.SetGetMergeImportListCallback(restManager.GetMergeImportList) + controlManager.SetPeerStatusCallback(func(pr *v1alpha1.Peer) { + restManager.UpdatePeerStatus(pr.Name, &pr.Status) + }) + controlManager.SetExportStatusCallback(func(export *v1alpha1.Export) { + restManager.UpdateExportStatus(export.Name, &export.Status) }) } diff --git a/demos/iperf3/kind/README.md b/demos/iperf3/kind/README.md index 4ac37329..4d9b9919 100644 --- a/demos/iperf3/kind/README.md +++ b/demos/iperf3/kind/README.md @@ -164,16 +164,16 @@ When running Kind cluster on macOS run instead the following: ### Step 7: Create access policy In this step, we create a policy that allow to all traffic from peer1 and peer2: - gwctl --myid peer1 create policy --type access --policyFile $PROJECT_DIR/pkg/policyengine/examples/allowAll.json - gwctl --myid peer2 create policy --type access --policyFile $PROJECT_DIR/pkg/policyengine/examples/allowAll.json + gwctl --myid peer1 create policy --type access --policyFile $PROJECT_DIR/examples/policies/allowAll.json + gwctl --myid peer2 create policy --type access --policyFile $PROJECT_DIR/examples/policies/allowAll.json When running Kind cluster on macOS run instead the following: kubectl config use-context kind-peer1 - kubectl cp $PROJECT_DIR/pkg/policyengine/examples/allowAll.json gwctl:/tmp/allowAll.json + kubectl cp $PROJECT_DIR/examples/policies/allowAll.json gwctl:/tmp/allowAll.json kubectl exec -i $GWCTL1 -- gwctl create policy --type access --policyFile /tmp/allowAll.json kubectl config use-context kind-peer2 - kubectl cp $PROJECT_DIR/pkg/policyengine/examples/allowAll.json gwctl:/tmp/allowAll.json + kubectl cp $PROJECT_DIR/examples/policies//allowAll.json gwctl:/tmp/allowAll.json kubectl exec -i $GWCTL2 -- gwctl create policy --type access --policyFile /tmp/allowAll.json ### Final Step : Test Service connectivity diff --git a/pkg/policyengine/examples/allowAll.json b/examples/policies/allowAll.json similarity index 100% rename from pkg/policyengine/examples/allowAll.json rename to examples/policies/allowAll.json diff --git a/pkg/policyengine/examples/allowToReviews.json b/examples/policies/allowToReviews.json similarity index 100% rename from pkg/policyengine/examples/allowToReviews.json rename to examples/policies/allowToReviews.json diff --git a/pkg/policyengine/examples/deny_from_gw1.json b/examples/policies/deny_from_gw1.json similarity index 100% rename from pkg/policyengine/examples/deny_from_gw1.json rename to examples/policies/deny_from_gw1.json diff --git a/pkg/apis/clusterlink.net/v1alpha1/import.go b/pkg/apis/clusterlink.net/v1alpha1/import.go index d9b140d9..abd8e2dc 100644 --- a/pkg/apis/clusterlink.net/v1alpha1/import.go +++ b/pkg/apis/clusterlink.net/v1alpha1/import.go @@ -42,6 +42,17 @@ type ImportSource struct { ExportNamespace string `json:"exportNamespace"` } +// LBScheme represents a load balancing scheme. +type LBScheme string + +const ( + LBSchemeRandom LBScheme = "random" + LBSchemeRoundRobin LBScheme = "round-robin" + LBSchemeStatic LBScheme = "static" + + LBSchemeDefault = LBSchemeRoundRobin +) + // ImportSpec contains all attributes of an imported service. type ImportSpec struct { // Port of the imported service. @@ -53,8 +64,7 @@ type ImportSpec struct { Sources []ImportSource `json:"sources"` // +kubebuilder:default="round-robin" // LBScheme is the load-balancing scheme to use (e.g., random, static, round-robin) - LBScheme string `json:"lbScheme"` - // TODO: Make LBScheme a proper type (when backwards compatibility is no longer needed) + LBScheme LBScheme `json:"lbScheme"` } const ( diff --git a/pkg/policyengine/connectivitypdp/accesspolicy.go b/pkg/controlplane/authz/connectivitypdp/accesspolicy.go similarity index 100% rename from pkg/policyengine/connectivitypdp/accesspolicy.go rename to pkg/controlplane/authz/connectivitypdp/accesspolicy.go diff --git a/pkg/policyengine/connectivitypdp/connectivity_pdp.go b/pkg/controlplane/authz/connectivitypdp/connectivity_pdp.go similarity index 75% rename from pkg/policyengine/connectivitypdp/connectivity_pdp.go rename to pkg/controlplane/authz/connectivitypdp/connectivity_pdp.go index 35bf749b..db944f06 100644 --- a/pkg/policyengine/connectivitypdp/connectivity_pdp.go +++ b/pkg/controlplane/authz/connectivitypdp/connectivity_pdp.go @@ -127,45 +127,32 @@ func (pdp *PDP) DeletePolicy(policyName types.NamespacedName, privileged bool) e return pdp.regularPolicies.deletePolicy(policyName) } -// Decide makes allow/deny decisions for the queried connections between src and each of destinations in dests. -// The decision, as well as the deciding policy, are recorded in the returned slice of DestinationDecision structs. -// The order of destinations in dests is preserved in the returned slice. -func (pdp *PDP) Decide(src WorkloadAttrs, dests []WorkloadAttrs, ns string) ([]DestinationDecision, error) { - decisions := make([]DestinationDecision, len(dests)) - for i, dest := range dests { - decisions[i] = DestinationDecision{Destination: dest} - } +// Decide makes allow/deny decisions for the queried connection between src and dest. +// The decision, as well as the deciding policy, is recorded in the returned DestinationDecision struct. +func (pdp *PDP) Decide(src, dest WorkloadAttrs, ns string) (*DestinationDecision, error) { + decision := DestinationDecision{Destination: dest} - allDestsDecided, err := pdp.privilegedPolicies.decide(src, decisions, ns) + decided, err := pdp.privilegedPolicies.decide(src, &decision, ns) if err != nil { return nil, err } - if allDestsDecided { - return decisions, nil + if decided { + return &decision, nil } - allDestsDecided, err = pdp.regularPolicies.decide(src, decisions, ns) + decided, err = pdp.regularPolicies.decide(src, &decision, ns) if err != nil { return nil, err } - if allDestsDecided { - return decisions, nil + if decided { + return &decision, nil } - // For all undecided destination (for which no policy matched) set the default deny action - denyUndecidedDestinations(decisions) - return decisions, nil -} - -func denyUndecidedDestinations(dest []DestinationDecision) { - for i := range dest { - dd := &dest[i] - if dd.Decision == DecisionUndecided { - dd.Decision = DecisionDeny - dd.MatchedBy = DefaultDenyPolicyName - dd.PrivilegedMatch = false - } - } + // for an undecided destination (no policy matched) set the default deny action + decision.Decision = DecisionDeny + decision.MatchedBy = DefaultDenyPolicyName + decision.PrivilegedMatch = false + return &decision, nil } func newPolicyTier(privileged bool) policyTier { @@ -223,62 +210,53 @@ func (pt *policyTier) unsafeDeletePolicy(policyName types.NamespacedName) error return nil } -// decide first checks whether any of the tier's deny policies matches any of the not-yet-decided connections -// between src and each of the destinations in dests. If one policy does, the relevant DestinationDecision will +// decide first checks whether any of the tier's deny policies matches any of the not-yet-decided connection +// between src and dest. If one policy does, the DestinationDecision will // be updated to reflect the connection been denied. -// The function then checks whether any of the tier's allow policies matches any of the remaining undecided connections, -// and will similarly update the relevant DestinationDecision of any matching connection. -// returns whether all destinations were decided and an error (if occurred). -func (pt *policyTier) decide(src WorkloadAttrs, dests []DestinationDecision, ns string) (bool, error) { +// If the connection is not decided, the function then checks whether any of the tier's allow policies matches, +// and will similarly update the DestinationDecision. +// returns whether the destination was decided and an error (if occurred). +func (pt *policyTier) decide(src WorkloadAttrs, dest *DestinationDecision, ns string) (bool, error) { pt.lock.RLock() // allowing multiple simultaneous calls to decide() to be served defer pt.lock.RUnlock() - allDecided, err := pt.denyPolicies.decide(src, dests, pt.privileged, ns) + decided, err := pt.denyPolicies.decide(src, dest, pt.privileged, ns) if err != nil { return false, err } - if allDecided { + if decided { return true, nil } - allDecided, err = pt.allowPolicies.decide(src, dests, pt.privileged, ns) + decided, err = pt.allowPolicies.decide(src, dest, pt.privileged, ns) if err != nil { return false, err } - return allDecided, nil + return decided, nil } // decide iterates over all policies in a connPolicyMap and checks if they make a connectivity decision (allow/deny) -// on the not-yet-decided connections between src and each of the destinations in dests. -// returns whether all destinations were decided and an error (if occurred). -func (cpm connPolicyMap) decide(src WorkloadAttrs, dests []DestinationDecision, privileged bool, ns string) (bool, error) { +// on the not-yet-decided connection between src and dest. +// returns whether the destination was decided and an error (if occurred). +func (cpm connPolicyMap) decide(src WorkloadAttrs, dest *DestinationDecision, privileged bool, ns string) (bool, error) { // for when there are no policies in cpm (some destinations are undecided, otherwise we shouldn't be here) - allDecided := false for policyName, policy := range cpm { if !privileged && policyName.Namespace != ns { // Only consider non-privileged policies from the given namespace continue } - allDecided = true // assume all destinations were decided, unless we find a destination which is not - for i := range dests { - dest := &dests[i] - if dest.Decision == DecisionUndecided { - decision, err := accessPolicyDecide(policy, src, dest.Destination) - if err != nil { - return false, err - } - if decision == DecisionUndecided { - allDecided = false // policy didn't match dest - not all dests are decided - } else { // policy matched - we now have a decision for dest - dest.Decision = decision - dest.MatchedBy = policyName.String() - dest.PrivilegedMatch = privileged - } - } + + decision, err := accessPolicyDecide(policy, src, dest.Destination) + if err != nil { + return false, err } - if allDecided { - break + if decision != DecisionUndecided { // policy matched - we now have a decision for dest + dest.Decision = decision + dest.MatchedBy = policyName.String() + dest.PrivilegedMatch = privileged + return true, nil } } - return allDecided, nil + + return false, nil } // accessPolicyDecide returns a policy's decision on a given connection. diff --git a/pkg/policyengine/connectivitypdp/connectivity_pdp_test.go b/pkg/controlplane/authz/connectivitypdp/connectivity_pdp_test.go similarity index 71% rename from pkg/policyengine/connectivitypdp/connectivity_pdp_test.go rename to pkg/controlplane/authz/connectivitypdp/connectivity_pdp_test.go index 74323b11..138c848c 100644 --- a/pkg/policyengine/connectivitypdp/connectivity_pdp_test.go +++ b/pkg/controlplane/authz/connectivitypdp/connectivity_pdp_test.go @@ -28,7 +28,7 @@ import ( "k8s.io/apimachinery/pkg/util/yaml" "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" - "github.com/clusterlink-net/clusterlink/pkg/policyengine/connectivitypdp" + "github.com/clusterlink-net/clusterlink/pkg/controlplane/authz/connectivitypdp" ) const ( @@ -66,30 +66,27 @@ func TestPrivilegedVsRegular(t *testing.T) { } pdp := connectivitypdp.NewPDP() - dests := []connectivitypdp.WorkloadAttrs{trivialLabel} - decisions, err := pdp.Decide(trivialLabel, dests, defaultNS) + decision, err := pdp.Decide(trivialLabel, trivialLabel, defaultNS) require.Nil(t, err) - require.Equal(t, connectivitypdp.DecisionDeny, decisions[0].Decision) // default deny - require.Equal(t, connectivitypdp.DefaultDenyPolicyName, decisions[0].MatchedBy) - require.Equal(t, false, decisions[0].PrivilegedMatch) + require.Equal(t, connectivitypdp.DecisionDeny, decision.Decision) // default deny + require.Equal(t, connectivitypdp.DefaultDenyPolicyName, decision.MatchedBy) + require.Equal(t, false, decision.PrivilegedMatch) err = pdp.AddOrUpdatePolicy(connectivitypdp.PolicyFromCR(&trivialConnPol)) require.Nil(t, err) - dests = []connectivitypdp.WorkloadAttrs{trivialLabel} - decisions, err = pdp.Decide(trivialLabel, dests, defaultNS) + decision, err = pdp.Decide(trivialLabel, trivialLabel, defaultNS) require.Nil(t, err) - require.Equal(t, connectivitypdp.DecisionAllow, decisions[0].Decision) // regular allow policy allows connection - require.Equal(t, types.NamespacedName{Name: "reg", Namespace: defaultNS}.String(), decisions[0].MatchedBy) - require.Equal(t, false, decisions[0].PrivilegedMatch) + require.Equal(t, connectivitypdp.DecisionAllow, decision.Decision) // regular allow policy allows connection + require.Equal(t, types.NamespacedName{Name: "reg", Namespace: defaultNS}.String(), decision.MatchedBy) + require.Equal(t, false, decision.PrivilegedMatch) err = pdp.AddOrUpdatePolicy(connectivitypdp.PolicyFromPrivilegedCR(&trivialPrivConnPol)) require.Nil(t, err) - dests = []connectivitypdp.WorkloadAttrs{trivialLabel} - decisions, err = pdp.Decide(trivialLabel, dests, defaultNS) + decision, err = pdp.Decide(trivialLabel, trivialLabel, defaultNS) require.Nil(t, err) - require.Equal(t, connectivitypdp.DecisionDeny, decisions[0].Decision) // privileged deny policy denies connection - require.Equal(t, types.NamespacedName{Name: "priv"}.String(), decisions[0].MatchedBy) - require.Equal(t, true, decisions[0].PrivilegedMatch) + require.Equal(t, connectivitypdp.DecisionDeny, decision.Decision) // privileged deny policy denies connection + require.Equal(t, types.NamespacedName{Name: "priv"}.String(), decision.MatchedBy) + require.Equal(t, true, decision.PrivilegedMatch) } // TestAllLayers starts with one policy per layer (allow/deny X privileged/non-privileged) @@ -103,66 +100,69 @@ func TestAllLayers(t *testing.T) { err := addPoliciesFromFile(pdp, fileInTestDir("all_layers.yaml")) require.Nil(t, err) + decision, err := pdp.Decide(trivialLabel, trivialLabel, defaultNS) + require.Nil(t, err) + require.Equal(t, connectivitypdp.DecisionDeny, decision.Decision) // default deny + require.Equal(t, connectivitypdp.DefaultDenyPolicyName, decision.MatchedBy) + require.Equal(t, false, decision.PrivilegedMatch) + require.Equal(t, trivialLabel, decision.Destination) + nonMeteringLabel := connectivitypdp.WorkloadAttrs{"workloadName": "non-metering-service"} + decision, err = pdp.Decide(trivialLabel, nonMeteringLabel, defaultNS) + require.Nil(t, err) + require.Equal(t, connectivitypdp.DecisionAllow, decision.Decision) // regular allow + require.Equal(t, false, decision.PrivilegedMatch) + meteringLabel := connectivitypdp.WorkloadAttrs{"workloadName": "global-metering-service"} + decision, err = pdp.Decide(trivialLabel, meteringLabel, defaultNS) + require.Nil(t, err) + require.Equal(t, connectivitypdp.DecisionDeny, decision.Decision) // regular deny + require.Equal(t, false, decision.PrivilegedMatch) + privateMeteringLabel := connectivitypdp.WorkloadAttrs{"workloadName": "global-metering-service", "environment": "prod"} - dests := []connectivitypdp.WorkloadAttrs{trivialLabel, nonMeteringLabel, meteringLabel, privateMeteringLabel} - decisions, err := pdp.Decide(trivialLabel, dests, defaultNS) + decision, err = pdp.Decide(trivialLabel, privateMeteringLabel, defaultNS) require.Nil(t, err) - require.Equal(t, connectivitypdp.DecisionDeny, decisions[0].Decision) // default deny - require.Equal(t, connectivitypdp.DefaultDenyPolicyName, decisions[0].MatchedBy) - require.Equal(t, false, decisions[0].PrivilegedMatch) - require.Equal(t, trivialLabel, decisions[0].Destination) - require.Equal(t, connectivitypdp.DecisionAllow, decisions[1].Decision) // regular allow - require.Equal(t, false, decisions[1].PrivilegedMatch) - require.Equal(t, connectivitypdp.DecisionDeny, decisions[2].Decision) // regular deny - require.Equal(t, false, decisions[2].PrivilegedMatch) - require.Equal(t, connectivitypdp.DecisionAllow, decisions[3].Decision) // privileged allow - require.Equal(t, true, decisions[3].PrivilegedMatch) + require.Equal(t, connectivitypdp.DecisionAllow, decision.Decision) // privileged allow + require.Equal(t, true, decision.PrivilegedMatch) privateLabel := map[string]string{"classification": "private", "environment": "prod"} - dests = []connectivitypdp.WorkloadAttrs{privateMeteringLabel} - decisions, err = pdp.Decide(privateLabel, dests, defaultNS) + decision, err = pdp.Decide(privateLabel, privateMeteringLabel, defaultNS) require.Nil(t, err) - require.Equal(t, connectivitypdp.DecisionDeny, decisions[0].Decision) // privileged deny - require.Equal(t, true, decisions[0].PrivilegedMatch) + require.Equal(t, connectivitypdp.DecisionDeny, decision.Decision) // privileged deny + require.Equal(t, true, decision.PrivilegedMatch) privDenyPolicy := getNameOfFirstPolicyInPDP(pdp, v1alpha1.AccessPolicyActionDeny, true) require.NotEmpty(t, privDenyPolicy) err = pdp.DeletePolicy(types.NamespacedName{Name: privDenyPolicy}, true) require.Nil(t, err) - dests = []connectivitypdp.WorkloadAttrs{privateMeteringLabel} - decisions, err = pdp.Decide(privateLabel, dests, defaultNS) + decision, err = pdp.Decide(privateLabel, privateMeteringLabel, defaultNS) require.Nil(t, err) // no privileged deny, so privileged allow matches - require.Equal(t, connectivitypdp.DecisionAllow, decisions[0].Decision) + require.Equal(t, connectivitypdp.DecisionAllow, decision.Decision) privAllowPolicy := getNameOfFirstPolicyInPDP(pdp, v1alpha1.AccessPolicyActionAllow, true) require.NotEmpty(t, privAllowPolicy) err = pdp.DeletePolicy(types.NamespacedName{Name: privAllowPolicy}, true) require.Nil(t, err) - dests = []connectivitypdp.WorkloadAttrs{privateMeteringLabel} - decisions, err = pdp.Decide(privateLabel, dests, defaultNS) + decision, err = pdp.Decide(privateLabel, privateMeteringLabel, defaultNS) require.Nil(t, err) - require.Equal(t, connectivitypdp.DecisionDeny, decisions[0].Decision) // no privileged allow, so regular deny matches + require.Equal(t, connectivitypdp.DecisionDeny, decision.Decision) // no privileged allow, so regular deny matches regDenyPolicy := getNameOfFirstPolicyInPDP(pdp, v1alpha1.AccessPolicyActionDeny, false) require.NotEmpty(t, regDenyPolicy) err = pdp.DeletePolicy(types.NamespacedName{Name: regDenyPolicy, Namespace: defaultNS}, false) require.Nil(t, err) - dests = []connectivitypdp.WorkloadAttrs{privateMeteringLabel} - decisions, err = pdp.Decide(privateLabel, dests, defaultNS) + decision, err = pdp.Decide(privateLabel, privateMeteringLabel, defaultNS) require.Nil(t, err) - require.Equal(t, connectivitypdp.DecisionAllow, decisions[0].Decision) // no regular deny, so regular allow matches + require.Equal(t, connectivitypdp.DecisionAllow, decision.Decision) // no regular deny, so regular allow matches regAllowPolicy := getNameOfFirstPolicyInPDP(pdp, v1alpha1.AccessPolicyActionAllow, false) require.NotEmpty(t, regAllowPolicy) err = pdp.DeletePolicy(types.NamespacedName{Name: regAllowPolicy, Namespace: defaultNS}, false) require.Nil(t, err) - dests = []connectivitypdp.WorkloadAttrs{privateMeteringLabel} - decisions, err = pdp.Decide(privateLabel, dests, defaultNS) + decision, err = pdp.Decide(privateLabel, privateMeteringLabel, defaultNS) require.Nil(t, err) - require.Equal(t, connectivitypdp.DecisionDeny, decisions[0].Decision) // no regular allow, so default deny matches + require.Equal(t, connectivitypdp.DecisionDeny, decision.Decision) // no regular allow, so default deny matches } func getNameOfFirstPolicyInPDP(pdp *connectivitypdp.PDP, action v1alpha1.AccessPolicyAction, privileged bool) string { diff --git a/pkg/policyengine/connectivitypdp/test_data/all_layers.yaml b/pkg/controlplane/authz/connectivitypdp/test_data/all_layers.yaml similarity index 100% rename from pkg/policyengine/connectivitypdp/test_data/all_layers.yaml rename to pkg/controlplane/authz/connectivitypdp/test_data/all_layers.yaml diff --git a/pkg/policyengine/connectivitypdp/test_data/mixed_policies_and_other_resources.yaml b/pkg/controlplane/authz/connectivitypdp/test_data/mixed_policies_and_other_resources.yaml similarity index 100% rename from pkg/policyengine/connectivitypdp/test_data/mixed_policies_and_other_resources.yaml rename to pkg/controlplane/authz/connectivitypdp/test_data/mixed_policies_and_other_resources.yaml diff --git a/pkg/policyengine/connectivitypdp/test_data/not_a_yaml b/pkg/controlplane/authz/connectivitypdp/test_data/not_a_yaml similarity index 100% rename from pkg/policyengine/connectivitypdp/test_data/not_a_yaml rename to pkg/controlplane/authz/connectivitypdp/test_data/not_a_yaml diff --git a/pkg/policyengine/connectivitypdp/test_data/privileged_and_regular.yaml b/pkg/controlplane/authz/connectivitypdp/test_data/privileged_and_regular.yaml similarity index 100% rename from pkg/policyengine/connectivitypdp/test_data/privileged_and_regular.yaml rename to pkg/controlplane/authz/connectivitypdp/test_data/privileged_and_regular.yaml diff --git a/pkg/policyengine/connectivitypdp/test_data/simple_privileged.yaml b/pkg/controlplane/authz/connectivitypdp/test_data/simple_privileged.yaml similarity index 100% rename from pkg/policyengine/connectivitypdp/test_data/simple_privileged.yaml rename to pkg/controlplane/authz/connectivitypdp/test_data/simple_privileged.yaml diff --git a/pkg/controlplane/authz/controllers.go b/pkg/controlplane/authz/controllers.go index 6ecbfca2..b5adfde1 100644 --- a/pkg/controlplane/authz/controllers.go +++ b/pkg/controlplane/authz/controllers.go @@ -21,7 +21,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" - "github.com/clusterlink-net/clusterlink/pkg/policyengine/connectivitypdp" + "github.com/clusterlink-net/clusterlink/pkg/controlplane/authz/connectivitypdp" "github.com/clusterlink-net/clusterlink/pkg/util/controller" ) @@ -78,11 +78,10 @@ func CreateControllers(mgr *Manager, controllerManager ctrl.Manager, crdMode boo Name: "authz.import", Object: &v1alpha1.Import{}, AddHandler: func(ctx context.Context, object any) error { - mgr.AddImport(object.(*v1alpha1.Import)) return nil }, DeleteHandler: func(ctx context.Context, name types.NamespacedName) error { - return mgr.DeleteImport(name) + return nil }, }) if err != nil { @@ -93,11 +92,9 @@ func CreateControllers(mgr *Manager, controllerManager ctrl.Manager, crdMode boo Name: "authz.export", Object: &v1alpha1.Export{}, AddHandler: func(ctx context.Context, object any) error { - mgr.AddExport(object.(*v1alpha1.Export)) return nil }, DeleteHandler: func(ctx context.Context, name types.NamespacedName) error { - mgr.DeleteExport(name) return nil }, }) diff --git a/pkg/controlplane/authz/loadbalancer.go b/pkg/controlplane/authz/loadbalancer.go new file mode 100644 index 00000000..bfec8018 --- /dev/null +++ b/pkg/controlplane/authz/loadbalancer.go @@ -0,0 +1,186 @@ +// Copyright 2023 The ClusterLink Authors. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package authz + +import ( + "fmt" + "math/rand" + "sync" + "sync/atomic" + + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/types" + + crds "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" +) + +type importState struct { + roundRobinCounter atomic.Uint32 +} + +type LoadBalancer struct { + lock sync.RWMutex + states map[types.NamespacedName]*importState + + logger *logrus.Entry +} + +type LoadBalancingResult struct { + imp *crds.Import + currentIndex int + failed map[int]interface{} + delayed []int +} + +func (r *LoadBalancingResult) Get() *crds.ImportSource { + if r.currentIndex == -1 { + return nil + } + return &r.imp.Spec.Sources[r.currentIndex] +} + +func (r *LoadBalancingResult) IsDelayed() bool { + return len(r.imp.Spec.Sources) == len(r.failed) +} + +func (r *LoadBalancingResult) Delay() { + r.delayed = append(r.delayed, r.currentIndex) +} + +func NewLoadBalancingResult(imp *crds.Import) *LoadBalancingResult { + return &LoadBalancingResult{ + imp: imp, + currentIndex: -1, + failed: make(map[int]interface{}), + } +} + +// NewLoadBalancer returns a new instance of a LoadBalancer object. +func NewLoadBalancer() *LoadBalancer { + logger := logrus.WithField("component", "controlplane.authz.loadbalancer") + + return &LoadBalancer{ + states: make(map[types.NamespacedName]*importState), + logger: logger, + } +} + +func (lb *LoadBalancer) selectRandom(result *LoadBalancingResult) { + sources := &result.imp.Spec.Sources + candidateCount := len(*sources) + index := rand.Intn(candidateCount) //nolint:gosec // G404: use of weak random is fine for load balancing + for i := 0; i < candidateCount; i++ { + if _, ok := result.failed[index]; !ok { + result.currentIndex = index + return + } + + index++ + if index == candidateCount { + index = 0 + } + } +} + +func (lb *LoadBalancer) selectRoundRobin(result *LoadBalancingResult) { + imp := result.imp + sourceCount := len(imp.Spec.Sources) + + name := types.NamespacedName{ + Namespace: imp.Namespace, + Name: imp.Name, + } + + lb.lock.RLock() + state := lb.states[name] + lb.lock.RUnlock() + + if state == nil { + lb.lock.Lock() + state = lb.states[name] + if state == nil { + state = &importState{} + lb.states[name] = state + } + lb.lock.Unlock() + } + + counter := state.roundRobinCounter.Add(1) + + if result.currentIndex != -1 { + result.currentIndex++ + if result.currentIndex == sourceCount { + result.currentIndex = 0 + } + return + } + + result.currentIndex = int(counter) % sourceCount +} + +func (lb *LoadBalancer) selectStatic(result *LoadBalancingResult) { + result.currentIndex++ +} + +// Select one of the import sources, based on the set load balancing scheme. +func (lb *LoadBalancer) Select(result *LoadBalancingResult) error { + if result.currentIndex != -1 { + result.failed[result.currentIndex] = nil + } + + imp := result.imp + sources := &imp.Spec.Sources + if len(result.failed) == len(*sources) { + if len(result.delayed) > 0 { + result.currentIndex = result.delayed[0] + result.delayed = result.delayed[1:] + + lb.logger.WithFields(logrus.Fields{ + "import-name": imp.Name, + "import-namespace": imp.Namespace, + "result-index": result.currentIndex, + }).Info("Select delayed") + return nil + } + return fmt.Errorf("tried out all %d sources", len(imp.Spec.Sources)) + } + + scheme := getScheme(imp) + switch scheme { + case crds.LBSchemeRandom: + lb.selectRandom(result) + case crds.LBSchemeRoundRobin: + lb.selectRoundRobin(result) + case crds.LBSchemeStatic: + lb.selectStatic(result) + } + + lb.logger.WithFields(logrus.Fields{ + "import-name": imp.Name, + "import-namespace": imp.Namespace, + "scheme": scheme, + "attempt": len(result.failed), + "result-index": result.currentIndex, + }).Info("Select") + + return nil +} + +func getScheme(imp *crds.Import) crds.LBScheme { + if imp.Spec.LBScheme == "" { + return crds.LBSchemeDefault + } + + return imp.Spec.LBScheme +} diff --git a/pkg/controlplane/authz/manager.go b/pkg/controlplane/authz/manager.go index 04083f94..93c3dee9 100644 --- a/pkg/controlplane/authz/manager.go +++ b/pkg/controlplane/authz/manager.go @@ -14,6 +14,7 @@ package authz import ( + "context" "crypto/rand" "crypto/rsa" "fmt" @@ -24,29 +25,32 @@ import ( "github.com/lestrrat-go/jwx/jwt" "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" cpapi "github.com/clusterlink-net/clusterlink/pkg/controlplane/api" + "github.com/clusterlink-net/clusterlink/pkg/controlplane/authz/connectivitypdp" "github.com/clusterlink-net/clusterlink/pkg/controlplane/peer" - "github.com/clusterlink-net/clusterlink/pkg/policyengine" - "github.com/clusterlink-net/clusterlink/pkg/policyengine/connectivitypdp" "github.com/clusterlink-net/clusterlink/pkg/util/tls" ) const ( // the number of seconds a JWT access token is valid before it expires. jwtExpirySeconds = 5 + + ServiceNameLabel = "clusterlink/metadata.serviceName" + ServiceNamespaceLabel = "clusterlink/metadata.serviceNamespace" + GatewayNameLabel = "clusterlink/metadata.gatewayName" ) // egressAuthorizationRequest (from local dataplane) // represents a request for accessing an imported service. type egressAuthorizationRequest struct { // ImportName is the name of the requested imported service. - ImportName string - // ImportNamespace is the namespace of the requested imported service. - ImportNamespace string + ImportName types.NamespacedName // IP address of the client connecting to the service. IP string } @@ -66,9 +70,7 @@ type egressAuthorizationResponse struct { // ingressAuthorizationRequest (to remote peer controlplane) represents a request for accessing an exported service. type ingressAuthorizationRequest struct { // Service is the name of the requested exported service. - ServiceName string - // ServiceNamespace is the namespace of the requested exported service. - ServiceNamespace string + ServiceName types.NamespacedName } // ingressAuthorizationResponse (from remote peer controlplane) represents a response for an ingressAuthorizationRequest. @@ -89,7 +91,11 @@ type podInfo struct { // Manager manages the authorization dataplane connections. type Manager struct { - policyDecider policyengine.PolicyDecider + client client.Client + namespace string + + loadBalancer *LoadBalancer + connectivityPDP *connectivitypdp.PDP peerTLS *tls.ParsedCertData peerLock sync.RWMutex @@ -102,25 +108,38 @@ type Manager struct { jwkSignKey jwk.Key jwkVerifyKey jwk.Key + // callback for getting an import (for non-CRD mode) + getImportCallback func(name string, imp *v1alpha1.Import) error + // callback for getting an export (for non-CRD mode) + getExportCallback func(name string, imp *v1alpha1.Export) error + // callback for getting a peer (for non-CRD mode) + getPeerCallback func(name string, pr *v1alpha1.Peer) error + logger *logrus.Entry } +func (m *Manager) SetGetImportCallback(callback func(name string, imp *v1alpha1.Import) error) { + m.getImportCallback = callback +} + +func (m *Manager) SetGetExportCallback(callback func(name string, imp *v1alpha1.Export) error) { + m.getExportCallback = callback +} + +func (m *Manager) SetGetPeerCallback(callback func(name string, pr *v1alpha1.Peer) error) { + m.getPeerCallback = callback +} + // AddPeer defines a new route target for egress dataplane connections. func (m *Manager) AddPeer(pr *v1alpha1.Peer) { m.logger.Infof("Adding peer '%s'.", pr.Name) // initialize peer client - client := peer.NewClient(pr, m.peerTLS.ClientConfig(pr.Name)) + cl := peer.NewClient(pr, m.peerTLS.ClientConfig(pr.Name)) m.peerLock.Lock() - m.peerClient[pr.Name] = client + m.peerClient[pr.Name] = cl m.peerLock.Unlock() - - if meta.IsStatusConditionTrue(pr.Status.Conditions, v1alpha1.PeerReachable) { - m.policyDecider.AddPeer(pr.Name) - } else { - m.policyDecider.DeletePeer(pr.Name) - } } // DeletePeer removes the possibility for egress dataplane connections to be routed to a given peer. @@ -130,46 +149,16 @@ func (m *Manager) DeletePeer(name string) { m.peerLock.Lock() delete(m.peerClient, name) m.peerLock.Unlock() - - m.policyDecider.DeletePeer(name) -} - -// AddImport adds a listening socket for an imported remote service. -func (m *Manager) AddImport(imp *v1alpha1.Import) { - m.logger.Infof("Adding import '%s/%s'.", imp.Namespace, imp.Name) - m.policyDecider.AddImport(imp) -} - -// DeleteImport removes the listening socket of a previously imported service. -func (m *Manager) DeleteImport(name types.NamespacedName) error { - m.logger.Infof("Deleting import '%v'.", name) - m.policyDecider.DeleteImport(name) - return nil -} - -// AddExport defines a new route target for ingress dataplane connections. -func (m *Manager) AddExport(export *v1alpha1.Export) { - m.logger.Infof("Adding export '%s/%s'.", export.Namespace, export.Name) - - // TODO: m.policyDecider.AddExport() -} - -// DeleteExport removes the possibility for ingress dataplane connections to access a given service. -func (m *Manager) DeleteExport(name types.NamespacedName) { - m.logger.Infof("Deleting export '%v'.", name) - - // TODO: pass on namespace - m.policyDecider.DeleteExport(name.Name) } // AddAccessPolicy adds an access policy to allow/deny specific connections. func (m *Manager) AddAccessPolicy(policy *connectivitypdp.AccessPolicy) error { - return m.policyDecider.AddAccessPolicy(policy) + return m.connectivityPDP.AddOrUpdatePolicy(policy) } // DeleteAccessPolicy removes an access policy to allow/deny specific connections. func (m *Manager) DeleteAccessPolicy(name types.NamespacedName, privileged bool) error { - return m.policyDecider.DeleteAccessPolicy(name, privileged) + return m.connectivityPDP.DeletePolicy(name, privileged) } // deletePod deletes pod to ipToPod list. @@ -200,82 +189,121 @@ func (m *Manager) addPod(pod *v1.Pod) { } } -// getLabelsFromIP returns the labels associated with Pod with the specified IP address. -func (m *Manager) getLabelsFromIP(ip string) map[string]string { +// getPodInfoByIP returns the information about the Pod with the specified IP address. +func (m *Manager) getPodInfoByIP(ip string) *podInfo { m.podLock.RLock() defer m.podLock.RUnlock() if p, ipExsit := m.ipToPod[ip]; ipExsit { if pInfo, podExist := m.podList[p]; podExist { - return pInfo.labels + return &pInfo } } return nil } // authorizeEgress authorizes a request for accessing an imported service. -func (m *Manager) authorizeEgress(req *egressAuthorizationRequest) (*egressAuthorizationResponse, error) { +func (m *Manager) authorizeEgress(ctx context.Context, req *egressAuthorizationRequest) (*egressAuthorizationResponse, error) { m.logger.Infof("Received egress authorization request: %v.", req) - connReq := connectivitypdp.ConnectionRequest{ - DstSvcName: req.ImportName, - DstSvcNamespace: req.ImportNamespace, - Direction: connectivitypdp.Outgoing, - } - srcLabels := m.getLabelsFromIP(req.IP) - if src, ok := srcLabels["app"]; ok { // TODO: Add support for labels other than just the "app" key. - m.logger.Infof("Received egress authorization srcLabels[app]: %v.", srcLabels["app"]) - connReq.SrcWorkloadAttrs = connectivitypdp.WorkloadAttrs{policyengine.ServiceNameLabel: src} - } + srcAttributes := connectivitypdp.WorkloadAttrs{} + podInfo := m.getPodInfoByIP(req.IP) + if podInfo != nil { + srcAttributes[ServiceNamespaceLabel] = podInfo.namespace - authResp, err := m.policyDecider.AuthorizeAndRouteConnection(&connReq) - if err != nil { - return nil, err + if src, ok := podInfo.labels["app"]; ok { // TODO: Add support for labels other than just the "app" key. + m.logger.Infof("Received egress authorization srcLabels[app]: %v.", podInfo.labels["app"]) + srcAttributes[ServiceNameLabel] = src + } } - if authResp.Action != v1alpha1.AccessPolicyActionAllow { - return &egressAuthorizationResponse{Allowed: false}, nil + var imp v1alpha1.Import + if err := m.getImport(ctx, req.ImportName, &imp); err != nil { + return nil, fmt.Errorf("cannot get import %v: %w", req.ImportName, err) } - target := authResp.DstPeer + lbResult := NewLoadBalancingResult(&imp) + for { + if err := m.loadBalancer.Select(lbResult); err != nil { + return nil, fmt.Errorf("cannot select import source: %w", err) + } - m.peerLock.RLock() - client, ok := m.peerClient[target] - m.peerLock.RUnlock() + importSource := lbResult.Get() - if !ok { - return nil, fmt.Errorf("missing client for peer: %s", target) - } + var pr v1alpha1.Peer + if err := m.getPeer(ctx, importSource.Peer, &pr); err != nil { + return nil, fmt.Errorf("cannot get peer '%s': %w", importSource.Peer, err) + } - DstName := authResp.DstName - DstNamespace := authResp.DstNamespace - if DstName == "" { // TODO- remove when controlplane will support only CRD mode. - DstName = req.ImportName - } + if !meta.IsStatusConditionTrue(pr.Status.Conditions, v1alpha1.PeerReachable) { + if !lbResult.IsDelayed() { + lbResult.Delay() + continue + } + } - if DstNamespace == "" { // TODO- remove when controlplane will support only CRD mode. - DstNamespace = req.ImportNamespace - } + dstAttributes := connectivitypdp.WorkloadAttrs{ + ServiceNameLabel: imp.Name, + ServiceNamespaceLabel: imp.Namespace, + GatewayNameLabel: importSource.Peer, + } + decision, err := m.connectivityPDP.Decide(srcAttributes, dstAttributes, req.ImportName.Namespace) + if err != nil { + return nil, fmt.Errorf("error deciding on an egress connection: %w", err) + } - serverResp, err := client.Authorize(&cpapi.AuthorizationRequest{ - ServiceName: DstName, - ServiceNamespace: DstNamespace, - }) - if err != nil { - return nil, fmt.Errorf("unable to get access token from peer: %w", err) - } + if decision.Decision != connectivitypdp.DecisionAllow { + continue + } - resp := &egressAuthorizationResponse{ - ServiceExists: serverResp.ServiceExists, - Allowed: serverResp.Allowed, - } + m.peerLock.RLock() + cl, ok := m.peerClient[importSource.Peer] + m.peerLock.RUnlock() - if serverResp.Allowed { - resp.RemotePeerCluster = cpapi.RemotePeerClusterName(target) - resp.AccessToken = serverResp.AccessToken - } + if !ok { + return nil, fmt.Errorf("missing client for peer: %s", importSource.Peer) + } - return resp, nil + DstName := importSource.ExportName + DstNamespace := importSource.ExportNamespace + if DstName == "" { // TODO- remove when controlplane will support only CRD mode. + DstName = req.ImportName.Name + } + + if DstNamespace == "" { // TODO- remove when controlplane will support only CRD mode. + DstNamespace = req.ImportName.Namespace + } + + peerResp, err := cl.Authorize(&cpapi.AuthorizationRequest{ + ServiceName: DstName, + ServiceNamespace: DstNamespace, + }) + if err != nil { + m.logger.Infof("Unable to get access token from peer: %v", err) + continue + } + + if !peerResp.ServiceExists { + m.logger.Infof( + "Peer %s does not have an import source for %v", + importSource.Peer, req.ImportName) + continue + } + + if !peerResp.Allowed { + m.logger.Infof( + "Peer %s did not allow connection to import %v: %v", + importSource.Peer, req.ImportName, err) + continue + } + + return &egressAuthorizationResponse{ + ServiceExists: true, + Allowed: true, + RemotePeerCluster: cpapi.RemotePeerClusterName(importSource.Peer), + AccessToken: peerResp.AccessToken, + }, nil + } } // parseAuthorizationHeader verifies an access token for an ingress dataplane connection. @@ -305,25 +333,42 @@ func (m *Manager) parseAuthorizationHeader(token string) (string, error) { } // authorizeIngress authorizes a request for accessing an exported service. -func (m *Manager) authorizeIngress(req *ingressAuthorizationRequest, pr string) (*ingressAuthorizationResponse, error) { +func (m *Manager) authorizeIngress( + ctx context.Context, + req *ingressAuthorizationRequest, + pr string, +) (*ingressAuthorizationResponse, error) { m.logger.Infof("Received ingress authorization request: %v.", req) resp := &ingressAuthorizationResponse{} - // TODO: set this from autoResp below + // check that a corresponding export exists + exportName := types.NamespacedName{ + Namespace: req.ServiceName.Namespace, + Name: req.ServiceName.Name, + } + var export v1alpha1.Export + if err := m.getExport(ctx, exportName, &export); err != nil { + if errors.IsNotFound(err) || !meta.IsStatusConditionTrue(export.Status.Conditions, v1alpha1.ExportValid) { + return resp, nil + } + + return nil, fmt.Errorf("cannot get export %v: %w", exportName, err) + } + resp.ServiceExists = true - connReq := connectivitypdp.ConnectionRequest{ - DstSvcName: req.ServiceName, - DstSvcNamespace: req.ServiceNamespace, - Direction: connectivitypdp.Incoming, - SrcWorkloadAttrs: connectivitypdp.WorkloadAttrs{policyengine.GatewayNameLabel: pr}, + srcAttributes := connectivitypdp.WorkloadAttrs{GatewayNameLabel: pr} + dstAttributes := connectivitypdp.WorkloadAttrs{ + ServiceNameLabel: req.ServiceName.Name, + ServiceNamespaceLabel: req.ServiceName.Namespace, } - authResp, err := m.policyDecider.AuthorizeAndRouteConnection(&connReq) + decision, err := m.connectivityPDP.Decide(srcAttributes, dstAttributes, req.ServiceName.Namespace) if err != nil { - return nil, err + return nil, fmt.Errorf("error deciding on an ingress connection: %w", err) } - if authResp.Action != v1alpha1.AccessPolicyActionAllow { + + if decision.Decision != connectivitypdp.DecisionAllow { resp.Allowed = false return resp, nil } @@ -333,8 +378,8 @@ func (m *Manager) authorizeIngress(req *ingressAuthorizationRequest, pr string) // TODO: include client name as a claim token, err := jwt.NewBuilder(). Expiration(time.Now().Add(time.Second*jwtExpirySeconds)). - Claim(cpapi.ExportNameJWTClaim, req.ServiceName). - Claim(cpapi.ExportNamespaceJWTClaim, req.ServiceNamespace). + Claim(cpapi.ExportNameJWTClaim, req.ServiceName.Name). + Claim(cpapi.ExportNamespaceJWTClaim, req.ServiceName.Namespace). Build() if err != nil { return nil, fmt.Errorf("unable to generate access token: %w", err) @@ -350,8 +395,36 @@ func (m *Manager) authorizeIngress(req *ingressAuthorizationRequest, pr string) return resp, nil } +func (m *Manager) getImport(ctx context.Context, name types.NamespacedName, imp *v1alpha1.Import) error { + if m.getImportCallback != nil { + return m.getImportCallback(name.Name, imp) + } + + return m.client.Get(ctx, name, imp) +} + +func (m *Manager) getExport(ctx context.Context, name types.NamespacedName, export *v1alpha1.Export) error { + if m.getExportCallback != nil { + return m.getExportCallback(name.Name, export) + } + + return m.client.Get(ctx, name, export) +} + +func (m *Manager) getPeer(ctx context.Context, name string, pr *v1alpha1.Peer) error { + if m.getPeerCallback != nil { + return m.getPeerCallback(name, pr) + } + + peerName := types.NamespacedName{ + Name: name, + Namespace: m.namespace, + } + return m.client.Get(ctx, peerName, pr) +} + // NewManager returns a new authorization manager. -func NewManager(peerTLS *tls.ParsedCertData) (*Manager, error) { +func NewManager(peerTLS *tls.ParsedCertData, cl client.Client, namespace string) (*Manager, error) { // generate RSA key-pair for JWT signing // TODO: instead of generating, read from k8s secret rsaKey, err := rsa.GenerateKey(rand.Reader, 2048) @@ -370,13 +443,16 @@ func NewManager(peerTLS *tls.ParsedCertData) (*Manager, error) { } return &Manager{ - policyDecider: policyengine.NewPolicyHandler(), - peerTLS: peerTLS, - peerClient: make(map[string]*peer.Client), - jwkSignKey: jwkSignKey, - jwkVerifyKey: jwkVerifyKey, - ipToPod: make(map[string]types.NamespacedName), - podList: make(map[types.NamespacedName]podInfo), - logger: logrus.WithField("component", "controlplane.authz.manager"), + client: cl, + namespace: namespace, + connectivityPDP: connectivitypdp.NewPDP(), + loadBalancer: NewLoadBalancer(), + peerTLS: peerTLS, + peerClient: make(map[string]*peer.Client), + jwkSignKey: jwkSignKey, + jwkVerifyKey: jwkVerifyKey, + ipToPod: make(map[string]types.NamespacedName), + podList: make(map[types.NamespacedName]podInfo), + logger: logrus.WithField("component", "controlplane.authz.manager"), }, nil } diff --git a/pkg/controlplane/authz/server.go b/pkg/controlplane/authz/server.go index ee7eb78f..4891d62e 100644 --- a/pkg/controlplane/authz/server.go +++ b/pkg/controlplane/authz/server.go @@ -20,6 +20,7 @@ import ( "strings" "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/types" "github.com/clusterlink-net/clusterlink/pkg/controlplane/api" utilhttp "github.com/clusterlink-net/clusterlink/pkg/util/http" @@ -71,10 +72,12 @@ func (s *server) DataplaneEgressAuthorize(w http.ResponseWriter, r *http.Request return } - resp, err := s.manager.authorizeEgress(&egressAuthorizationRequest{ - ImportName: importName, - ImportNamespace: importNamespace, - IP: ip, + resp, err := s.manager.authorizeEgress(r.Context(), &egressAuthorizationRequest{ + ImportName: types.NamespacedName{ + Namespace: importNamespace, + Name: importName, + }, + IP: ip, }) switch { @@ -140,9 +143,12 @@ func (s *server) PeerAuthorize(w http.ResponseWriter, r *http.Request) { peerName := r.TLS.PeerCertificates[0].DNSNames[0] resp, err := s.manager.authorizeIngress( + r.Context(), &ingressAuthorizationRequest{ - ServiceName: req.ServiceName, - ServiceNamespace: req.ServiceNamespace, + ServiceName: types.NamespacedName{ + Namespace: req.ServiceNamespace, + Name: req.ServiceName, + }, }, peerName) switch { diff --git a/pkg/controlplane/control/controllers.go b/pkg/controlplane/control/controllers.go index dc1ae37c..c295ea2e 100644 --- a/pkg/controlplane/control/controllers.go +++ b/pkg/controlplane/control/controllers.go @@ -62,7 +62,7 @@ func CreateControllers(mgr *Manager, controllerManager ctrl.Manager, crdMode boo Name: "control.export", Object: &v1alpha1.Export{}, AddHandler: func(ctx context.Context, object any) error { - return mgr.addExport(ctx, object.(*v1alpha1.Export)) + return mgr.AddExport(ctx, object.(*v1alpha1.Export)) }, DeleteHandler: func(ctx context.Context, name types.NamespacedName) error { return nil diff --git a/pkg/controlplane/control/manager.go b/pkg/controlplane/control/manager.go index 859cb194..cee8f4fb 100644 --- a/pkg/controlplane/control/manager.go +++ b/pkg/controlplane/control/manager.go @@ -147,6 +147,8 @@ type Manager struct { getMergeImportListCallback func() *v1alpha1.ImportList // callback for getting an import (for non-CRD mode) getImportCallback func(name string, imp *v1alpha1.Import) error + // callback for setting the status of an export (for non-CRD mode) + exportStatusCallback func(*v1alpha1.Export) logger *logrus.Entry } @@ -306,8 +308,12 @@ func (m *Manager) DeleteImport(ctx context.Context, name types.NamespacedName) e return errors.Join(errs...) } -// addExport defines a new route target for ingress dataplane connections. -func (m *Manager) addExport(ctx context.Context, export *v1alpha1.Export) (err error) { +func (m *Manager) SetExportStatusCallback(callback func(*v1alpha1.Export)) { + m.exportStatusCallback = callback +} + +// AddExport defines a new route target for ingress dataplane connections. +func (m *Manager) AddExport(ctx context.Context, export *v1alpha1.Export) (err error) { m.logger.Infof("Adding export '%s/%s'.", export.Namespace, export.Name) defer func() { @@ -330,15 +336,20 @@ func (m *Manager) addExport(ctx context.Context, export *v1alpha1.Export) (err e m.logger.Infof( "Updating export '%s/%s' status: %v.", export.Namespace, export.Name, *conditions) - statusError := m.client.Status().Update(ctx, export) - if statusError != nil { - if err == nil { - err = statusError + if m.exportStatusCallback != nil { + m.exportStatusCallback(export) + } else { + // CRD-mode + statusError := m.client.Status().Update(ctx, export) + if statusError != nil { + if err == nil { + err = statusError + return + } + + m.logger.Warnf("Error updating export status: %v.", statusError) return } - - m.logger.Warnf("Error updating export status: %v.", statusError) - return } } @@ -467,7 +478,7 @@ func (m *Manager) checkExportService(ctx context.Context, name types.NamespacedN return nil } - return m.addExport(ctx, &export) + return m.AddExport(ctx, &export) } func (m *Manager) checkImportService(ctx context.Context, name types.NamespacedName) error { diff --git a/pkg/controlplane/control/peer.go b/pkg/controlplane/control/peer.go index 69f5a066..bb02c87f 100644 --- a/pkg/controlplane/control/peer.go +++ b/pkg/controlplane/control/peer.go @@ -54,9 +54,9 @@ type peerMonitor struct { // peerManager manages peers status. type peerManager struct { - client client.Client - peerTLS *tls.ParsedCertData - statusCallback func(*v1alpha1.Peer) + client client.Client + peerTLS *tls.ParsedCertData + peerStatusCallback func(*v1alpha1.Peer) lock sync.Mutex monitors map[string]*peerMonitor @@ -190,8 +190,8 @@ func (m *peerManager) Name() string { return "peerManager" } -func (m *peerManager) SetStatusCallback(callback func(*v1alpha1.Peer)) { - m.statusCallback = callback +func (m *peerManager) SetPeerStatusCallback(callback func(*v1alpha1.Peer)) { + m.peerStatusCallback = callback } // Start the peer manager. @@ -216,8 +216,8 @@ func (m *peerManager) Start() error { currPeer := monitor.Peer() - if m.statusCallback != nil { - m.statusCallback(&currPeer) + if m.peerStatusCallback != nil { + m.peerStatusCallback(&currPeer) } else { // CRD-mode err := m.client.Status().Update(context.Background(), &currPeer) diff --git a/pkg/controlplane/rest/accesspolicy.go b/pkg/controlplane/rest/accesspolicy.go index 5a4a39d6..a301e2a2 100644 --- a/pkg/controlplane/rest/accesspolicy.go +++ b/pkg/controlplane/rest/accesspolicy.go @@ -21,8 +21,8 @@ import ( "k8s.io/apimachinery/pkg/types" "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" + "github.com/clusterlink-net/clusterlink/pkg/controlplane/authz/connectivitypdp" "github.com/clusterlink-net/clusterlink/pkg/controlplane/store" - "github.com/clusterlink-net/clusterlink/pkg/policyengine/connectivitypdp" ) type accessPolicyHandler struct { diff --git a/pkg/controlplane/rest/export.go b/pkg/controlplane/rest/export.go index e24616a3..6c29dc75 100644 --- a/pkg/controlplane/rest/export.go +++ b/pkg/controlplane/rest/export.go @@ -14,10 +14,13 @@ package rest import ( + "context" "encoding/json" "fmt" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" @@ -38,6 +41,7 @@ func toK8SExport(export *store.Export, namespace string) *v1alpha1.Export { Host: export.ExportSpec.Host, Port: export.ExportSpec.Port, }, + Status: export.Status, } } @@ -50,7 +54,8 @@ func exportToAPI(export *store.Export) *v1alpha1.Export { ObjectMeta: metav1.ObjectMeta{ Name: export.Name, }, - Spec: export.ExportSpec, + Spec: export.ExportSpec, + Status: export.Status, } } @@ -65,8 +70,10 @@ func (m *Manager) CreateExport(export *store.Export) error { } k8sExport := toK8SExport(export, m.namespace) - m.authzManager.AddExport(k8sExport) - return m.xdsManager.AddExport(k8sExport) + if err := m.xdsManager.AddExport(k8sExport); err != nil { + return err + } + return m.controlManager.AddExport(context.Background(), k8sExport) } // UpdateExport updates a new route target for ingress dataplane connections. @@ -81,8 +88,23 @@ func (m *Manager) UpdateExport(export *store.Export) error { } k8sExport := toK8SExport(export, m.namespace) - m.authzManager.AddExport(k8sExport) - return m.xdsManager.AddExport(k8sExport) + if err := m.xdsManager.AddExport(k8sExport); err != nil { + return err + } + return m.controlManager.AddExport(context.Background(), k8sExport) +} + +// UpdateExportStatus updates the status of an existing export. +func (m *Manager) UpdateExportStatus(name string, status *v1alpha1.ExportStatus) { + m.logger.Infof("Updating status of export '%s'.", name) + + err := m.exports.Update(name, func(old *store.Export) *store.Export { + old.Status = *status + return old + }) + if err != nil { + m.logger.Errorf("Error updating status of export '%s': %v", name, err) + } } // GetExport returns an existing export. @@ -113,8 +135,6 @@ func (m *Manager) DeleteExport(name string) (*store.Export, error) { return export, err } - m.authzManager.DeleteExport(namespacedName) - return export, nil } @@ -124,6 +144,16 @@ func (m *Manager) GetAllExports() []*store.Export { return m.exports.GetAll() } +func (m *Manager) GetK8sExport(name string, export *v1alpha1.Export) error { + storeExport := m.exports.Get(name) + if storeExport == nil { + return errors.NewNotFound(schema.GroupResource{}, name) + } + + *export = *toK8SExport(storeExport, m.namespace) + return nil +} + // Decode an export. func (h *exportHandler) Decode(data []byte) (any, error) { var export v1alpha1.Export diff --git a/pkg/controlplane/rest/import.go b/pkg/controlplane/rest/import.go index 4f83d8ad..5daf1ff9 100644 --- a/pkg/controlplane/rest/import.go +++ b/pkg/controlplane/rest/import.go @@ -87,8 +87,6 @@ func (m *Manager) CreateImport(imp *store.Import) error { return err } - m.authzManager.AddImport(k8sImp) - return nil } @@ -123,8 +121,6 @@ func (m *Manager) UpdateImport(imp *store.Import) error { return err } - m.authzManager.AddImport(k8sImp) - return nil } @@ -160,11 +156,6 @@ func (m *Manager) DeleteImport(name string) (*store.Import, error) { return nil, err } - err = m.authzManager.DeleteImport(namespacedName) - if err != nil { - return nil, err - } - return imp, nil } diff --git a/pkg/controlplane/rest/peer.go b/pkg/controlplane/rest/peer.go index 6180b23f..9a8a7166 100644 --- a/pkg/controlplane/rest/peer.go +++ b/pkg/controlplane/rest/peer.go @@ -17,7 +17,9 @@ import ( "encoding/json" "fmt" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" "github.com/clusterlink-net/clusterlink/pkg/controlplane/store" @@ -33,6 +35,7 @@ func toK8SPeer(peer *store.Peer) *v1alpha1.Peer { Spec: v1alpha1.PeerSpec{ Gateways: make([]v1alpha1.Endpoint, len(peer.Gateways)), }, + Status: peer.Status, } for i, gw := range peer.PeerSpec.Gateways { @@ -53,7 +56,7 @@ func peerToAPI(peer *store.Peer) *v1alpha1.Peer { Name: peer.Name, }, Spec: peer.PeerSpec, - Status: v1alpha1.PeerStatus{}, + Status: peer.Status, } } @@ -90,6 +93,19 @@ func (m *Manager) UpdatePeer(peer *store.Peer) error { return m.xdsManager.AddPeer(k8sPeer) } +// UpdatePeerStatus updates the status of an existing peer. +func (m *Manager) UpdatePeerStatus(name string, status *v1alpha1.PeerStatus) { + m.logger.Infof("Updating status of peer '%s'.", name) + + err := m.peers.Update(name, func(old *store.Peer) *store.Peer { + old.Status = *status + return old + }) + if err != nil { + m.logger.Errorf("Error updating status of peer '%s': %v", name, err) + } +} + // GetPeer returns an existing peer. func (m *Manager) GetPeer(name string) *store.Peer { m.logger.Infof("Getting peer '%s'.", name) @@ -126,6 +142,16 @@ func (m *Manager) GetAllPeers() []*store.Peer { return m.peers.GetAll() } +func (m *Manager) GetK8sPeer(name string, peer *v1alpha1.Peer) error { + storePeer := m.peers.Get(name) + if storePeer == nil { + return errors.NewNotFound(schema.GroupResource{}, name) + } + + *peer = *toK8SPeer(storePeer) + return nil +} + // Decode a peer. func (h *peerHandler) Decode(data []byte) (any, error) { var peer v1alpha1.Peer diff --git a/pkg/controlplane/store/types.go b/pkg/controlplane/store/types.go index 4ce08341..d9babb9a 100644 --- a/pkg/controlplane/store/types.go +++ b/pkg/controlplane/store/types.go @@ -22,13 +22,11 @@ const ( exportStoreName = "export" importStoreName = "import" accessPolicyStoreName = "accessPolicy" - lbPolicyStoreName = "lbPolicy" exportStructVersion = 1 importStructVersion = 1 peerStructVersion = 1 accessPolicyStructVersion = 1 - lbPolicyStructVersion = 1 ) // Peer represents a remote peer. @@ -36,6 +34,8 @@ type Peer struct { v1alpha1.PeerSpec // Name of the peer. Name string + // Status of the peer. + Status v1alpha1.PeerStatus // Version of the struct when object was created. Version uint32 } @@ -45,6 +45,7 @@ func NewPeer(peer *v1alpha1.Peer) *Peer { return &Peer{ PeerSpec: peer.Spec, Name: peer.Name, + Status: peer.Status, Version: peerStructVersion, } } @@ -54,6 +55,8 @@ type Export struct { v1alpha1.ExportSpec // Name of the export. Name string + // Status of the export. + Status v1alpha1.ExportStatus // Version of the struct when object was created. Version uint32 } @@ -63,6 +66,7 @@ func NewExport(export *v1alpha1.Export) *Export { return &Export{ ExportSpec: export.Spec, Name: export.Name, + Status: export.Status, Version: exportStructVersion, } } diff --git a/pkg/policyengine/PolicyDispatcher.go b/pkg/policyengine/PolicyDispatcher.go deleted file mode 100644 index 5f7e4a8a..00000000 --- a/pkg/policyengine/PolicyDispatcher.go +++ /dev/null @@ -1,207 +0,0 @@ -// Copyright 2023 The ClusterLink Authors. -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// policyengine package handles policies that govern ClusterLink behavior -package policyengine - -import ( - "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/types" - - crds "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" - "github.com/clusterlink-net/clusterlink/pkg/policyengine/connectivitypdp" -) - -const ( - ServiceNameLabel = "clusterlink/metadata.serviceName" - GatewayNameLabel = "clusterlink/metadata.gatewayName" -) - -var plog = logrus.WithField("component", "PolicyEngine") - -// PolicyDecider is an interface for entities that make policy-based decisions on various ClusterLink operations. -type PolicyDecider interface { - AddAccessPolicy(policy *connectivitypdp.AccessPolicy) error - DeleteAccessPolicy(name types.NamespacedName, privileged bool) error - - AuthorizeAndRouteConnection(connReq *connectivitypdp.ConnectionRequest) (connectivitypdp.ConnectionResponse, error) - - AddPeer(name string) - DeletePeer(name string) - - AddImport(imp *crds.Import) - DeleteImport(name types.NamespacedName) - - AddExport(exp *crds.Export) ([]string, error) // Returns a list of peers to which export is allowed - DeleteExport(name string) -} - -// PolicyHandler implements PolicyDecider using Connectivity Policies and Load-Balancing Policies. -type PolicyHandler struct { - loadBalancer *LoadBalancer - connectivityPDP *connectivitypdp.PDP - enabledPeers map[string]bool -} - -func NewPolicyHandler() PolicyDecider { - return &PolicyHandler{ - loadBalancer: NewLoadBalancer(), - connectivityPDP: connectivitypdp.NewPDP(), - enabledPeers: map[string]bool{}, - } -} - -func getServiceAttrs(serviceName, peer string) connectivitypdp.WorkloadAttrs { - ret := connectivitypdp.WorkloadAttrs{ServiceNameLabel: serviceName} - if len(peer) > 0 { - ret[GatewayNameLabel] = peer - } - return ret -} - -func getServiceAttrsForMultipleDsts(serviceName string, dsts []crds.ImportSource) []connectivitypdp.WorkloadAttrs { - res := []connectivitypdp.WorkloadAttrs{} - for _, dst := range dsts { - res = append(res, getServiceAttrs(serviceName, dst.Peer)) - } - return res -} - -func (ph *PolicyHandler) filterOutDisabledPeers(dsts []crds.ImportSource) []crds.ImportSource { - res := []crds.ImportSource{} - for _, dst := range dsts { - if ph.enabledPeers[dst.Peer] { - res = append(res, dst) - } - } - return res -} - -func (ph *PolicyHandler) decideIncomingConnection( - req *connectivitypdp.ConnectionRequest, -) (connectivitypdp.ConnectionResponse, error) { - dest := getServiceAttrs(req.DstSvcName, "") - decisions, err := ph.connectivityPDP.Decide(req.SrcWorkloadAttrs, []connectivitypdp.WorkloadAttrs{dest}, - req.DstSvcNamespace) - if err != nil { - plog.Errorf("error deciding on a connection: %v", err) - return connectivitypdp.ConnectionResponse{Action: crds.AccessPolicyActionDeny}, err - } - if decisions[0].Decision == connectivitypdp.DecisionAllow { - return connectivitypdp.ConnectionResponse{Action: crds.AccessPolicyActionAllow}, nil - } - return connectivitypdp.ConnectionResponse{Action: crds.AccessPolicyActionDeny}, nil -} - -func (ph *PolicyHandler) decideOutgoingConnection( - req *connectivitypdp.ConnectionRequest, -) (connectivitypdp.ConnectionResponse, error) { - // Get a list of possible destinations for the service (a.k.a. service sources) - dstSvcNsName := types.NamespacedName{Namespace: req.DstSvcNamespace, Name: req.DstSvcName} - svcSourceList, err := ph.loadBalancer.GetSvcSources(dstSvcNsName) - if err != nil { - plog.Errorf("error getting sources for service %s: %v", req.DstSvcName, err) - // this can be caused by a user typo - so only log this error - return connectivitypdp.ConnectionResponse{Action: crds.AccessPolicyActionDeny}, nil - } - - svcSourceList = ph.filterOutDisabledPeers(svcSourceList) - - dsts := getServiceAttrsForMultipleDsts(req.DstSvcName, svcSourceList) - decisions, err := ph.connectivityPDP.Decide(req.SrcWorkloadAttrs, dsts, req.DstSvcNamespace) - if err != nil { - plog.Errorf("error deciding on a connection: %v", err) - return connectivitypdp.ConnectionResponse{Action: crds.AccessPolicyActionDeny}, err - } - - allowedSvcSources := []crds.ImportSource{} - for idx, decision := range decisions { - if decision.Decision == connectivitypdp.DecisionAllow { - allowedSvcSources = append(allowedSvcSources, svcSourceList[idx]) - } - } - - if len(allowedSvcSources) == 0 { - plog.Infof("access policies deny connections to service %s for all its sources", req.DstSvcName) - return connectivitypdp.ConnectionResponse{Action: crds.AccessPolicyActionDeny}, nil - } - - // Perform load-balancing using the filtered peer list - tgt, err := ph.loadBalancer.LookupWith(dstSvcNsName, allowedSvcSources) - if err != nil { - return connectivitypdp.ConnectionResponse{Action: crds.AccessPolicyActionDeny}, err - } - return connectivitypdp.ConnectionResponse{ - Action: crds.AccessPolicyActionAllow, - DstPeer: tgt.Peer, - DstName: tgt.ExportName, - DstNamespace: tgt.ExportNamespace, - }, nil -} - -func (ph *PolicyHandler) AuthorizeAndRouteConnection(req *connectivitypdp.ConnectionRequest) ( - connectivitypdp.ConnectionResponse, - error, -) { - plog.Infof("New connection request : %+v", req) - - var resp connectivitypdp.ConnectionResponse - var err error - if req.Direction == connectivitypdp.Incoming { - resp, err = ph.decideIncomingConnection(req) - } else if req.Direction == connectivitypdp.Outgoing { - resp, err = ph.decideOutgoingConnection(req) - } - - plog.Infof("Response : %+v", resp) - return resp, err -} - -func (ph *PolicyHandler) AddPeer(name string) { - ph.enabledPeers[name] = true - plog.Infof("Added Peer %s", name) -} - -func (ph *PolicyHandler) DeletePeer(name string) { - delete(ph.enabledPeers, name) - plog.Infof("Removed Peer %s", name) -} - -func (ph *PolicyHandler) AddImport(imp *crds.Import) { - ph.loadBalancer.AddImport(imp) -} - -func (ph *PolicyHandler) DeleteImport(name types.NamespacedName) { - ph.loadBalancer.DeleteImport(name) -} - -func (ph *PolicyHandler) AddExport(_ *crds.Export) ([]string, error) { - retPeers := []string{} - for peer, enabled := range ph.enabledPeers { - if enabled { - retPeers = append(retPeers, peer) - } - } - return retPeers, nil -} - -func (ph *PolicyHandler) DeleteExport(_ string) { -} - -func (ph *PolicyHandler) AddAccessPolicy(policy *connectivitypdp.AccessPolicy) error { - return ph.connectivityPDP.AddOrUpdatePolicy(policy) -} - -func (ph *PolicyHandler) DeleteAccessPolicy(name types.NamespacedName, privileged bool) error { - return ph.connectivityPDP.DeletePolicy(name, privileged) -} diff --git a/pkg/policyengine/PolicyDispatcher_test.go b/pkg/policyengine/PolicyDispatcher_test.go deleted file mode 100644 index 5417c604..00000000 --- a/pkg/policyengine/PolicyDispatcher_test.go +++ /dev/null @@ -1,280 +0,0 @@ -// Copyright 2023 The ClusterLink Authors. -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package policyengine_test - -import ( - "testing" - - "github.com/stretchr/testify/require" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - - crds "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" - "github.com/clusterlink-net/clusterlink/pkg/policyengine" - "github.com/clusterlink-net/clusterlink/pkg/policyengine/connectivitypdp" -) - -const ( - svcName = "svc" - badSvcName = "sv" - defaultNS = "default" -) - -var ( - selectAllSelector = metav1.LabelSelector{} - simpleSelector = metav1.LabelSelector{ - MatchLabels: connectivitypdp.WorkloadAttrs{policyengine.ServiceNameLabel: svcName}, - } - simpleWorkloadSet = crds.WorkloadSetOrSelector{ - WorkloadSelector: &simpleSelector, - } - policy = crds.AccessPolicy{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-policy", - Namespace: defaultNS, - }, - Spec: crds.AccessPolicySpec{ - Action: crds.AccessPolicyActionAllow, - From: []crds.WorkloadSetOrSelector{simpleWorkloadSet}, - To: []crds.WorkloadSetOrSelector{simpleWorkloadSet}, - }, - } - - pdpPolicy = connectivitypdp.PolicyFromCR(&policy) -) - -func TestAddAndDeleteConnectivityPolicy(t *testing.T) { - ph := policyengine.NewPolicyHandler() - err := ph.AddAccessPolicy(pdpPolicy) - require.Nil(t, err) - - polName := types.NamespacedName{Namespace: policy.Namespace, Name: policy.Name} - err = ph.DeleteAccessPolicy(polName, false) - require.Nil(t, err) - - // deleting the same policy again should result in a not-found error - err = ph.DeleteAccessPolicy(polName, false) - require.NotNil(t, err) -} - -func TestAddBadPolicy(t *testing.T) { - ph := policyengine.NewPolicyHandler() - badPolicy := crds.AccessPolicy{ - ObjectMeta: metav1.ObjectMeta{ - Name: "bad-policy", - }, - } - err := ph.AddAccessPolicy(connectivitypdp.PolicyFromCR(&badPolicy)) - require.NotNil(t, err) -} - -func TestIncomingConnectionRequests(t *testing.T) { - ph := policyengine.NewPolicyHandler() - policy2 := policy - policy2.Spec.To = []crds.WorkloadSetOrSelector{{WorkloadSelector: &selectAllSelector}} - err := ph.AddAccessPolicy(connectivitypdp.PolicyFromCR(&policy2)) - require.Nil(t, err) - - srcAttrs := connectivitypdp.WorkloadAttrs{policyengine.ServiceNameLabel: svcName} - connReq := connectivitypdp.ConnectionRequest{ - SrcWorkloadAttrs: srcAttrs, - Direction: connectivitypdp.Incoming, - DstSvcNamespace: defaultNS, - } - connReqResp, err := ph.AuthorizeAndRouteConnection(&connReq) - require.Equal(t, crds.AccessPolicyActionAllow, connReqResp.Action) - require.Nil(t, err) - - srcAttrs[policyengine.ServiceNameLabel] = badSvcName - connReq = connectivitypdp.ConnectionRequest{SrcWorkloadAttrs: srcAttrs, Direction: connectivitypdp.Incoming} - connReqResp, err = ph.AuthorizeAndRouteConnection(&connReq) - require.Equal(t, crds.AccessPolicyActionDeny, connReqResp.Action) - require.Nil(t, err) -} - -func TestOutgoingConnectionRequests(t *testing.T) { - ph := policyengine.NewPolicyHandler() - simpleSelector2 := metav1.LabelSelector{MatchLabels: connectivitypdp.WorkloadAttrs{ - policyengine.ServiceNameLabel: svcName, - policyengine.GatewayNameLabel: peer2, - }} - simpleWorkloadSet2 := crds.WorkloadSetOrSelector{WorkloadSelector: &simpleSelector2} - policy2 := policy - policy2.Spec.To = []crds.WorkloadSetOrSelector{simpleWorkloadSet2} - err := ph.AddAccessPolicy(connectivitypdp.PolicyFromCR(&policy2)) - require.Nil(t, err) - - addRemoteSvc(t, svcName, []string{peer1, peer2}, "", ph) - - // Should choose between peer1 and peer2, but only peer2 is allowed by the single access policy - srcAttrs := connectivitypdp.WorkloadAttrs{policyengine.ServiceNameLabel: svcName} - badSrcAttrs := connectivitypdp.WorkloadAttrs{policyengine.ServiceNameLabel: badSvcName} - requestAttr := connectivitypdp.ConnectionRequest{ - SrcWorkloadAttrs: srcAttrs, - DstSvcName: svcName, - DstSvcNamespace: defaultNS, - Direction: connectivitypdp.Outgoing, - } - connReqResp, err := ph.AuthorizeAndRouteConnection(&requestAttr) - require.Equal(t, crds.AccessPolicyActionAllow, connReqResp.Action) - require.Equal(t, peer2, connReqResp.DstPeer) - require.Nil(t, err) - - // Src service does not match the spec of the single access policy - requestAttr = connectivitypdp.ConnectionRequest{ - SrcWorkloadAttrs: badSrcAttrs, - DstSvcName: svcName, - Direction: connectivitypdp.Outgoing, - } - connReqResp, err = ph.AuthorizeAndRouteConnection(&requestAttr) - require.Equal(t, crds.AccessPolicyActionDeny, connReqResp.Action) - require.Nil(t, err) - - // Dst service does not match the spec of the single access policy - requestAttr = connectivitypdp.ConnectionRequest{ - SrcWorkloadAttrs: srcAttrs, - DstSvcName: badSvcName, - Direction: connectivitypdp.Outgoing, - } - connReqResp, err = ph.AuthorizeAndRouteConnection(&requestAttr) - require.Equal(t, crds.AccessPolicyActionDeny, connReqResp.Action) - require.Nil(t, err) - - // peer2 is removed as a remote for the requested service, - // so now the single allow policy does not allow the remaining peers - ph.DeletePeer(peer2) - requestAttr = connectivitypdp.ConnectionRequest{ - SrcWorkloadAttrs: srcAttrs, - DstSvcName: svcName, - Direction: connectivitypdp.Outgoing, - } - connReqResp, err = ph.AuthorizeAndRouteConnection(&requestAttr) - require.Equal(t, crds.AccessPolicyActionDeny, connReqResp.Action) - require.Nil(t, err) -} - -func TestLoadBalancer(t *testing.T) { - ph := policyengine.NewPolicyHandler() - addRemoteSvc(t, svcName, []string{peer1, peer2}, "", ph) - require.Nil(t, ph.AddAccessPolicy(pdpPolicy)) - - addRemoteSvc(t, svcName, []string{peer1, peer2}, policyengine.Static, ph) - - srcAttrs := connectivitypdp.WorkloadAttrs{policyengine.ServiceNameLabel: svcName} - requestAttr := connectivitypdp.ConnectionRequest{ - SrcWorkloadAttrs: srcAttrs, - DstSvcName: svcName, - DstSvcNamespace: defaultNS, - Direction: connectivitypdp.Outgoing, - } - connReqResp, err := ph.AuthorizeAndRouteConnection(&requestAttr) - require.Nil(t, err) - require.Equal(t, crds.AccessPolicyActionAllow, connReqResp.Action) - require.Equal(t, peer1, connReqResp.DstPeer) // LB policy requires this request to be served by peer1 - - addRemoteSvc(t, svcName, []string{peer1, peer2}, "", ph) - // LB policy is deleted - the random default policy now takes effect - connReqResp, err = ph.AuthorizeAndRouteConnection(&requestAttr) - require.Nil(t, err) - require.Equal(t, crds.AccessPolicyActionAllow, connReqResp.Action) - require.Contains(t, []string{peer1, peer2}, connReqResp.DstPeer) - - ph.DeletePeer(peer1) // peer1 is deleted, so all requests should go to peer2 - connReqResp, err = ph.AuthorizeAndRouteConnection(&requestAttr) - require.Nil(t, err) - require.Equal(t, crds.AccessPolicyActionAllow, connReqResp.Action) - require.Equal(t, peer2, connReqResp.DstPeer) - - ph.DeletePeer(peer1) // deleting peer1 again should make no change - connReqResp, err = ph.AuthorizeAndRouteConnection(&requestAttr) - require.Nil(t, err) - require.Equal(t, crds.AccessPolicyActionAllow, connReqResp.Action) - require.Equal(t, peer2, connReqResp.DstPeer) - - ph.DeletePeer(peer2) // deleting peer2 should result in an deny, as there are no available peers left - connReqResp, err = ph.AuthorizeAndRouteConnection(&requestAttr) - require.Nil(t, err) - require.Equal(t, crds.AccessPolicyActionDeny, connReqResp.Action) -} - -func TestDisableEnablePeers(t *testing.T) { - ph := policyengine.NewPolicyHandler() - addRemoteSvc(t, svcName, []string{peer1, peer2}, "", ph) - require.Nil(t, ph.AddAccessPolicy(pdpPolicy)) - - addRemoteSvc(t, svcName, []string{peer1, peer2}, policyengine.Static, ph) - - srcAttrs := connectivitypdp.WorkloadAttrs{policyengine.ServiceNameLabel: svcName} - requestAttr := connectivitypdp.ConnectionRequest{ - SrcWorkloadAttrs: srcAttrs, - DstSvcName: svcName, - DstSvcNamespace: defaultNS, - Direction: connectivitypdp.Outgoing, - } - connReqResp, err := ph.AuthorizeAndRouteConnection(&requestAttr) - require.Nil(t, err) - require.Equal(t, crds.AccessPolicyActionAllow, connReqResp.Action) - require.Equal(t, peer1, connReqResp.DstPeer) // LB policy defaults this request to be served by peer1 - - ph.DeletePeer(peer1) - - connReqResp, err = ph.AuthorizeAndRouteConnection(&requestAttr) - require.Nil(t, err) - require.Equal(t, crds.AccessPolicyActionAllow, connReqResp.Action) - require.Equal(t, peer2, connReqResp.DstPeer) // peer1 is now disabled, so peer2 must be used - - ph.DeletePeer(peer2) - - connReqResp, err = ph.AuthorizeAndRouteConnection(&requestAttr) - require.Nil(t, err) - require.Equal(t, crds.AccessPolicyActionDeny, connReqResp.Action) // no enabled peers - a Deny is returned - require.Equal(t, "", connReqResp.DstPeer) - - ph.AddPeer(peer1) - ph.AddPeer(peer2) - - connReqResp, err = ph.AuthorizeAndRouteConnection(&requestAttr) - require.Nil(t, err) - require.Equal(t, crds.AccessPolicyActionAllow, connReqResp.Action) - require.Equal(t, peer1, connReqResp.DstPeer) // peer1 was re-enabled, so it is now chosen again -} - -//nolint:unparam // `svc` always receives `svcName` (allow passing other names in future) -func addRemoteSvc( - t *testing.T, - svc string, - peers []string, - lbScheme policyengine.LBScheme, - ph policyengine.PolicyDecider, -) { - t.Helper() - - srcs := []crds.ImportSource{} - for _, peer := range peers { - ph.AddPeer(peer) - srcs = append(srcs, crds.ImportSource{Peer: peer, ExportName: svc}) - } - - imp := crds.Import{ - ObjectMeta: metav1.ObjectMeta{ - Name: svcName, - Namespace: defaultNS, - }, - Spec: crds.ImportSpec{ - Sources: srcs, - LBScheme: string(lbScheme), - }, - } - ph.AddImport(&imp) -} diff --git a/pkg/policyengine/connectivitypdp/connection_request.go b/pkg/policyengine/connectivitypdp/connection_request.go deleted file mode 100644 index 487713fc..00000000 --- a/pkg/policyengine/connectivitypdp/connection_request.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2023 The ClusterLink Authors. -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package connectivitypdp - -import ( - "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" -) - -// Direction indicates whether a given request is for an incoming or an outgoing connection. -type Direction int - -const ( - Incoming Direction = iota - Outgoing -) - -// ConnectionRequest encapsulates all the information needed to decide on a given incoming/outgoing connection. -type ConnectionRequest struct { - SrcWorkloadAttrs WorkloadAttrs - DstSvcName string - DstSvcNamespace string - - Direction Direction -} - -// ConnectionResponse encapsulates the returned decision on a given incoming incoming/outgoing connection. -type ConnectionResponse struct { - Action v1alpha1.AccessPolicyAction - DstPeer string - DstName string - DstNamespace string -} diff --git a/pkg/policyengine/loadBalancer.go b/pkg/policyengine/loadBalancer.go deleted file mode 100644 index 3c0818ce..00000000 --- a/pkg/policyengine/loadBalancer.go +++ /dev/null @@ -1,160 +0,0 @@ -// Copyright 2023 The ClusterLink Authors. -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package policyengine - -import ( - "fmt" - "math/rand" - "strings" - - "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/types" - - crds "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" -) - -var llog = logrus.WithField("component", "LoadBalancer") - -type LBScheme string - -const ( - Random LBScheme = "random" - RoundRobin LBScheme = "round-robin" - Static LBScheme = "static" -) - -// LBPolicy is being used by the CRUD interface, and is now deprecated. -type LBPolicy struct { - ServiceSrc string - ServiceDst string - Scheme LBScheme - DefaultPeer string -} - -type serviceState struct { - scheme LBScheme - impSources []crds.ImportSource - totalConnections int -} - -type LoadBalancer struct { - services map[types.NamespacedName]*serviceState // Keeps a state for each imported service -} - -// NewLoadBalancer returns a new instance of a LoadBalancer object. -func NewLoadBalancer() *LoadBalancer { - return &LoadBalancer{services: map[types.NamespacedName]*serviceState{}} -} - -// AddImport adds a new remote service for the load balancer to take decisions on. -func (lb *LoadBalancer) AddImport(imp *crds.Import) { - namespacedName := types.NamespacedName{Namespace: imp.Namespace, Name: imp.Name} - state, exists := lb.services[namespacedName] - if !exists { - lb.services[namespacedName] = &serviceState{ - scheme: LBScheme(imp.Spec.LBScheme), - impSources: imp.Spec.Sources, - totalConnections: 0, - } - } else { - state.impSources = imp.Spec.Sources - if imp.Spec.LBScheme != "" { - state.scheme = LBScheme(imp.Spec.LBScheme) - } - } -} - -// DeleteImport removes a remote service from the list of services the load balancer reasons about. -func (lb *LoadBalancer) DeleteImport(impName types.NamespacedName) { - delete(lb.services, impName) -} - -func nsNameFromFullName(fullName string) types.NamespacedName { - parts := strings.SplitN(fullName, string(types.Separator), 2) - if len(parts) == 2 { - return types.NamespacedName{Namespace: parts[0], Name: parts[1]} - } - return types.NamespacedName{Name: fullName} -} - -func (lb *LoadBalancer) lookupRandom(svc types.NamespacedName, svcSrcs []crds.ImportSource) *crds.ImportSource { - index := rand.Intn(len(svcSrcs)) //nolint:gosec // G404: use of weak random is fine for load balancing - plog.Infof("LoadBalancer selects index(%d) - source %v for service %s", index, svcSrcs[index], svc) - return &svcSrcs[index] -} - -func (lb *LoadBalancer) lookupRoundRobin(svc types.NamespacedName, svcSrcs []crds.ImportSource) *crds.ImportSource { - index := lb.services[svc].totalConnections % len(svcSrcs) - plog.Infof("LoadBalancer selects index(%d) - service source %v", index, svcSrcs[index]) - return &svcSrcs[index] -} - -func (lb *LoadBalancer) lookupStatic(svc types.NamespacedName, svcSrcs []crds.ImportSource) *crds.ImportSource { - srcs := lb.services[svc].impSources - if len(srcs) == 0 { // shouldn't happen - plog.Errorf("No sources for service %s. Resorting to random.", svc) - return lb.lookupRandom(svc, svcSrcs) - } - - defaultSrc := srcs[0] - for i := range svcSrcs { // ensure default is in the list - tgt := &svcSrcs[i] - if tgt.ExportNamespace == defaultSrc.ExportNamespace && tgt.ExportName == defaultSrc.ExportName && - tgt.Peer == defaultSrc.Peer { - plog.Infof("LoadBalancer selected default service source %v", defaultSrc) - return tgt - } - } - - plog.Errorf("Default source for service %s does not exist. "+ - "Falling back to other sources due to unavailability of default source", svc) - return lb.lookupRandom(svc, svcSrcs) -} - -// LookupWith decides which service-source to use for a given outgoing-connection request. -// The decision is based on the policy set for the service, and on its locally stored state. -func (lb *LoadBalancer) LookupWith(svc types.NamespacedName, svcSrcs []crds.ImportSource) (*crds.ImportSource, error) { - if len(svcSrcs) == 0 { - return nil, fmt.Errorf("no available sources for service %s", svc.String()) - } - - svcState, ok := lb.services[svc] - if !ok { - return nil, fmt.Errorf("unknown target service %s", svc.String()) - } - - svcState.totalConnections++ - - switch svcState.scheme { - case Random: - return lb.lookupRandom(svc, svcSrcs), nil - case RoundRobin: - return lb.lookupRoundRobin(svc, svcSrcs), nil - case Static: - return lb.lookupStatic(svc, svcSrcs), nil - default: - return lb.lookupRandom(svc, svcSrcs), nil - } -} - -// GetSvcSources returns all known sources for a given service in a slice of ImportSource objects. -func (lb *LoadBalancer) GetSvcSources(svc types.NamespacedName) ([]crds.ImportSource, error) { - svcState, ok := lb.services[svc] - if !ok || len(svcState.impSources) == 0 { - err := fmt.Errorf("no available sources for service %s", svc.String()) - plog.Error(err.Error()) - return nil, err - } - return svcState.impSources, nil -} diff --git a/pkg/policyengine/loadBalancer_test.go b/pkg/policyengine/loadBalancer_test.go deleted file mode 100644 index 7a9cc47d..00000000 --- a/pkg/policyengine/loadBalancer_test.go +++ /dev/null @@ -1,177 +0,0 @@ -// Copyright 2023 The ClusterLink Authors. -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package policyengine_test - -import ( - "testing" - - "github.com/stretchr/testify/require" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - - crds "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" - "github.com/clusterlink-net/clusterlink/pkg/policyengine" -) - -const ( - svc1 = "svc1" - svc2 = "svc2" - svc3 = "svc3" - peer1 = "peer1" - peer2 = "peer2" - peer3 = "peer3" - ns1 = "ns1" - ns2 = "ns2" -) - -var ( - svc1NS1 = types.NamespacedName{Namespace: ns1, Name: svc1} - svc2NS1 = types.NamespacedName{Namespace: ns1, Name: svc2} - svc3NS2 = types.NamespacedName{Namespace: ns2, Name: svc3} -) - -func makeSimpleImport(impName, impNs string, peers []string, lbScheme policyengine.LBScheme) *crds.Import { - srcs := []crds.ImportSource{} - for _, peer := range peers { - srcs = append(srcs, crds.ImportSource{Peer: peer, ExportName: impName, ExportNamespace: impNs}) - } - return &crds.Import{ - ObjectMeta: v1.ObjectMeta{Name: impName, Namespace: impNs}, - Spec: crds.ImportSpec{ - Sources: srcs, - LBScheme: string(lbScheme), - }, - } -} - -func addImports(lb *policyengine.LoadBalancer) { - // svc1 is imported from peer1 and peer2 - // svc2 is imported only from peer1 - // svc3 is imported from peer1 and peer2 - lb.AddImport(makeSimpleImport(svc1, ns1, []string{peer1, peer2}, "")) - lb.AddImport(makeSimpleImport(svc2, ns1, []string{peer1}, "")) - lb.AddImport(makeSimpleImport(svc3, ns2, []string{peer1, peer2}, "")) -} - -// We repeat lookups enough times to make sure we get all the peers allowed by the relevant policy. -func repeatLookups(t *testing.T, lb *policyengine.LoadBalancer, - svc types.NamespacedName, targets []crds.ImportSource, breakEarly bool, -) map[string]int { - t.Helper() - res := map[string]int{} - for i := 0; i < 100; i++ { - target, err := lb.LookupWith(svc, targets) - require.Nil(t, err) - - entry := target.Peer + "/" + target.ExportNamespace + "/" + target.ExportName - res[entry]++ - if breakEarly && len(res) == len(targets) { // All legal peers appeared in lookup - break - } - } - return res -} - -func TestAddAndDeleteImports(t *testing.T) { - lb := policyengine.NewLoadBalancer() - addImports(lb) - - svc1Tgts, err := lb.GetSvcSources(svc1NS1) - require.Nil(t, err) - require.Len(t, svc1Tgts, 2) - - svc2Tgts, err := lb.GetSvcSources(svc2NS1) - require.Nil(t, err) - require.Len(t, svc2Tgts, 1) - - svc3Tgts, err := lb.GetSvcSources(svc3NS2) - require.Nil(t, err) - require.Len(t, svc3Tgts, 2) - - lb.DeleteImport(svc2NS1) - _, err = lb.GetSvcSources(svc2NS1) - require.NotNil(t, err) - _, err = lb.LookupWith(svc2NS1, svc2Tgts) - require.NotNil(t, err) -} - -func TestLookupWithNoPeers(t *testing.T) { - lb := policyengine.NewLoadBalancer() - addImports(lb) - - _, err := lb.LookupWith(svc1NS1, nil) - require.NotNil(t, err) -} - -func TestRandomLookUp(t *testing.T) { - lb := policyengine.NewLoadBalancer() - addImports(lb) - - svc1Tgts, err := lb.GetSvcSources(svc1NS1) - require.Nil(t, err) - - tgt := repeatLookups(t, lb, svc1NS1, svc1Tgts, true) - require.Len(t, tgt, 2) -} - -func TestFixedPeer(t *testing.T) { - lb := policyengine.NewLoadBalancer() - addImports(lb) - - lb.AddImport(makeSimpleImport(svc1, ns1, []string{peer1, peer2}, policyengine.Static)) - - svc1Tgts, err := lb.GetSvcSources(svc1NS1) - require.Nil(t, err) - - tgts := repeatLookups(t, lb, svc1NS1, svc1Tgts, false) - require.Len(t, tgts, 1) - var tgt string - for tgt = range tgts { - break - } - require.Equal(t, "peer1/ns1/svc1", tgt) // should always select first src, which is currently peer1 - - lb.AddImport(makeSimpleImport(svc1, ns1, []string{peer2, peer1}, policyengine.Static)) // now peer2 is the first peer - - tgts = repeatLookups(t, lb, svc1NS1, svc1Tgts, false) - require.Len(t, tgts, 1) - for tgt = range tgts { - break - } - require.Equal(t, "peer2/ns1/svc1", tgt) // peer2 is now the first one - - svc1Tgts = []crds.ImportSource{svc1Tgts[0]} // limit targets to just peer1 - tgts = repeatLookups(t, lb, svc1NS1, svc1Tgts, false) - require.Len(t, tgts, 1) - for tgt = range tgts { - break - } - require.Equal(t, "peer1/ns1/svc1", tgt) // peer1 is the fallback -} - -func TestRoundRobin(t *testing.T) { - lb := policyengine.NewLoadBalancer() - addImports(lb) - - lb.AddImport(makeSimpleImport(svc1, ns1, []string{peer1, peer2}, policyengine.RoundRobin)) - - svc1Tgts, err := lb.GetSvcSources(svc1NS1) - require.Nil(t, err) - - tgts := repeatLookups(t, lb, svc1NS1, svc1Tgts, false) - require.Len(t, tgts, 2) - for _, occurrences := range tgts { - require.Equal(t, 50, occurrences) - } -} diff --git a/tests/e2e/k8s/test_basic.go b/tests/e2e/k8s/test_basic.go index fbd7d8b8..e49dee97 100644 --- a/tests/e2e/k8s/test_basic.go +++ b/tests/e2e/k8s/test_basic.go @@ -20,7 +20,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" - "github.com/clusterlink-net/clusterlink/pkg/policyengine" "github.com/clusterlink-net/clusterlink/tests/e2e/k8s/services" "github.com/clusterlink-net/clusterlink/tests/e2e/k8s/services/httpecho" "github.com/clusterlink-net/clusterlink/tests/e2e/k8s/util" @@ -158,7 +157,6 @@ func (s *TestSuite) TestControlplaneCRUD() { peerFromServer := *objects.(*v1alpha1.Peer) require.Equal(s.T(), peerFromServer.Name, peer.Name) require.Equal(s.T(), peerFromServer.Spec, peer.Spec) - require.Equal(s.T(), peerFromServer.Status, v1alpha1.PeerStatus{}) // list peers objects, err = client0.Peers.List() @@ -252,12 +250,14 @@ func (s *TestSuite) TestControlplaneCRUD() { // get export objects, err = client1.Exports.Get(export.Name) require.Nil(s.T(), err) - require.Equal(s.T(), *objects.(*v1alpha1.Export), export) + exportFromServer := *objects.(*v1alpha1.Export) + require.Equal(s.T(), export.Name, exportFromServer.Name) + require.Equal(s.T(), export.Spec, exportFromServer.Spec) // list exports objects, err = client1.Exports.List() require.Nil(s.T(), err) - require.ElementsMatch(s.T(), *objects.(*[]v1alpha1.Export), []v1alpha1.Export{export}) + require.ElementsMatch(s.T(), *objects.(*[]v1alpha1.Export), []v1alpha1.Export{exportFromServer}) // allow export to be accessed require.Nil(s.T(), client1.AccessPolicies.Create(&policy)) @@ -269,7 +269,7 @@ func (s *TestSuite) TestControlplaneCRUD() { // create false binding to verify LB policy imp.Spec.Sources = append(imp.Spec.Sources, v1alpha1.ImportSource{Peer: cl[2].Name(), ExportName: httpEchoService.Name, ExportNamespace: cl[2].Namespace()}) - imp.Spec.LBScheme = string(policyengine.Static) + imp.Spec.LBScheme = v1alpha1.LBSchemeStatic require.Nil(s.T(), client0.Imports.Update(&imp)) // verify access @@ -342,27 +342,6 @@ func (s *TestSuite) TestControlplaneCRUD() { require.Nil(s.T(), err) require.Equal(s.T(), str, cl[1].Name()) - // make cl[2] the first peer, so static LB policy will choose it - imp.Spec.Sources = []v1alpha1.ImportSource{ - {Peer: cl[2].Name(), ExportName: httpEchoService.Name, ExportNamespace: cl[2].Namespace()}, - {Peer: cl[1].Name(), ExportName: httpEchoService.Name, ExportNamespace: cl[1].Namespace()}, - } - require.Nil(s.T(), client0.Imports.Update(&imp)) - - // verify no access after update - _, err = accessService(false, &services.ConnectionResetError{}) - require.ErrorIs(s.T(), err, &services.ConnectionResetError{}) - // update LB policy back - imp.Spec.Sources = []v1alpha1.ImportSource{ - {Peer: cl[1].Name(), ExportName: httpEchoService.Name, ExportNamespace: cl[1].Namespace()}, - {Peer: cl[2].Name(), ExportName: httpEchoService.Name, ExportNamespace: cl[2].Namespace()}, - } - require.Nil(s.T(), client0.Imports.Update(&imp)) - // verify access after update back - str, err = accessService(false, nil) - require.Nil(s.T(), err) - require.Equal(s.T(), str, cl[1].Name()) - // delete import require.Nil(s.T(), client0.Imports.Delete(imp.Name)) // get import after delete @@ -438,10 +417,10 @@ func (s *TestSuite) TestControlplaneCRUD() { require.Nil(s.T(), err) require.ElementsMatch(s.T(), *objects.(*[]v1alpha1.Import), []v1alpha1.Import{importFromServer}) - // verify peers after restart + // verify 2 peers after restart objects, err = client0.Peers.List() require.Nil(s.T(), err) - require.ElementsMatch(s.T(), *objects.(*[]v1alpha1.Peer), []v1alpha1.Peer{peerFromServer, peer2}) + require.Equal(s.T(), len(*objects.(*[]v1alpha1.Peer)), 2) // verify access policies after restart objects, err = client0.AccessPolicies.List() @@ -451,7 +430,7 @@ func (s *TestSuite) TestControlplaneCRUD() { // verify exports after restart objects, err = client1.Exports.List() require.Nil(s.T(), err) - require.ElementsMatch(s.T(), *objects.(*[]v1alpha1.Export), []v1alpha1.Export{export}) + require.Equal(s.T(), len(*objects.(*[]v1alpha1.Export)), 1) // verify access after restart str, err = accessService(true, nil) diff --git a/tests/e2e/k8s/test_loadbalancing.go b/tests/e2e/k8s/test_loadbalancing.go new file mode 100644 index 00000000..28d8bf22 --- /dev/null +++ b/tests/e2e/k8s/test_loadbalancing.go @@ -0,0 +1,208 @@ +// Copyright 2023 The ClusterLink Authors. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package k8s + +import ( + "context" + + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" + "github.com/clusterlink-net/clusterlink/tests/e2e/k8s/services/httpecho" + "github.com/clusterlink-net/clusterlink/tests/e2e/k8s/util" +) + +func (s *TestSuite) TestLoadBalancingRoundRobin() { + cl, err := s.fabric.DeployClusterlinks(3, nil) + require.Nil(s.T(), err) + + importedService := &util.Service{ + Name: "my-import", + Port: 80, + } + + imp := &v1alpha1.Import{ + ObjectMeta: metav1.ObjectMeta{ + Name: importedService.Name, + Namespace: cl[0].Namespace(), + }, + Spec: v1alpha1.ImportSpec{ + Port: importedService.Port, + }, + } + + for i := 0; i < 3; i++ { + require.Nil(s.T(), cl[0].CreatePeer(cl[i])) + + require.Nil(s.T(), cl[i].CreateService(&httpEchoService)) + require.Nil(s.T(), cl[i].CreateExport(&httpEchoService)) + require.Nil(s.T(), cl[i].CreatePolicy(util.PolicyAllowAll)) + + imp.Spec.Sources = append(imp.Spec.Sources, v1alpha1.ImportSource{ + Peer: cl[i].Name(), + ExportName: httpEchoService.Name, + ExportNamespace: cl[i].Namespace(), + }) + } + + require.Nil(s.T(), cl[0].Cluster().Resources().Create(context.Background(), imp)) + + // test default lb scheme (round-robin) + for i := 0; i < 30; i++ { + data, err := cl[0].AccessService(httpecho.GetEchoValue, importedService, true, nil) + require.Nil(s.T(), err) + require.Equal(s.T(), data, cl[(i+1)%3].Name()) + } + + // take down first source + require.Nil(s.T(), cl[0].DeleteExport(httpEchoService.Name)) + for i := 0; i < 30; i++ { + data, err := cl[0].AccessService(httpecho.GetEchoValue, importedService, false, nil) + require.Nil(s.T(), err) + require.Equal(s.T(), data, cl[1+(i%2)].Name()) + } + + // take down second source + require.Nil(s.T(), cl[1].DeleteExport(httpEchoService.Name)) + for i := 0; i < 30; i++ { + data, err := cl[0].AccessService(httpecho.GetEchoValue, importedService, false, nil) + require.Nil(s.T(), err) + require.Equal(s.T(), data, cl[2].Name()) + } +} + +func (s *TestSuite) TestLoadBalancingStatic() { + cl, err := s.fabric.DeployClusterlinks(3, nil) + require.Nil(s.T(), err) + + importedService := &util.Service{ + Name: "my-import", + Port: 80, + } + + imp := &v1alpha1.Import{ + ObjectMeta: metav1.ObjectMeta{ + Name: importedService.Name, + Namespace: cl[0].Namespace(), + }, + Spec: v1alpha1.ImportSpec{ + LBScheme: v1alpha1.LBSchemeStatic, + Port: importedService.Port, + }, + } + + for i := 0; i < 3; i++ { + require.Nil(s.T(), cl[0].CreatePeer(cl[i])) + + require.Nil(s.T(), cl[i].CreateService(&httpEchoService)) + require.Nil(s.T(), cl[i].CreateExport(&httpEchoService)) + require.Nil(s.T(), cl[i].CreatePolicy(util.PolicyAllowAll)) + + imp.Spec.Sources = append(imp.Spec.Sources, v1alpha1.ImportSource{ + Peer: cl[i].Name(), + ExportName: httpEchoService.Name, + ExportNamespace: cl[i].Namespace(), + }) + } + + require.Nil(s.T(), cl[0].Cluster().Resources().Create(context.Background(), imp)) + + // test static lb scheme + for i := 0; i < 30; i++ { + data, err := cl[0].AccessService(httpecho.GetEchoValue, importedService, true, nil) + require.Nil(s.T(), err) + require.Equal(s.T(), data, cl[0].Name()) + } + + // take down first source + require.Nil(s.T(), cl[0].DeleteExport(httpEchoService.Name)) + for i := 0; i < 30; i++ { + data, err := cl[0].AccessService(httpecho.GetEchoValue, importedService, false, nil) + require.Nil(s.T(), err) + require.Equal(s.T(), data, cl[1].Name()) + } + + // take down second source + require.Nil(s.T(), cl[1].DeleteExport(httpEchoService.Name)) + for i := 0; i < 30; i++ { + data, err := cl[0].AccessService(httpecho.GetEchoValue, importedService, false, nil) + require.Nil(s.T(), err) + require.Equal(s.T(), data, cl[2].Name()) + } +} + +func (s *TestSuite) TestLoadBalancingRandom() { + cl, err := s.fabric.DeployClusterlinks(3, nil) + require.Nil(s.T(), err) + + importedService := &util.Service{ + Name: "my-import", + Port: 80, + } + + imp := &v1alpha1.Import{ + ObjectMeta: metav1.ObjectMeta{ + Name: importedService.Name, + Namespace: cl[0].Namespace(), + }, + Spec: v1alpha1.ImportSpec{ + LBScheme: v1alpha1.LBSchemeRandom, + Port: importedService.Port, + }, + } + + for i := 0; i < 3; i++ { + require.Nil(s.T(), cl[0].CreatePeer(cl[i])) + + require.Nil(s.T(), cl[i].CreateService(&httpEchoService)) + require.Nil(s.T(), cl[i].CreateExport(&httpEchoService)) + require.Nil(s.T(), cl[i].CreatePolicy(util.PolicyAllowAll)) + + imp.Spec.Sources = append(imp.Spec.Sources, v1alpha1.ImportSource{ + Peer: cl[i].Name(), + ExportName: httpEchoService.Name, + ExportNamespace: cl[i].Namespace(), + }) + } + + require.Nil(s.T(), cl[0].Cluster().Resources().Create(context.Background(), imp)) + + // test random lb scheme + names := make(map[string]interface{}) + for i := 0; i < 100; i++ { + data, err := cl[0].AccessService(httpecho.GetEchoValue, importedService, true, nil) + require.Nil(s.T(), err) + names[data] = nil + } + require.Equal(s.T(), 3, len(names)) + + // take down first source + require.Nil(s.T(), cl[0].DeleteExport(httpEchoService.Name)) + names = make(map[string]interface{}) + for i := 0; i < 30; i++ { + data, err := cl[0].AccessService(httpecho.GetEchoValue, importedService, false, nil) + require.Nil(s.T(), err) + names[data] = nil + } + require.Equal(s.T(), 2, len(names)) + + // take down second source + require.Nil(s.T(), cl[1].DeleteExport(httpEchoService.Name)) + for i := 0; i < 30; i++ { + data, err := cl[0].AccessService(httpecho.GetEchoValue, importedService, false, nil) + require.Nil(s.T(), err) + require.Equal(s.T(), data, cl[2].Name()) + } +} diff --git a/tests/e2e/k8s/test_policy.go b/tests/e2e/k8s/test_policy.go index 365831c7..17e29b97 100644 --- a/tests/e2e/k8s/test_policy.go +++ b/tests/e2e/k8s/test_policy.go @@ -17,7 +17,7 @@ import ( "github.com/stretchr/testify/require" "github.com/clusterlink-net/clusterlink/pkg/apis/clusterlink.net/v1alpha1" - "github.com/clusterlink-net/clusterlink/pkg/policyengine" + "github.com/clusterlink-net/clusterlink/pkg/controlplane/authz" "github.com/clusterlink-net/clusterlink/tests/e2e/k8s/services" "github.com/clusterlink-net/clusterlink/tests/e2e/k8s/services/httpecho" "github.com/clusterlink-net/clusterlink/tests/e2e/k8s/util" @@ -41,13 +41,13 @@ func (s *TestSuite) TestPolicyLabels() { // In addition, create a policy to only allow traffic from cl[1] - apply in cl[0] (on ingress) allowEchoPolicyName := "allow-access-to-echo-svc" dstLabels := map[string]string{ - policyengine.ServiceNameLabel: httpEchoService.Name, - policyengine.GatewayNameLabel: cl[0].Name(), + authz.ServiceNameLabel: httpEchoService.Name, + authz.GatewayNameLabel: cl[0].Name(), } allowEchoPolicy := util.NewPolicy(allowEchoPolicyName, v1alpha1.AccessPolicyActionAllow, nil, dstLabels) require.Nil(s.T(), cl[1].CreatePolicy(allowEchoPolicy)) - srcLabels := map[string]string{policyengine.GatewayNameLabel: cl[1].Name()} + srcLabels := map[string]string{authz.GatewayNameLabel: cl[1].Name()} specificSrcPeerPolicy := util.NewPolicy("specific-peer", v1alpha1.AccessPolicyActionAllow, srcLabels, nil) require.Nil(s.T(), cl[0].CreatePolicy(specificSrcPeerPolicy)) @@ -57,7 +57,7 @@ func (s *TestSuite) TestPolicyLabels() { // 2. Add a "deny echo service" policy in cl[1] - should have a higher priority and so block the connection denyEchoPolicyName := "deny-access-to-echo" - dstLabels = map[string]string{policyengine.ServiceNameLabel: httpEchoService.Name} + dstLabels = map[string]string{authz.ServiceNameLabel: httpEchoService.Name} denyEchoPolicy := util.NewPolicy(denyEchoPolicyName, v1alpha1.AccessPolicyActionDeny, nil, dstLabels) require.Nil(s.T(), cl[1].CreatePolicy(denyEchoPolicy)) @@ -73,7 +73,7 @@ func (s *TestSuite) TestPolicyLabels() { // 4. Add a "deny peer cl0" policy in cl[1] - should have a higher priority and so block the connection denyCl0PolicyName := "deny-access-to-cl0" - dstLabels = map[string]string{policyengine.GatewayNameLabel: cl[0].Name()} + dstLabels = map[string]string{authz.GatewayNameLabel: cl[0].Name()} denyCl0Policy := util.NewPolicy(denyCl0PolicyName, v1alpha1.AccessPolicyActionDeny, nil, dstLabels) require.Nil(s.T(), cl[1].CreatePolicy(denyCl0Policy)) @@ -106,8 +106,8 @@ func (s *TestSuite) TestPolicyLabels() { require.Nil(s.T(), cl[1].DeletePolicy(allowEchoPolicyName)) badSvcLabels := map[string]string{ - policyengine.ServiceNameLabel: "bad-svc", - policyengine.GatewayNameLabel: cl[0].Name(), + authz.ServiceNameLabel: "bad-svc", + authz.GatewayNameLabel: cl[0].Name(), } badSvcPolicy := util.NewPolicy("bad-svc", v1alpha1.AccessPolicyActionAllow, nil, badSvcLabels) require.Nil(s.T(), cl[1].CreatePolicy(badSvcPolicy)) @@ -132,8 +132,8 @@ func (s *TestSuite) TestPrivilegedPolicies() { require.Nil(s.T(), cl[1].CreateImport(importedService, cl[0], httpEchoService.Name)) dstLabels := map[string]string{ - policyengine.ServiceNameLabel: httpEchoService.Name, - policyengine.GatewayNameLabel: cl[0].Name(), + authz.ServiceNameLabel: httpEchoService.Name, + authz.GatewayNameLabel: cl[0].Name(), } privDenyPolicyName := "priv-deny" diff --git a/tests/e2e/k8s/util/clusterlink.go b/tests/e2e/k8s/util/clusterlink.go index 219384ee..50cd5430 100644 --- a/tests/e2e/k8s/util/clusterlink.go +++ b/tests/e2e/k8s/util/clusterlink.go @@ -292,6 +292,16 @@ func (c *ClusterLink) GetAllExports() (*[]v1alpha1.Export, error) { } func (c *ClusterLink) DeleteExport(name string) error { + if c.crdMode { + return c.cluster.Resources().Delete( + context.Background(), + &v1alpha1.Export{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: c.namespace, + }, + }) + } return c.client.Exports.Delete(name) } diff --git a/tests/k8s.sh b/tests/k8s.sh index a4578cf9..506cbfb1 100755 --- a/tests/k8s.sh +++ b/tests/k8s.sh @@ -76,7 +76,7 @@ function test_k8s { # import kubectl exec -i gwctl -- gwctl create peer --host cl-dataplane --port 443 --name peer1 kubectl exec -i gwctl -- gwctl create import --name bla --port 9999 --peer peer1 - kubectl cp $SCRIPT_DIR/../pkg/policyengine/examples/allowAll.json gwctl:/tmp/allowAll.json + kubectl cp $SCRIPT_DIR/../examples/policies/allowAll.json gwctl:/tmp/allowAll.json kubectl exec -i gwctl -- gwctl create policy --type access --policyFile /tmp/allowAll.json # get imported service port diff --git a/website/content/en/docs/main/concepts/policies.md b/website/content/en/docs/main/concepts/policies.md index 957dd9be..c9297476 100644 --- a/website/content/en/docs/main/concepts/policies.md +++ b/website/content/en/docs/main/concepts/policies.md @@ -142,7 +142,7 @@ spec: - workloadSelector: {} ``` -More examples are available on our repo under [policyengine/examples][]. +More examples are available on our repo under [examples/policies][]. [peers]: {{< relref "peers" >}} [services]: {{< relref "services" >}} @@ -151,4 +151,4 @@ More examples are available on our repo under [policyengine/examples][]. [labels]: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/ [deployed and configured]: {{< relref "../getting-started/users#setup" >}} [Kuberenetes label selector]: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#labelselector-v1-meta -[policyengine/examples]: https://github.com/clusterlink-net/clusterlink/tree/main/pkg/policyengine/examples +[examples/policies]: https://github.com/clusterlink-net/clusterlink/tree/main/examples/policies