From f9494c33a8a5f429e5d1ab0fd51850107b9af301 Mon Sep 17 00:00:00 2001 From: JACQUES Francois Date: Mon, 27 Feb 2023 15:34:43 +0100 Subject: [PATCH] Record ACI events back to kubernetes (#470) Signed-off-by: JACQUES Francois --- cmd/virtual-kubelet/main.go | 13 +- pkg/featureflag/feature_flag.go | 9 +- pkg/featureflag/feature_flag_test.go | 5 + pkg/provider/aci.go | 64 +++++++++- pkg/provider/aci_confidential_test.go | 2 +- pkg/provider/aci_init_container_test.go | 2 +- pkg/provider/aci_test.go | 135 +++++++++++++++++---- pkg/provider/aci_volumes_test.go | 16 +-- pkg/provider/containergroup_to_pod_test.go | 2 +- pkg/provider/podsTracker.go | 24 ++++ pkg/provider/podsTracker_test.go | 122 ++++++++++++++++++- pkg/tests/utils.go | 11 ++ 12 files changed, 356 insertions(+), 49 deletions(-) diff --git a/cmd/virtual-kubelet/main.go b/cmd/virtual-kubelet/main.go index 18001601..9cf20039 100644 --- a/cmd/virtual-kubelet/main.go +++ b/cmd/virtual-kubelet/main.go @@ -198,13 +198,12 @@ func main() { } return nil } - + k8sClient, err := nodeutil.ClientsetFromEnv(kubeConfigPath) + if err != nil { + log.G(ctx).Fatal(err) + } withClient := func(cfg *nodeutil.NodeConfig) error { - client, err := nodeutil.ClientsetFromEnv(kubeConfigPath) - if err != nil { - return err - } - return nodeutil.WithClient(client)(cfg) + return nodeutil.WithClient(k8sClient)(cfg) } run := func(ctx context.Context) error { @@ -223,7 +222,7 @@ func main() { } p, err := azproviderv2.NewACIProvider(ctx, cfgPath, azConfig, azACIAPIs, cfg, nodeName, operatingSystem, os.Getenv("VKUBELET_POD_IP"), - int32(listenPort), clusterDomain) + int32(listenPort), clusterDomain, k8sClient) p.ConfigureNode(ctx, cfg.Node) return p, nil, err }, diff --git a/pkg/featureflag/feature_flag.go b/pkg/featureflag/feature_flag.go index 91f70618..7b7fc3db 100644 --- a/pkg/featureflag/feature_flag.go +++ b/pkg/featureflag/feature_flag.go @@ -13,11 +13,15 @@ import ( const ( InitContainerFeature = "init-container" ConfidentialComputeFeature = "confidential-compute" + + // Events : support ACI to K8s event translation and broadcasting + Events = "events" ) var enabledFeatures = []string{ InitContainerFeature, ConfidentialComputeFeature, + Events, } type FlagIdentifier struct { @@ -30,6 +34,8 @@ func InitFeatureFlag(ctx context.Context) *FlagIdentifier { var featureFlags FlagIdentifier featureFlags.enabledFeatures = enabledFeatures + log.G(ctx).Infof("features %v enabled", enabledFeatures) + return &featureFlags } @@ -42,9 +48,10 @@ func (fi *FlagIdentifier) IsEnabled(ctx context.Context, feature string) bool { } for _, feat := range fi.enabledFeatures { if feat == feature { - log.G(ctx).Infof("feature %s is enabled", feature) + log.G(ctx).Debugf("feature %s is enabled", feature) return true } } + log.G(ctx).Debugf("feature %s is disabled", feature) return false } diff --git a/pkg/featureflag/feature_flag_test.go b/pkg/featureflag/feature_flag_test.go index 157b034b..1fc52793 100644 --- a/pkg/featureflag/feature_flag_test.go +++ b/pkg/featureflag/feature_flag_test.go @@ -29,6 +29,11 @@ func TestIsEnabled(t *testing.T) { feature: ConfidentialComputeFeature, shouldEnabled: true, }, + { + description: fmt.Sprintf(" %s feature should be enabled", Events), + feature: Events, + shouldEnabled: true, + }, } for i, tc := range cases { t.Run(tc.description, func(t *testing.T) { diff --git a/pkg/provider/aci.go b/pkg/provider/aci.go index b23fef4d..461cee47 100644 --- a/pkg/provider/aci.go +++ b/pkg/provider/aci.go @@ -11,6 +11,7 @@ import ( "fmt" "io" "os" + "path" "reflect" "strings" "time" @@ -33,10 +34,16 @@ import ( "github.com/virtual-kubelet/virtual-kubelet/trace" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" corev1listers "k8s.io/client-go/listers/core/v1" "github.com/cpuguy83/dockercfg" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/reference" ) const ( @@ -80,6 +87,7 @@ type ACIProvider struct { podsL corev1listers.PodLister enabledFeatures *featureflag.FlagIdentifier providerNetwork network.ProviderNetwork + eventRecorder record.EventRecorder resourceGroup string region string @@ -170,7 +178,7 @@ func isValidACIRegion(region string) bool { } // NewACIProvider creates a new ACIProvider. -func NewACIProvider(ctx context.Context, config string, azConfig auth.Config, azAPIs client.AzClientsInterface, pCfg nodeutil.ProviderConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32, clusterDomain string) (*ACIProvider, error) { +func NewACIProvider(ctx context.Context, config string, azConfig auth.Config, azAPIs client.AzClientsInterface, pCfg nodeutil.ProviderConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32, clusterDomain string, kubeClient kubernetes.Interface) (*ACIProvider, error) { var p ACIProvider var err error @@ -208,6 +216,12 @@ func NewACIProvider(ctx context.Context, config string, azConfig auth.Config, az if p.providerNetwork.VnetResourceGroup == "" { p.providerNetwork.VnetResourceGroup = p.resourceGroup } + + providerEB := record.NewBroadcaster() + providerEB.StartLogging(log.G(ctx).Infof) + providerEB.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: kubeClient.CoreV1().Events(v1.NamespaceAll)}) + p.eventRecorder = providerEB.NewRecorder(scheme.Scheme, v1.EventSource{Component: path.Join(nodeName, "pod-controller")}) + // If the log analytics file has been specified, load workspace credentials from the file if logAnalyticsAuthFile := os.Getenv("LOG_ANALYTICS_AUTH_LOCATION"); logAnalyticsAuthFile != "" { p.diagnostics, err = analytics.NewContainerGroupDiagnosticsFromFile(logAnalyticsAuthFile) @@ -764,9 +778,11 @@ func (p *ACIProvider) NotifyPods(ctx context.Context, notifierCb func(*v1.Pod)) // Capture the notifier to be used for communicating updates to VK p.tracker = &PodsTracker{ - pods: p.podsL, - updateCb: notifierCb, - handler: p, + pods: p.podsL, + updateCb: notifierCb, + handler: p, + lastEventCheck: time.UnixMicro(0), + eventRecorder: p.eventRecorder, } go p.tracker.StartTracking(ctx) @@ -803,6 +819,46 @@ func (p *ACIProvider) FetchPodStatus(ctx context.Context, ns, name string) (*v1. return p.GetPodStatus(ctx, ns, name) } +func (p *ACIProvider) FetchPodEvents(ctx context.Context, pod *v1.Pod, evtSink func(timestamp *time.Time, object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})) error { + ctx, span := trace.StartSpan(ctx, "ACIProvider.FetchPodEvents") + defer span.End() + if !p.enabledFeatures.IsEnabled(ctx, featureflag.Events) { + return nil + } + + ctx = addAzureAttributes(ctx, span, p) + cgName := containerGroupName(pod.Namespace, pod.Name) + cg, err := p.azClientsAPIs.GetContainerGroup(ctx, p.resourceGroup, cgName) + if err != nil { + return err + } + if *cg.Tags["NodeName"] != p.nodeName { + return errors.Wrapf(err, "container group %s found with mismatching node", cgName) + } + + if cg.Properties != nil && cg.Properties.InstanceView != nil && cg.Properties.InstanceView.Events != nil { + for _, evt := range cg.Properties.InstanceView.Events { + evtSink(evt.LastTimestamp, pod, *evt.Type, *evt.Name, *evt.Message) + } + } + + if cg.Properties != nil && cg.Properties.Containers != nil { + for _, container := range cg.Properties.Containers { + if container.Properties != nil && container.Properties.InstanceView != nil && container.Properties.InstanceView.Events != nil { + for _, evt := range container.Properties.InstanceView.Events { + podReference, err := reference.GetReference(scheme.Scheme, pod) + if err != nil { + log.G(ctx).WithError(err).Warnf("cannot get k8s object reference from pod %s in namespace %s", pod.Name, pod.Namespace) + } + podReference.FieldPath = fmt.Sprintf("spec.containers{%s}", *container.Name) + evtSink(evt.LastTimestamp, podReference, *evt.Type, *evt.Name, *evt.Message) + } + } + } + } + return nil +} + // CleanupPod interface impl func (p *ACIProvider) CleanupPod(ctx context.Context, ns, name string) error { ctx, span := trace.StartSpan(ctx, "ACIProvider.CleanupPod") diff --git a/pkg/provider/aci_confidential_test.go b/pkg/provider/aci_confidential_test.go index 68afddcf..f22257c0 100644 --- a/pkg/provider/aci_confidential_test.go +++ b/pkg/provider/aci_confidential_test.go @@ -123,7 +123,7 @@ func TestCreatePodWithConfidentialComputeProperties(t *testing.T) { ctx := context.TODO() provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("Unable to create test provider", err) } diff --git a/pkg/provider/aci_init_container_test.go b/pkg/provider/aci_init_container_test.go index bbc4e6ea..5d3c15b8 100644 --- a/pkg/provider/aci_init_container_test.go +++ b/pkg/provider/aci_init_container_test.go @@ -183,7 +183,7 @@ func TestCreatePodWithInitContainers(t *testing.T) { ctx := context.TODO() provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("Unable to create test provider", err) } diff --git a/pkg/provider/aci_test.go b/pkg/provider/aci_test.go index 90d84a48..866c7768 100644 --- a/pkg/provider/aci_test.go +++ b/pkg/provider/aci_test.go @@ -31,6 +31,10 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/record" ) const ( @@ -199,7 +203,7 @@ func TestCreatePodWithoutResourceSpec(t *testing.T) { return nil } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -249,7 +253,7 @@ func TestCreatePodWithWindowsOS(t *testing.T) { return nil } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -318,7 +322,7 @@ func TestCreatePodWithResourceRequestOnly(t *testing.T) { } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -374,7 +378,7 @@ func TestCreatePodWithGPU(t *testing.T) { } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -412,7 +416,7 @@ func TestCreatePodWithGPUSKU(t *testing.T) { } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -477,7 +481,7 @@ func TestCreatePodWithResourceRequestAndLimit(t *testing.T) { pod := testsutil.CreatePodObj(podName, podNamespace) provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -500,7 +504,7 @@ func TestGetPodsWithEmptyList(t *testing.T) { } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -571,7 +575,7 @@ func TestGetPodsWithoutResourceRequestsLimits(t *testing.T) { } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -622,7 +626,7 @@ func TestGetPodWithoutResourceRequestsLimits(t *testing.T) { return result, nil } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), podLister) + NewMockSecretLister(mockCtrl), podLister, nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -709,7 +713,7 @@ func createNewACIMock() *MockACIProvider { }) } -func createTestProvider(aciMocks *MockACIProvider, configMapMocker *MockConfigMapLister, secretMocker *MockSecretLister, podMocker *MockPodLister) (*ACIProvider, error) { +func createTestProvider(aciMocks *MockACIProvider, configMapMocker *MockConfigMapLister, secretMocker *MockSecretLister, podMocker *MockPodLister, kubeClient kubernetes.Interface) (*ACIProvider, error) { ctx := context.TODO() err := setAuthConfig() @@ -717,6 +721,10 @@ func createTestProvider(aciMocks *MockACIProvider, configMapMocker *MockConfigMa return nil, err } + if kubeClient == nil { + kubeClient = fake.NewSimpleClientset() + } + err = os.Setenv("ACI_VNET_NAME", fakeVnetName) if err != nil { return nil, err @@ -751,7 +759,7 @@ func createTestProvider(aciMocks *MockACIProvider, configMapMocker *MockConfigMa cfg.Node.Name = fakeNodeName cfg.Node.Status.NodeInfo.OperatingSystem = operatingSystem - provider, err := NewACIProvider(ctx, "example.toml", azConfig, aciMocks, cfg, fakeNodeName, operatingSystem, "0.0.0.0", 10250, "cluster.local") + provider, err := NewACIProvider(ctx, "example.toml", azConfig, aciMocks, cfg, fakeNodeName, operatingSystem, "0.0.0.0", 10250, "cluster.local", kubeClient) if err != nil { return nil, err } @@ -786,7 +794,7 @@ func TestConfigureNode(t *testing.T) { } aciMocks := createNewACIMock() provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -822,7 +830,7 @@ func TestCreatePodWithNamedLivenessProbe(t *testing.T) { pod := testsutil.CreatePodObj(podName, podNamespace) provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -859,7 +867,7 @@ func TestCreatePodWithLivenessProbe(t *testing.T) { pod := testsutil.CreatePodObj(podName, podNamespace) provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -976,7 +984,7 @@ func TestCreatePodWithReadinessProbe(t *testing.T) { pod := testsutil.CreatePodObj(podName, podNamespace) provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -1110,7 +1118,7 @@ func TestCreatedPodWithContainerPort(t *testing.T) { } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("Unable to create test provider", err) } @@ -1150,7 +1158,7 @@ func TestGetPodWithContainerID(t *testing.T) { } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), podLister) + NewMockSecretLister(mockCtrl), podLister, nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -1315,7 +1323,7 @@ func TestDeleteContainerGroup(t *testing.T) { } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), podLister) + NewMockSecretLister(mockCtrl), podLister, nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -1389,7 +1397,7 @@ func TestGetPodStatus(t *testing.T) { aciMocks := createNewACIMock() provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -1552,7 +1560,7 @@ func TestGetImagePullSecretsWithDockerCfgSecret(t *testing.T) { tc.callSecretMocks(mockSecretLister) provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - mockSecretLister, NewMockPodLister(mockCtrl)) + mockSecretLister, NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -1695,7 +1703,7 @@ func TestGetImagePullSecretsWithDockerConfigJSONSecret(t *testing.T) { tc.callSecretMocks(mockSecretLister) provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - mockSecretLister, NewMockPodLister(mockCtrl)) + mockSecretLister, NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -1736,7 +1744,7 @@ func TestGetContainerLogs(t *testing.T) { } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -1798,7 +1806,7 @@ func TestGetGPUSKU(t *testing.T) { aciMocks := createNewACIMock() provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -1861,3 +1869,84 @@ func TestGetGPUSKU(t *testing.T) { }) } } + +func TestFetchStandardPodsEvents(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + }, + Spec: corev1.PodSpec{}, + } + + cg := testsutil.CreateContainerGroupObj(containerGroupName(podNamespace, podNamespace), "", "Succeeded", + testsutil.CreateACIContainersListObj(runningState, "Initializing", + testsutil.CgCreationTime.Add(time.Second*2), + testsutil.CgCreationTime.Add(time.Second*3), + false, false, false), "Succeeded") + now := time.Now().Add(-10 * time.Second) + cg.Properties.InstanceView.Events = []*azaciv2.Event{ + testsutil.CreateContainerGroupEvent(1, now, now, "mount failed", "cg event", "Warning"), + } + cg.Properties.Containers[0].Properties.InstanceView.Events = []*azaciv2.Event{ + testsutil.CreateContainerGroupEvent(1, now, now, "container started", "cg container event", "Normal"), + } + + aciMocks := createNewACIMock() + aciMocks.MockGetContainerGroup = func(ctx context.Context, resourceGroup, containerGroupName string) (*azaciv2.ContainerGroup, error) { + return cg, nil + } + aciMocks.MockGetContainerGroupInfo = func(ctx context.Context, resourceGroup, namespace, name, nodeName string) (*azaciv2.ContainerGroup, error) { + return aciMocks.MockGetContainerGroup(ctx, resourceGroup, containerGroupName(name, namespace)) + } + podLister := NewMockPodLister(mockCtrl) + podLister.EXPECT().List(labels.Everything()).Times(2).Return([]*corev1.Pod{pod}, nil) + + provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), + NewMockSecretLister(mockCtrl), podLister, nil) + if err != nil { + t.Fatal("failed to create the test provider", err) + } + + podsTracker := &PodsTracker{ + pods: podLister, + updateCb: func(pod *corev1.Pod) {}, + handler: provider, + lastEventCheck: time.UnixMicro(0), + } + + fakeRecorder := record.NewFakeRecorder(2) + podsTracker.eventRecorder = fakeRecorder + podsTracker.updatePodsLoop(context.Background()) + close(fakeRecorder.Events) + broadcastEvents := make([]string, 0) + for evt := range fakeRecorder.Events { + broadcastEvents = append(broadcastEvents, evt) + } + assert.DeepEqual(t, []string{ + "Warning cg event mount failed", + "Normal cg container event container started", + }, broadcastEvents) + + fakeRecorder = record.NewFakeRecorder(2) + podsTracker.eventRecorder = fakeRecorder + now = time.Now().Add(10 * time.Second) + cg.Properties.InstanceView.Events = []*azaciv2.Event{ + testsutil.CreateContainerGroupEvent(1, now, now, "mount failed 2", "cg event", "InvalidType"), + } + cg.Properties.Containers[0].Properties.InstanceView.Events = []*azaciv2.Event{ + testsutil.CreateContainerGroupEvent(1, now, now, "container started 2", "cg container event", "Normal"), + } + podsTracker.updatePodsLoop(context.Background()) + close(fakeRecorder.Events) + broadcastEvents = broadcastEvents[0:0] + for evt := range fakeRecorder.Events { + broadcastEvents = append(broadcastEvents, evt) + } + assert.DeepEqual(t, []string{ + "Warning cg event mount failed 2", + "Normal cg container event container started 2", + }, broadcastEvents) +} diff --git a/pkg/provider/aci_volumes_test.go b/pkg/provider/aci_volumes_test.go index 21b6abc8..503b6e3c 100644 --- a/pkg/provider/aci_volumes_test.go +++ b/pkg/provider/aci_volumes_test.go @@ -44,7 +44,7 @@ func TestCreatedPodWithAzureFilesVolume(t *testing.T) { aciMocks := createNewACIMock() provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - mockSecretLister, NewMockPodLister(mockCtrl)) + mockSecretLister, NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("Unable to create test provider", err) } @@ -306,7 +306,7 @@ func TestCreatePodWithProjectedVolume(t *testing.T) { pod.Spec.Volumes = fakeVolumes provider, err := createTestProvider(aciMocks, configMapLister, - secretLister, NewMockPodLister(mockCtrl)) + secretLister, NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("Unable to create test provider", err) } @@ -476,7 +476,7 @@ func TestCreatePodWithCSIVolume(t *testing.T) { pod.Spec.Volumes = tc.volumes provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - mockSecretLister, NewMockPodLister(mockCtrl)) + mockSecretLister, NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("Unable to create test provider", err) } @@ -588,7 +588,7 @@ func TestGetVolumesForSecretVolume(t *testing.T) { pod.Spec.Volumes = fakePodVolumes provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - mockSecretLister, NewMockPodLister(mockCtrl)) + mockSecretLister, NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("Unable to create test provider", err) } @@ -702,7 +702,7 @@ func TestGetVolumesForConfigMapVolume(t *testing.T) { pod.Spec.Volumes = fakePodVolumes provider, err := createTestProvider(aciMocks, mockConfigMapLister, - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("Unable to create test provider", err) } @@ -826,7 +826,7 @@ func TestGetVolumesProjectedVolSecretSource(t *testing.T) { pod.Spec.Volumes = fakePodVolumes provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - mockSecretLister, NewMockPodLister(mockCtrl)) + mockSecretLister, NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("Unable to create test provider", err) } @@ -949,7 +949,7 @@ func TestGetVolumesProjectedVolConfMapSource(t *testing.T) { pod.Spec.Volumes = fakePodVolumes provider, err := createTestProvider(aciMocks, mockConfigMapLister, - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("Unable to create test provider", err) } @@ -1065,7 +1065,7 @@ func TestGetVolumesProjectedVolSvcAcctTokenSource(t *testing.T) { pod.Spec.ServiceAccountName = serviceAccountName provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - mockSecretLister, NewMockPodLister(mockCtrl)) + mockSecretLister, NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("Unable to create test provider", err) } diff --git a/pkg/provider/containergroup_to_pod_test.go b/pkg/provider/containergroup_to_pod_test.go index 18797dc9..0ca66c21 100644 --- a/pkg/provider/containergroup_to_pod_test.go +++ b/pkg/provider/containergroup_to_pod_test.go @@ -29,7 +29,7 @@ func TestContainerGroupToPodStatus(t *testing.T) { defer mockCtrl.Finish() provider, err := createTestProvider(createNewACIMock(), NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } diff --git a/pkg/provider/podsTracker.go b/pkg/provider/podsTracker.go index 1c104b25..eed82056 100644 --- a/pkg/provider/podsTracker.go +++ b/pkg/provider/podsTracker.go @@ -14,7 +14,10 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/record/util" ) const ( @@ -35,6 +38,7 @@ type PodIdentifier struct { type PodsTrackerHandler interface { ListActivePods(ctx context.Context) ([]PodIdentifier, error) FetchPodStatus(ctx context.Context, ns, name string) (*v1.PodStatus, error) + FetchPodEvents(ctx context.Context, pod *v1.Pod, evtSink func(timestamp *time.Time, object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})) error CleanupPod(ctx context.Context, ns, name string) error } @@ -42,6 +46,9 @@ type PodsTracker struct { pods corev1listers.PodLister updateCb func(*v1.Pod) handler PodsTrackerHandler + + lastEventCheck time.Time + eventRecorder record.EventRecorder } // StartTracking starts the background tracking for created pods. @@ -148,6 +155,23 @@ func (pt *PodsTracker) processPodUpdates(ctx context.Context, pod *v1.Pod) bool ctx, span := trace.StartSpan(ctx, "PodsTracker.processPodUpdates") defer span.End() + lastEventCheck := pt.lastEventCheck + err := pt.handler.FetchPodEvents(ctx, pod, func(timestamp *time.Time, object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { + if timestamp == nil || timestamp.After(pt.lastEventCheck) { + if !util.ValidateEventType(eventtype) { + eventtype = v1.EventTypeWarning + } + if timestamp != nil && timestamp.After(lastEventCheck) { + lastEventCheck = *timestamp + } + pt.eventRecorder.Eventf(object, eventtype, reason, messageFmt, args...) + } + }) + pt.lastEventCheck = lastEventCheck + if err != nil { + log.G(ctx).WithError(err).Warnf("cannot fetch aci events for pod %s in namespace %s", pod.Name, pod.Namespace) + } + if pt.shouldSkipPodStatusUpdate(pod) { log.G(ctx).Infof("pod %s will skip pod status update", pod.Name) return false diff --git a/pkg/provider/podsTracker_test.go b/pkg/provider/podsTracker_test.go index 5e4595ef..c6c2dc72 100644 --- a/pkg/provider/podsTracker_test.go +++ b/pkg/provider/podsTracker_test.go @@ -20,6 +20,7 @@ import ( is "gotest.tools/assert/cmp" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/client-go/tools/record" ) func TestUpdatePodStatus(t *testing.T) { @@ -80,7 +81,7 @@ func TestProcessPodUpdates(t *testing.T) { aciMocks := createNewACIMock() aciProvider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -131,6 +132,12 @@ func TestProcessPodUpdates(t *testing.T) { t.Run(tc.description, func(t *testing.T) { pod.Status.Phase = tc.podPhase aciMocks.MockGetContainerGroupInfo = tc.getContainerGroupMock + aciMocks.MockGetContainerGroup = func(ctx context.Context, resourceGroup, containerGroupName string) (*azaciv2.ContainerGroup, error) { + if tc.getContainerGroupMock != nil { + return tc.getContainerGroupMock(ctx, resourceGroup, podNamespace, podName, "node") + } + return nil, errdefs.NotFound("cg is not found") + } podLister := NewMockPodLister(mockCtrl) @@ -238,7 +245,7 @@ func TestCleanupDanglingPods(t *testing.T) { mockPodsNamespaceLister := NewMockPodNamespaceLister(mockCtrl) aciProvider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), activePodsLister) + NewMockSecretLister(mockCtrl), activePodsLister, nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -278,7 +285,7 @@ func TestUpdatePodsLoop(t *testing.T) { aciMocks := createNewACIMock() aciProvider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -345,6 +352,9 @@ func TestUpdatePodsLoop(t *testing.T) { return nil, errdefs.NotFound("cg is not found") } } + aciMocks.MockGetContainerGroup = func(ctx context.Context, resourceGroup, containerGroupName string) (*azaciv2.ContainerGroup, error) { + return aciMocks.MockGetContainerGroupInfo(ctx, resourceGroup, podNamespace, podName, "node") + } k8sPodsLister := NewMockPodLister(mockCtrl) k8sPodsLister.EXPECT().List(gomock.Any()).Return(k8sPods, nil) @@ -365,3 +375,109 @@ func TestUpdatePodsLoop(t *testing.T) { }) } } + +func TestFetchPodEvents(t *testing.T) { + podName := "pod-" + uuid.New().String() + podNamespace := "ns-" + uuid.New().String() + + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + aciMocks := createNewACIMock() + + aciProvider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) + if err != nil { + t.Fatal("failed to create the test provider", err) + } + + pod := testsutil.CreatePodObj(podName, podNamespace) + container := testsutil.CreateACIContainerObj(runningState, "Initializing", + testsutil.CgCreationTime.Add(time.Second*2), testsutil.CgCreationTime.Add(time.Second*3), + true, true, true) + + cases := []struct { + description string + podPhase v1.PodPhase + getContainerGroupMock func(ctx context.Context, resourceGroup, namespace, name, nodeName string) (*azaciv2.ContainerGroup, error) + events [][]*azaciv2.Event + expectedEvents []string + }{ + { + description: "Pod events are fetched", + podPhase: v1.PodPending, + getContainerGroupMock: func(ctx context.Context, resourceGroup, namespace, name, nodeName string) (*azaciv2.ContainerGroup, error) { + return testsutil.CreateContainerGroupObj(podName, podNamespace, "Succeeded", []*azaciv2.Container{container}, "Succeeded"), nil + }, + events: [][]*azaciv2.Event{ + { + testsutil.CreateContainerGroupEvent(1, time.Unix(1, 0), time.Unix(1, 0), "an event", "evt", "Normal"), + }, + }, + expectedEvents: []string{"evt"}, + }, + { + description: "Unordered single batch of pod events is fetched", + podPhase: v1.PodPending, + getContainerGroupMock: func(ctx context.Context, resourceGroup, namespace, name, nodeName string) (*azaciv2.ContainerGroup, error) { + return testsutil.CreateContainerGroupObj(podName, podNamespace, "Succeeded", []*azaciv2.Container{container}, "Succeeded"), nil + }, + events: [][]*azaciv2.Event{ + { + testsutil.CreateContainerGroupEvent(1, time.Unix(2, 0), time.Unix(2, 0), "first event", "evt1", "Normal"), + testsutil.CreateContainerGroupEvent(1, time.Unix(1, 0), time.Unix(1, 0), "second event", "evt2", "Normal"), + }, + }, + expectedEvents: []string{"evt1", "evt2"}, + }, + { + description: "Past events are skipped", + podPhase: v1.PodPending, + getContainerGroupMock: func(ctx context.Context, resourceGroup, namespace, name, nodeName string) (*azaciv2.ContainerGroup, error) { + return testsutil.CreateContainerGroupObj(podName, podNamespace, "Succeeded", []*azaciv2.Container{container}, "Succeeded"), nil + }, + events: [][]*azaciv2.Event{ + { + testsutil.CreateContainerGroupEvent(1, time.Unix(2, 0), time.Unix(2, 0), "first event", "evt1", "Normal"), + }, + { + testsutil.CreateContainerGroupEvent(1, time.Unix(1, 0), time.Unix(1, 0), "past event", "evt2", "Normal"), + testsutil.CreateContainerGroupEvent(1, time.Unix(3, 0), time.Unix(3, 0), "next event", "evt3", "Normal"), + }, + }, + expectedEvents: []string{"evt1", "evt3"}, + }, + } + + for _, tc := range cases { + t.Run(tc.description, func(t *testing.T) { + aciMocks.MockGetContainerGroupInfo = tc.getContainerGroupMock + aciMocks.MockGetContainerGroup = func(ctx context.Context, resourceGroup, containerGroupName string) (*azaciv2.ContainerGroup, error) { + return tc.getContainerGroupMock(ctx, resourceGroup, podNamespace, podName, "node") + } + + podLister := NewMockPodLister(mockCtrl) + eventRecorder := record.NewFakeRecorder(2) + + podsTracker := &PodsTracker{ + pods: podLister, + updateCb: func(p *v1.Pod) {}, + handler: aciProvider, + eventRecorder: eventRecorder, + } + + for _, evts := range tc.events { + container.Properties.InstanceView.Events = evts + podsTracker.processPodUpdates(context.Background(), pod) + } + + close(eventRecorder.Events) + assert.Equal(t, len(tc.expectedEvents), len(eventRecorder.Events)) + i := 0 + for evt := range eventRecorder.Events { + strings.Contains(evt, tc.expectedEvents[i]) + i++ + } + }) + } +} diff --git a/pkg/tests/utils.go b/pkg/tests/utils.go index bb5c033e..5d48f3e4 100644 --- a/pkg/tests/utils.go +++ b/pkg/tests/utils.go @@ -370,3 +370,14 @@ func CreatePodsList(podNames []string, podNameSpace string) []*corev1.Pod { } return result } + +func CreateContainerGroupEvent(count int32, firstTimestamp time.Time, lastTimestamp time.Time, message, name, eventType string) *azaciv2.Event { + return &azaciv2.Event{ + Count: &count, + FirstTimestamp: &firstTimestamp, + LastTimestamp: &lastTimestamp, + Message: &message, + Name: &name, + Type: &eventType, + } +}