Skip to content

Commit

Permalink
Merge pull request #152 from terrytangyuan/cp-np-change
Browse files Browse the repository at this point in the history
[cherry-pick] fix: adds network policies to unblock traffic between components  (#145)
  • Loading branch information
Jooho authored Jan 19, 2024
2 parents 316e8f4 + e4df8a3 commit 974b7f8
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 85 deletions.
91 changes: 76 additions & 15 deletions controllers/kserve_inferenceservice_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,54 +21,64 @@ import (
kservev1beta1 "github.com/kserve/kserve/pkg/apis/serving/v1beta1"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
. "github.com/onsi/gomega/gstruct"
gomegatypes "github.com/onsi/gomega/types"
"github.com/opendatahub-io/odh-model-controller/controllers/constants"
routev1 "github.com/openshift/api/route/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/apis"
"time"
"sigs.k8s.io/controller-runtime/pkg/client"
"strings"
)

var _ = Describe("The Openshift Kserve model controller", func() {

When("creating a Kserve ServiceRuntime & InferenceService", func() {
var testNs string

BeforeEach(func() {
ctx := context.Background()
istioNamespace := &corev1.Namespace{
testNs = appendRandomNameTo("test-namespace")
testNamespace := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: constants.IstioNamespace,
Namespace: constants.IstioNamespace,
Name: testNs,
Namespace: testNs,
},
}
Expect(cli.Create(ctx, istioNamespace)).Should(Succeed())
Expect(cli.Create(ctx, testNamespace)).Should(Succeed())

inferenceServiceConfig := &corev1.ConfigMap{}
err := convertToStructuredResource(InferenceServiceConfigPath1, inferenceServiceConfig)
Expect(err).NotTo(HaveOccurred())
Expect(cli.Create(ctx, inferenceServiceConfig)).Should(Succeed())
Expect(convertToStructuredResource(InferenceServiceConfigPath1, inferenceServiceConfig)).To(Succeed())
if err := cli.Create(ctx, inferenceServiceConfig); err != nil && !errors.IsAlreadyExists(err) {
Fail(err.Error())
}

servingRuntime := &kservev1alpha1.ServingRuntime{}
err = convertToStructuredResource(KserveServingRuntimePath1, servingRuntime)
Expect(err).NotTo(HaveOccurred())
Expect(cli.Create(ctx, servingRuntime)).Should(Succeed())

Expect(convertToStructuredResource(KserveServingRuntimePath1, servingRuntime)).To(Succeed())
if err := cli.Create(ctx, servingRuntime); err != nil && !errors.IsAlreadyExists(err) {
Fail(err.Error())
}
})

It("With Kserve InferenceService a Route be created", func() {
inferenceService := &kservev1beta1.InferenceService{}
err := convertToStructuredResource(KserveInferenceServicePath1, inferenceService)
Expect(err).NotTo(HaveOccurred())
inferenceService.SetNamespace(testNs)

Expect(cli.Create(ctx, inferenceService)).Should(Succeed())

By("By checking that the controller has not created the Route")
//time.Sleep(5000 * time.Millisecond)
Consistently(func() error {
route := &routev1.Route{}
key := types.NamespacedName{Name: getKServeRouteName(inferenceService), Namespace: constants.IstioNamespace}
err = cli.Get(ctx, key, route)
return err
}, time.Second*1, interval).Should(HaveOccurred())
}, timeout, interval).Should(HaveOccurred())

deployedInferenceService := &kservev1beta1.InferenceService{}
err = cli.Get(ctx, types.NamespacedName{Name: inferenceService.Name, Namespace: inferenceService.Namespace}, deployedInferenceService)
Expand All @@ -87,11 +97,62 @@ var _ = Describe("The Openshift Kserve model controller", func() {
key := types.NamespacedName{Name: getKServeRouteName(inferenceService), Namespace: constants.IstioNamespace}
err = cli.Get(ctx, key, route)
return err
}, timeout, interval).ShouldNot(HaveOccurred())
}, timeout, interval).Should(Succeed())
})

