Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend Volume Runtime Interface for cross-cluster Events #1092

Merged
merged 2 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions broker/volumebroker/server/event_list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and IronCore contributors
// SPDX-License-Identifier: Apache-2.0

package server

import (
"context"
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
"github.com/ironcore-dev/ironcore/broker/volumebroker/apiutils"
irievent "github.com/ironcore-dev/ironcore/iri/apis/event/v1alpha1"
iri "github.com/ironcore-dev/ironcore/iri/apis/volume/v1alpha1"
)

const (
InvolvedObjectKind = "Volume"
InvolvedObjectKindSelector = "involvedObject.kind"
InvolvedObjectAPIVersionSelector = "involvedObject.apiVersion"
)

func (s *Server) listEvents(ctx context.Context) ([]*irievent.Event, error) {
log := ctrl.LoggerFrom(ctx)
volumeEventList := &v1.EventList{}
selectorField := fields.Set{
InvolvedObjectKindSelector: InvolvedObjectKind,
InvolvedObjectAPIVersionSelector: storagev1alpha1.SchemeGroupVersion.String(),
}
if err := s.client.List(ctx, volumeEventList,
client.InNamespace(s.namespace), client.MatchingFieldsSelector{Selector: selectorField.AsSelector()},
); err != nil {
return nil, err
}

var iriEvents []*irievent.Event
for _, volumeEvent := range volumeEventList.Items {
ironcoreVolume, err := s.getIronCoreVolume(ctx, volumeEvent.InvolvedObject.Name)
if err != nil {
log.V(1).Info("Unable to get ironcore volume", "VolumeName", volumeEvent.InvolvedObject.Name)
continue
}
volumeObjectMetadata, err := apiutils.GetObjectMetadata(&ironcoreVolume.ObjectMeta)
if err != nil {
continue
}
iriEvent := &irievent.Event{
Spec: &irievent.EventSpec{
InvolvedObjectMeta: volumeObjectMetadata,
Reason: volumeEvent.Reason,
Message: volumeEvent.Message,
Type: volumeEvent.Type,
EventTime: volumeEvent.LastTimestamp.Unix(),
},
}
iriEvents = append(iriEvents, iriEvent)
}
return iriEvents, nil
}

func (s *Server) filterEvents(events []*irievent.Event, filter *iri.EventFilter) []*irievent.Event {
if filter == nil {
return events
}

var (
res []*irievent.Event
sel = labels.SelectorFromSet(filter.LabelSelector)
)
for _, iriEvent := range events {
if !sel.Matches(labels.Set(iriEvent.Spec.InvolvedObjectMeta.Labels)) {
continue
}

if filter.EventsFromTime > 0 && filter.EventsToTime > 0 {
if iriEvent.Spec.EventTime < filter.EventsFromTime || iriEvent.Spec.EventTime > filter.EventsToTime {
continue
}
}

res = append(res, iriEvent)
}
return res
}

func (s *Server) ListEvents(ctx context.Context, req *iri.ListEventsRequest) (*iri.ListEventsResponse, error) {
iriEvents, err := s.listEvents(ctx)
if err != nil {
return nil, fmt.Errorf("error listing volume events : %w", err)
}

iriEvents = s.filterEvents(iriEvents, req.Filter)

return &iri.ListEventsResponse{
Events: iriEvents,
}, nil
}
120 changes: 120 additions & 0 deletions broker/volumebroker/server/event_list_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// SPDX-FileCopyrightText: 2024 SAP SE or an SAP affiliate company and IronCore contributors
// SPDX-License-Identifier: Apache-2.0

package server_test