It("should create required network policies when KServe is used", func() {
// given
inferenceService := &kservev1beta1.InferenceService{}
Expect(convertToStructuredResource(KserveInferenceServicePath1, inferenceService)).To(Succeed())
inferenceService.SetNamespace(testNs)

// when
Expect(cli.Create(ctx, inferenceService)).Should(Succeed())

// then
By("ensuring that the controller has created required network policies")
networkPolicies := &v1.NetworkPolicyList{}
Eventually(func() []v1.NetworkPolicy {
err := cli.List(ctx, networkPolicies, client.InNamespace(inferenceService.Namespace))
if err != nil {
Fail(err.Error())
}
return networkPolicies.Items
}, timeout, interval).Should(
ContainElements(
withMatchingNestedField("ObjectMeta.Name", Equal("allow-from-openshift-monitoring-ns")),
withMatchingNestedField("ObjectMeta.Name", Equal("allow-openshift-ingress")),
withMatchingNestedField("ObjectMeta.Name", Equal("allow-from-opendatahub-ns")),
),
)
})

})
})

func withMatchingNestedField(path string, matcher gomegatypes.GomegaMatcher) gomegatypes.GomegaMatcher {
if path == "" {
Fail("cannot handle empty path")
}

fields := strings.Split(path, ".")

// Reverse the path, so we start composing matchers from the leaf up
for i, j := 0, len(fields)-1; i < j; i, j = i+1, j-1 {
fields[i], fields[j] = fields[j], fields[i]
}

matchFields := MatchFields(IgnoreExtras,
Fields{fields[0]: matcher},
)

for i := 1; i < len(fields); i++ {
matchFields = MatchFields(IgnoreExtras, Fields{fields[i]: matchFields})
}

return matchFields
}