import (
"time"

storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
irievent "github.com/ironcore-dev/ironcore/iri/apis/event/v1alpha1"
iri "github.com/ironcore-dev/ironcore/iri/apis/volume/v1alpha1"

irimeta "github.com/ironcore-dev/ironcore/iri/apis/meta/v1alpha1"
volumepoolletv1alpha1 "github.com/ironcore-dev/ironcore/poollet/volumepoollet/api/v1alpha1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("ListEvents", func() {
ns, srv := SetupTest()
volumeClass := SetupVolumeClass()

It("should correctly list events", func(ctx SpecContext) {
By("creating volume")
Expect(storagev1alpha1.AddToScheme(scheme.Scheme)).To(Succeed())

k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme.Scheme,
})
Expect(err).ToNot(HaveOccurred())

res, err := srv.CreateVolume(ctx, &iri.CreateVolumeRequest{
Volume: &iri.Volume{
Metadata: &irimeta.ObjectMetadata{
Labels: map[string]string{
volumepoolletv1alpha1.VolumeUIDLabel: "foobar",
},
},
Spec: &iri.VolumeSpec{
Class: volumeClass.Name,
Resources: &iri.VolumeResources{
StorageBytes: 100,
},
},
},
})

Expect(err).NotTo(HaveOccurred())
Expect(res).NotTo(BeNil())

By("getting the ironcore volume")
ironcoreVolume := &storagev1alpha1.Volume{}
ironcoreVolumeKey := client.ObjectKey{Namespace: ns.Name, Name: res.Volume.Metadata.Id}
Expect(k8sClient.Get(ctx, ironcoreVolumeKey, ironcoreVolume)).To(Succeed())

By("generating the volume events")
eventGeneratedTime := time.Now()
eventRecorder := k8sManager.GetEventRecorderFor("test-recorder")
eventRecorder.Event(ironcoreVolume, corev1.EventTypeNormal, "testing", "this is test event")

By("listing the volume events with no filters")
Eventually(func(g Gomega) []*irievent.Event {
resp, err := srv.ListEvents(ctx, &iri.ListEventsRequest{})
g.Expect(err).NotTo(HaveOccurred())
return resp.Events
}).Should(ConsistOf(
HaveField("Spec", SatisfyAll(
HaveField("InvolvedObjectMeta.Id", Equal(ironcoreVolume.Name)),
HaveField("Reason", Equal("testing")),
HaveField("Message", Equal("this is test event")),
HaveField("Type", Equal(corev1.EventTypeNormal)),
)),
),
)

By("listing the volume events with matching label and time filters")
resp, err := srv.ListEvents(ctx, &iri.ListEventsRequest{Filter: &iri.EventFilter{
LabelSelector: map[string]string{volumepoolletv1alpha1.VolumeUIDLabel: "foobar"},
EventsFromTime: eventGeneratedTime.Unix(),
EventsToTime: time.Now().Unix(),
}})

Expect(err).NotTo(HaveOccurred())

Expect(resp.Events).To(ConsistOf(
HaveField("Spec", SatisfyAll(
HaveField("InvolvedObjectMeta.Id", Equal(ironcoreVolume.Name)),
HaveField("Reason", Equal("testing")),
HaveField("Message", Equal("this is test event")),
HaveField("Type", Equal(corev1.EventTypeNormal)),
)),
),
)

By("listing the volume events with non matching label filter")
resp, err = srv.ListEvents(ctx, &iri.ListEventsRequest{Filter: &iri.EventFilter{
LabelSelector: map[string]string{"foo": "bar"},
EventsFromTime: eventGeneratedTime.Unix(),
EventsToTime: time.Now().Unix(),
}})
Expect(err).NotTo(HaveOccurred())

Expect(resp.Events).To(BeEmpty())

By("listing the volume events with matching label filter and non matching time filter")
resp, err = srv.ListEvents(ctx, &iri.ListEventsRequest{Filter: &iri.EventFilter{
LabelSelector: map[string]string{volumepoolletv1alpha1.VolumeUIDLabel: "foobar"},
EventsFromTime: eventGeneratedTime.Add(-10 * time.Minute).Unix(),
EventsToTime: eventGeneratedTime.Add(-5 * time.Minute).Unix(),
}})
Expect(err).NotTo(HaveOccurred())

Expect(resp.Events).To(BeEmpty())
})
})
1 change: 1 addition & 0 deletions broker/volumebroker/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func setOptionsDefaults(o *Options) {

var _ iri.VolumeRuntimeServer = (*Server)(nil)

//+kubebuilder:rbac:groups="",resources=events,verbs=get;list;watch
//+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=storage.ironcore.dev,resources=volumes,verbs=get;list;watch;create;update;patch;delete

Expand Down
166 changes: 166 additions & 0 deletions broker/volumebroker/server/server_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and IronCore contributors
// SPDX-License-Identifier: Apache-2.0

package server_test

import (
"context"
"testing"
"time"

"github.com/ironcore-dev/controller-utils/buildutils"
"github.com/ironcore-dev/controller-utils/modutils"
corev1alpha1 "github.com/ironcore-dev/ironcore/api/core/v1alpha1"
storagev1alpha1 "github.com/ironcore-dev/ironcore/api/storage/v1alpha1"
"github.com/ironcore-dev/ironcore/broker/common/idgen"
"github.com/ironcore-dev/ironcore/broker/volumebroker/server"
utilsenvtest "github.com/ironcore-dev/ironcore/utils/envtest"
"github.com/ironcore-dev/ironcore/utils/envtest/apiserver"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
. "sigs.k8s.io/controller-runtime/pkg/envtest/komega"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

var (
cfg *rest.Config
testEnv *envtest.Environment
testEnvExt *utilsenvtest.EnvironmentExtensions
k8sClient client.Client
)

const (
eventuallyTimeout = 3 * time.Second
pollingInterval = 50 * time.Millisecond
consistentlyDuration = 1 * time.Second
apiServiceTimeout = 5 * time.Minute
)

func TestServer(t *testing.T) {
SetDefaultConsistentlyPollingInterval(pollingInterval)
SetDefaultEventuallyPollingInterval(pollingInterval)
SetDefaultEventuallyTimeout(eventuallyTimeout)
SetDefaultConsistentlyDuration(consistentlyDuration)

RegisterFailHandler(Fail)
RunSpecs(t, "Server Suite")
}

var _ = BeforeSuite(func() {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))

var err error
By("bootstrapping test environment")
testEnv = &envtest.Environment{}
testEnvExt = &utilsenvtest.EnvironmentExtensions{
APIServiceDirectoryPaths: []string{
modutils.Dir("github.com/ironcore-dev/ironcore", "config", "apiserver", "apiservice", "bases"),
},
ErrorIfAPIServicePathIsMissing: true,
}

cfg, err = utilsenvtest.StartWithExtensions(testEnv, testEnvExt)
Expect(err).NotTo(HaveOccurred())
Expect(cfg).NotTo(BeNil())

DeferCleanup(utilsenvtest.StopWithExtensions, testEnv, testEnvExt)

Expect(storagev1alpha1.AddToScheme(scheme.Scheme)).To(Succeed())

// Init package-level k8sClient
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient).NotTo(BeNil())
SetClient(k8sClient)

apiSrv, err := apiserver.New(cfg, apiserver.Options{
MainPath: "github.com/ironcore-dev/ironcore/cmd/ironcore-apiserver",
BuildOptions: []buildutils.BuildOption{buildutils.ModModeMod},
ETCDServers: []string{testEnv.ControlPlane.Etcd.URL.String()},
Host: testEnvExt.APIServiceInstallOptions.LocalServingHost,
Port: testEnvExt.APIServiceInstallOptions.LocalServingPort,
CertDir: testEnvExt.APIServiceInstallOptions.LocalServingCertDir,
})
Expect(err).NotTo(HaveOccurred())

Expect(apiSrv.Start()).To(Succeed())
DeferCleanup(apiSrv.Stop)

Expect(utilsenvtest.WaitUntilAPIServicesReadyWithTimeout(apiServiceTimeout, testEnvExt, k8sClient, scheme.Scheme)).To(Succeed())
})