func getKServeRouteName(isvc *kservev1beta1.InferenceService) string {
return isvc.Name + "-" + isvc.Namespace
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,35 +98,35 @@ func (r *KserveIstioPeerAuthenticationReconciler) getExistingResource(ctx contex
return r.peerAuthenticationHandler.FetchPeerAuthentication(ctx, log, types.NamespacedName{Name: peerAuthenticationName, Namespace: isvc.Namespace})
}

func (r *KserveIstioPeerAuthenticationReconciler) processDelta(ctx context.Context, log logr.Logger, desiredPod *istiosecv1beta1.PeerAuthentication, existingPod *istiosecv1beta1.PeerAuthentication) (err error) {
func (r *KserveIstioPeerAuthenticationReconciler) processDelta(ctx context.Context, log logr.Logger, desiredPeerAuthentication *istiosecv1beta1.PeerAuthentication, existingPeerAuthentication *istiosecv1beta1.PeerAuthentication) (err error) {
comparator := comparators.GetPeerAuthenticationComparator()
delta := r.deltaProcessor.ComputeDelta(comparator, desiredPod, existingPod)
delta := r.deltaProcessor.ComputeDelta(comparator, desiredPeerAuthentication, existingPeerAuthentication)

if !delta.HasChanges() {
log.V(1).Info("No delta found")
return nil
}

if delta.IsAdded() {
log.V(1).Info("Delta found", "create", desiredPod.GetName())
if err = r.client.Create(ctx, desiredPod); err != nil {
log.V(1).Info("Delta found", "create", desiredPeerAuthentication.GetName())
if err = r.client.Create(ctx, desiredPeerAuthentication); err != nil {
return
}
}
if delta.IsUpdated() {
log.V(1).Info("Delta found", "update", existingPod.GetName())
rp := existingPod.DeepCopy()
rp.Spec.Selector = desiredPod.Spec.Selector
rp.Spec.Mtls = desiredPod.Spec.Mtls
rp.Spec.PortLevelMtls = desiredPod.Spec.PortLevelMtls
log.V(1).Info("Delta found", "update", existingPeerAuthentication.GetName())
rp := existingPeerAuthentication.DeepCopy()
rp.Spec.Selector = desiredPeerAuthentication.Spec.Selector
rp.Spec.Mtls = desiredPeerAuthentication.Spec.Mtls
rp.Spec.PortLevelMtls = desiredPeerAuthentication.Spec.PortLevelMtls

if err = r.client.Update(ctx, rp); err != nil {
return
}
}
if delta.IsRemoved() {
log.V(1).Info("Delta found", "delete", existingPod.GetName())
if err = r.client.Delete(ctx, existingPod); err != nil {
log.V(1).Info("Delta found", "delete", existingPeerAuthentication.GetName())
if err = r.client.Delete(ctx, existingPeerAuthentication); err != nil {
return
}
}
Expand Down
18 changes: 9 additions & 9 deletions controllers/reconcilers/kserve_istio_podmonitor_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,33 +100,33 @@ func (r *KserveIstioPodMonitorReconciler) getExistingResource(ctx context.Contex
return r.podMonitorHandler.FetchPodMonitor(ctx, log, types.NamespacedName{Name: istioPodMonitorName, Namespace: isvc.Namespace})
}

func (r *KserveIstioPodMonitorReconciler) processDelta(ctx context.Context, log logr.Logger, desiredPod *v1.PodMonitor, existingPod *v1.PodMonitor) (err error) {
func (r *KserveIstioPodMonitorReconciler) processDelta(ctx context.Context, log logr.Logger, desiredPodMonitor *v1.PodMonitor, existingPodMonitor *v1.PodMonitor) (err error) {
comparator := comparators.GetPodMonitorComparator()
delta := r.deltaProcessor.ComputeDelta(comparator, desiredPod, existingPod)
delta := r.deltaProcessor.ComputeDelta(comparator, desiredPodMonitor, existingPodMonitor)

if !delta.HasChanges() {
log.V(1).Info("No delta found")
return nil
}

if delta.IsAdded() {
log.V(1).Info("Delta found", "create", desiredPod.GetName())
if err = r.client.Create(ctx, desiredPod); err != nil {
log.V(1).Info("Delta found", "create", desiredPodMonitor.GetName())
if err = r.client.Create(ctx, desiredPodMonitor); err != nil {
return
}
}
if delta.IsUpdated() {
log.V(1).Info("Delta found", "update", existingPod.GetName())
rp := existingPod.DeepCopy()
rp.Spec = desiredPod.Spec
log.V(1).Info("Delta found", "update", existingPodMonitor.GetName())
rp := existingPodMonitor.DeepCopy()
rp.Spec = desiredPodMonitor.Spec

if err = r.client.Update(ctx, rp); err != nil {
return
}
}
if delta.IsRemoved() {
log.V(1).Info("Delta found", "delete", existingPod.GetName())
if err = r.client.Delete(ctx, existingPod); err != nil {
log.V(1).Info("Delta found", "delete", existingPodMonitor.GetName())
if err = r.client.Delete(ctx, existingPodMonitor); err != nil {
return
}
}
Expand Down
22 changes: 11 additions & 11 deletions controllers/reconcilers/kserve_istio_servicemonitor_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,35 +98,35 @@ func (r *KserveIstioServiceMonitorReconciler) getExistingResource(ctx context.Co
return r.serviceMonitorHandler.FetchServiceMonitor(ctx, log, types.NamespacedName{Name: istioServiceMonitorName, Namespace: isvc.Namespace})
}

func (r *KserveIstioServiceMonitorReconciler) processDelta(ctx context.Context, log logr.Logger, desiredService *v1.ServiceMonitor, existingService *v1.ServiceMonitor) (err error) {
func (r *KserveIstioServiceMonitorReconciler) processDelta(ctx context.Context, log logr.Logger, desiredServiceMonitor *v1.ServiceMonitor, existingServiceMonitor *v1.ServiceMonitor) (err error) {
comparator := comparators.GetServiceMonitorComparator()
delta := r.deltaProcessor.ComputeDelta(comparator, desiredService, existingService)
delta := r.deltaProcessor.ComputeDelta(comparator, desiredServiceMonitor, existingServiceMonitor)

if !delta.HasChanges() {
log.V(1).Info("No delta found")
return nil
}

if delta.IsAdded() {
log.V(1).Info("Delta found", "create", desiredService.GetName())
if err = r.client.Create(ctx, desiredService); err != nil {
log.V(1).Info("Delta found", "create", desiredServiceMonitor.GetName())
if err = r.client.Create(ctx, desiredServiceMonitor); err != nil {
return
}
}
if delta.IsUpdated() {
log.V(1).Info("Delta found", "update", existingService.GetName())
rp := existingService.DeepCopy()
rp.Annotations = desiredService.Annotations
rp.Labels = desiredService.Labels
rp.Spec = desiredService.Spec
log.V(1).Info("Delta found", "update", existingServiceMonitor.GetName())
rp := existingServiceMonitor.DeepCopy()
rp.Annotations = desiredServiceMonitor.Annotations
rp.Labels = desiredServiceMonitor.Labels
rp.Spec = desiredServiceMonitor.Spec

if err = r.client.Update(ctx, rp); err != nil {
return
}
}
if delta.IsRemoved() {
log.V(1).Info("Delta found", "delete", existingService.GetName())
if err = r.client.Delete(ctx, existingService); err != nil {
log.V(1).Info("Delta found", "delete", existingServiceMonitor.GetName())
if err = r.client.Delete(ctx, existingServiceMonitor); err != nil {
return
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,33 +102,33 @@ func (r *KserveMetricsServiceMonitorReconciler) getExistingResource(ctx context.
return r.serviceMonitorHandler.FetchServiceMonitor(ctx, log, types.NamespacedName{Name: getMetricsServiceMonitorName(isvc), Namespace: isvc.Namespace})
}

func (r *KserveMetricsServiceMonitorReconciler) processDelta(ctx context.Context, log logr.Logger, desiredService *v1.ServiceMonitor, existingService *v1.ServiceMonitor) (err error) {
func (r *KserveMetricsServiceMonitorReconciler) processDelta(ctx context.Context, log logr.Logger, desiredServiceMonitor *v1.ServiceMonitor, existingServiceMonitor *v1.ServiceMonitor) (err error) {
comparator := comparators.GetServiceMonitorComparator()
delta := r.deltaProcessor.ComputeDelta(comparator, desiredService, existingService)
delta := r.deltaProcessor.ComputeDelta(comparator, desiredServiceMonitor, existingServiceMonitor)

if !delta.HasChanges() {
log.V(1).Info("No delta found")
return nil
}

if delta.IsAdded() {
log.V(1).Info("Delta found", "create", desiredService.GetName())
if err = r.client.Create(ctx, desiredService); err != nil {
log.V(1).Info("Delta found", "create", desiredServiceMonitor.GetName())
if err = r.client.Create(ctx, desiredServiceMonitor); err != nil {
return
}
}
if delta.IsUpdated() {
log.V(1).Info("Delta found", "update", existingService.GetName())
rp := existingService.DeepCopy()
rp.Spec = desiredService.Spec
log.V(1).Info("Delta found", "update", existingServiceMonitor.GetName())
rp := existingServiceMonitor.DeepCopy()
rp.Spec = desiredServiceMonitor.Spec

if err = r.client.Update(ctx, rp); err != nil {
return
}
}
if delta.IsRemoved() {
log.V(1).Info("Delta found", "delete", existingService.GetName())
if err = r.client.Delete(ctx, existingService); err != nil {
log.V(1).Info("Delta found", "delete", existingServiceMonitor.GetName())
if err = r.client.Delete(ctx, existingServiceMonitor); err != nil {
return
}
}
Expand Down
Loading

0 comments on commit 974b7f8

Please sign in to comment.