func SetupTest() (*corev1.Namespace, *server.Server) {
var (
ns = &corev1.Namespace{}
srv = &server.Server{}
volumePool = &storagev1alpha1.VolumePool{}
)

BeforeEach(func(ctx SpecContext) {
*ns = corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-ns-",
},
}
Expect(k8sClient.Create(ctx, ns)).To(Succeed(), "failed to create test namespace")
DeferCleanup(k8sClient.Delete, ns)

By("creating a volume pool")
*volumePool = storagev1alpha1.VolumePool{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "volumepool-",
Labels: map[string]string{
"pool": "test-pool",
},
},
Spec: storagev1alpha1.VolumePoolSpec{
ProviderID: "network-id",
},
}
Expect(k8sClient.Create(ctx, volumePool)).To(Succeed(), "failed to create test volume pool")
DeferCleanup(k8sClient.Delete, volumePool)

newSrv, err := server.New(cfg, server.Options{
Namespace: ns.Name,
VolumePoolName: volumePool.Name,
VolumePoolSelector: map[string]string{
"pool": "test-pool",
},
IDGen: idgen.Default,
})
Expect(err).NotTo(HaveOccurred())
*srv = *newSrv
})

return ns, srv
}

func SetupVolumeClass() *storagev1alpha1.VolumeClass {
volumeClass := &storagev1alpha1.VolumeClass{}

BeforeEach(func(ctx SpecContext) {
*volumeClass = storagev1alpha1.VolumeClass{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "volume-class-",
},
Capabilities: corev1alpha1.ResourceList{
corev1alpha1.ResourceIOPS: resource.MustParse("250Mi"),
corev1alpha1.ResourceTPS: resource.MustParse("1500"),
},
}
Expect(k8sClient.Create(ctx, volumeClass)).To(Succeed())
DeferCleanup(func(ctx context.Context) error {
return client.IgnoreNotFound(k8sClient.Delete(ctx, volumeClass))
})
})

return volumeClass
}
Loading