diff --git a/cmd/virtual-kubelet/main.go b/cmd/virtual-kubelet/main.go index 18001601..9cf20039 100644 --- a/cmd/virtual-kubelet/main.go +++ b/cmd/virtual-kubelet/main.go @@ -198,13 +198,12 @@ func main() { } return nil } - + k8sClient, err := nodeutil.ClientsetFromEnv(kubeConfigPath) + if err != nil { + log.G(ctx).Fatal(err) + } withClient := func(cfg *nodeutil.NodeConfig) error { - client, err := nodeutil.ClientsetFromEnv(kubeConfigPath) - if err != nil { - return err - } - return nodeutil.WithClient(client)(cfg) + return nodeutil.WithClient(k8sClient)(cfg) } run := func(ctx context.Context) error { @@ -223,7 +222,7 @@ func main() { } p, err := azproviderv2.NewACIProvider(ctx, cfgPath, azConfig, azACIAPIs, cfg, nodeName, operatingSystem, os.Getenv("VKUBELET_POD_IP"), - int32(listenPort), clusterDomain) + int32(listenPort), clusterDomain, k8sClient) p.ConfigureNode(ctx, cfg.Node) return p, nil, err }, diff --git a/pkg/featureflag/feature_flag.go b/pkg/featureflag/feature_flag.go index 91f70618..7b7fc3db 100644 --- a/pkg/featureflag/feature_flag.go +++ b/pkg/featureflag/feature_flag.go @@ -13,11 +13,15 @@ import ( const ( InitContainerFeature = "init-container" ConfidentialComputeFeature = "confidential-compute" + + // Events : support ACI to K8s event translation and broadcasting + Events = "events" ) var enabledFeatures = []string{ InitContainerFeature, ConfidentialComputeFeature, + Events, } type FlagIdentifier struct { @@ -30,6 +34,8 @@ func InitFeatureFlag(ctx context.Context) *FlagIdentifier { var featureFlags FlagIdentifier featureFlags.enabledFeatures = enabledFeatures + log.G(ctx).Infof("features %v enabled", enabledFeatures) + return &featureFlags } @@ -42,9 +48,10 @@ func (fi *FlagIdentifier) IsEnabled(ctx context.Context, feature string) bool { } for _, feat := range fi.enabledFeatures { if feat == feature { - log.G(ctx).Infof("feature %s is enabled", feature) + log.G(ctx).Debugf("feature %s is enabled", feature) return true } } + log.G(ctx).Debugf("feature %s is disabled", feature) return false } diff --git a/pkg/featureflag/feature_flag_test.go b/pkg/featureflag/feature_flag_test.go index 157b034b..1fc52793 100644 --- a/pkg/featureflag/feature_flag_test.go +++ b/pkg/featureflag/feature_flag_test.go @@ -29,6 +29,11 @@ func TestIsEnabled(t *testing.T) { feature: ConfidentialComputeFeature, shouldEnabled: true, }, + { + description: fmt.Sprintf(" %s feature should be enabled", Events), + feature: Events, + shouldEnabled: true, + }, } for i, tc := range cases { t.Run(tc.description, func(t *testing.T) { diff --git a/pkg/provider/aci.go b/pkg/provider/aci.go index 74943766..e7ac9e6c 100644 --- a/pkg/provider/aci.go +++ b/pkg/provider/aci.go @@ -11,6 +11,7 @@ import ( "fmt" "io" "os" + "path" "reflect" "strings" "time" @@ -33,10 +34,16 @@ import ( "github.com/virtual-kubelet/virtual-kubelet/trace" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" corev1listers "k8s.io/client-go/listers/core/v1" "github.com/cpuguy83/dockercfg" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/reference" ) const ( @@ -80,6 +87,7 @@ type ACIProvider struct { podsL corev1listers.PodLister enabledFeatures *featureflag.FlagIdentifier providernetwork network.ProviderNetwork + eventRecorder record.EventRecorder resourceGroup string region string @@ -170,7 +178,7 @@ func isValidACIRegion(region string) bool { } // NewACIProvider creates a new ACIProvider. -func NewACIProvider(ctx context.Context, config string, azConfig auth.Config, azAPIs client.AzClientsInterface, pCfg nodeutil.ProviderConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32, clusterDomain string) (*ACIProvider, error) { +func NewACIProvider(ctx context.Context, config string, azConfig auth.Config, azAPIs client.AzClientsInterface, pCfg nodeutil.ProviderConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32, clusterDomain string, kubeClient kubernetes.Interface) (*ACIProvider, error) { var p ACIProvider var err error @@ -205,6 +213,11 @@ func NewACIProvider(ctx context.Context, config string, azConfig auth.Config, az p.providernetwork.VnetResourceGroup = azConfig.AKSCredential.VNetResourceGroup } + providerEB := record.NewBroadcaster() + providerEB.StartLogging(log.G(ctx).Infof) + providerEB.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: kubeClient.CoreV1().Events(v1.NamespaceAll)}) + p.eventRecorder = providerEB.NewRecorder(scheme.Scheme, v1.EventSource{Component: path.Join(nodeName, "pod-controller")}) + if p.providernetwork.VnetResourceGroup == "" { p.providernetwork.VnetResourceGroup = p.resourceGroup } @@ -758,9 +771,11 @@ func (p *ACIProvider) NotifyPods(ctx context.Context, notifierCb func(*v1.Pod)) // Capture the notifier to be used for communicating updates to VK p.tracker = &PodsTracker{ - pods: p.podsL, - updateCb: notifierCb, - handler: p, + pods: p.podsL, + updateCb: notifierCb, + handler: p, + lastEventCheck: time.UnixMicro(0), + eventRecorder: p.eventRecorder, } go p.tracker.StartTracking(ctx) @@ -797,6 +812,46 @@ func (p *ACIProvider) FetchPodStatus(ctx context.Context, ns, name string) (*v1. return p.GetPodStatus(ctx, ns, name) } +func (p *ACIProvider) FetchPodEvents(ctx context.Context, pod *v1.Pod, evtSink func(timestamp *time.Time, object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})) error { + ctx, span := trace.StartSpan(ctx, "ACIProvider.FetchPodEvents") + defer span.End() + if !p.enabledFeatures.IsEnabled(ctx, featureflag.Events) { + return nil + } + + ctx = addAzureAttributes(ctx, span, p) + cgName := containerGroupName(pod.Namespace, pod.Name) + cg, err := p.azClientsAPIs.GetContainerGroup(ctx, p.resourceGroup, cgName) + if err != nil { + return err + } + if *cg.Tags["NodeName"] != p.nodeName { + return errors.Wrapf(err, "container group %s found with mismatching node", cgName) + } + + if cg.Properties != nil && cg.Properties.InstanceView != nil && cg.Properties.InstanceView.Events != nil { + for _, evt := range cg.Properties.InstanceView.Events { + evtSink(evt.LastTimestamp, pod, *evt.Type, *evt.Name, *evt.Message) + } + } + + if cg.Properties != nil && cg.Properties.Containers != nil { + for _, container := range cg.Properties.Containers { + if container.Properties != nil && container.Properties.InstanceView != nil && container.Properties.InstanceView.Events != nil { + for _, evt := range container.Properties.InstanceView.Events { + podReference, err := reference.GetReference(scheme.Scheme, pod) + if err != nil { + log.G(ctx).WithError(err).Warnf("cannot get k8s object reference from pod %s in namespace %s", pod.Name, pod.Namespace) + } + podReference.FieldPath = fmt.Sprintf("spec.containers{%s}", *container.Name) + evtSink(evt.LastTimestamp, podReference, *evt.Type, *evt.Name, *evt.Message) + } + } + } + } + return nil +} + // CleanupPod interface impl func (p *ACIProvider) CleanupPod(ctx context.Context, ns, name string) error { ctx, span := trace.StartSpan(ctx, "ACIProvider.CleanupPod") diff --git a/pkg/provider/aci_confidential_test.go b/pkg/provider/aci_confidential_test.go index 68afddcf..f22257c0 100644 --- a/pkg/provider/aci_confidential_test.go +++ b/pkg/provider/aci_confidential_test.go @@ -123,7 +123,7 @@ func TestCreatePodWithConfidentialComputeProperties(t *testing.T) { ctx := context.TODO() provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("Unable to create test provider", err) } diff --git a/pkg/provider/aci_init_container_test.go b/pkg/provider/aci_init_container_test.go index bbc4e6ea..5d3c15b8 100644 --- a/pkg/provider/aci_init_container_test.go +++ b/pkg/provider/aci_init_container_test.go @@ -183,7 +183,7 @@ func TestCreatePodWithInitContainers(t *testing.T) { ctx := context.TODO() provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("Unable to create test provider", err) } diff --git a/pkg/provider/aci_test.go b/pkg/provider/aci_test.go index 2d12faf4..162cb6b5 100644 --- a/pkg/provider/aci_test.go +++ b/pkg/provider/aci_test.go @@ -31,6 +31,10 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/record" ) const ( @@ -199,7 +203,7 @@ func TestCreatePodWithoutResourceSpec(t *testing.T) { return nil } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -249,7 +253,7 @@ func TestCreatePodWithWindowsOS(t *testing.T) { return nil } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -318,7 +322,7 @@ func TestCreatePodWithResourceRequestOnly(t *testing.T) { } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -375,7 +379,7 @@ func TestCreatePodWithGPU(t *testing.T) { } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -413,7 +417,7 @@ func TestCreatePodWithGPUSKU(t *testing.T) { } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -476,7 +480,7 @@ func TestCreatePodWithResourceRequestAndLimit(t *testing.T) { pod := testsutil.CreatePodObj(podName, podNamespace) provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -499,7 +503,7 @@ func TestGetPodsWithEmptyList(t *testing.T) { } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -570,7 +574,7 @@ func TestGetPodsWithoutResourceRequestsLimits(t *testing.T) { } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -621,7 +625,7 @@ func TestGetPodWithoutResourceRequestsLimits(t *testing.T) { return result, nil } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), podLister) + NewMockSecretLister(mockCtrl), podLister, nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -708,7 +712,7 @@ func createNewACIMock() *MockACIProvider { }) } -func createTestProvider(aciMocks *MockACIProvider, configMapMocker *MockConfigMapLister, secretMocker *MockSecretLister, podMocker *MockPodLister) (*ACIProvider, error) { +func createTestProvider(aciMocks *MockACIProvider, configMapMocker *MockConfigMapLister, secretMocker *MockSecretLister, podMocker *MockPodLister, kubeClient kubernetes.Interface) (*ACIProvider, error) { ctx := context.TODO() err := setAuthConfig() @@ -716,6 +720,10 @@ func createTestProvider(aciMocks *MockACIProvider, configMapMocker *MockConfigMa return nil, err } + if kubeClient == nil { + kubeClient = fake.NewSimpleClientset() + } + err = os.Setenv("ACI_VNET_NAME", fakeVnetName) if err != nil { return nil, err @@ -750,7 +758,7 @@ func createTestProvider(aciMocks *MockACIProvider, configMapMocker *MockConfigMa cfg.Node.Name = fakeNodeName cfg.Node.Status.NodeInfo.OperatingSystem = operatingSystem - provider, err := NewACIProvider(ctx, "example.toml", azConfig, aciMocks, cfg, fakeNodeName, operatingSystem, "0.0.0.0", 10250, "cluster.local") + provider, err := NewACIProvider(ctx, "example.toml", azConfig, aciMocks, cfg, fakeNodeName, operatingSystem, "0.0.0.0", 10250, "cluster.local", kubeClient) if err != nil { return nil, err } @@ -785,7 +793,7 @@ func TestConfigureNode(t *testing.T) { } aciMocks := createNewACIMock() provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -821,7 +829,7 @@ func TestCreatePodWithNamedLivenessProbe(t *testing.T) { pod := testsutil.CreatePodObj(podName, podNamespace) provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -858,7 +866,7 @@ func TestCreatePodWithLivenessProbe(t *testing.T) { pod := testsutil.CreatePodObj(podName, podNamespace) provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -975,7 +983,7 @@ func TestCreatePodWithReadinessProbe(t *testing.T) { pod := testsutil.CreatePodObj(podName, podNamespace) provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -1109,7 +1117,7 @@ func TestCreatedPodWithContainerPort(t *testing.T) { } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("Unable to create test provider", err) } @@ -1149,7 +1157,7 @@ func TestGetPodWithContainerID(t *testing.T) { } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), podLister) + NewMockSecretLister(mockCtrl), podLister, nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -1314,7 +1322,7 @@ func TestDeleteContainerGroup(t *testing.T) { } provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), podLister) + NewMockSecretLister(mockCtrl), podLister, nil) if err != nil { t.Fatal("failed to create the test provider", err) } @@ -1860,3 +1868,84 @@ func TestGetGPUSKU(t *testing.T) { }) } } + +func TestFetchStandardPodsEvents(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + }, + Spec: v1.PodSpec{}, + } + + cg := testsutil.CreateContainerGroupObj(containerGroupName(podNamespace, podNamespace), "", "Succeeded", + testsutil.CreateACIContainersListObj(runningState, "Initializing", + testsutil.CgCreationTime.Add(time.Second*2), + testsutil.CgCreationTime.Add(time.Second*3), + false, false, false), "Succeeded") + now := time.Now().Add(-10 * time.Second) + cg.Properties.InstanceView.Events = []*azaciv2.Event{ + testsutil.CreateContainerGroupEvent(1, now, now, "mount failed", "cg event", "Warning"), + } + cg.Properties.Containers[0].Properties.InstanceView.Events = []*azaciv2.Event{ + testsutil.CreateContainerGroupEvent(1, now, now, "container started", "cg container event", "Normal"), + } + + aciMocks := createNewACIMock() + aciMocks.MockGetContainerGroup = func(ctx context.Context, resourceGroup, containerGroupName string) (*azaciv2.ContainerGroup, error) { + return cg, nil + } + aciMocks.MockGetContainerGroupInfo = func(ctx context.Context, resourceGroup, namespace, name, nodeName string) (*azaciv2.ContainerGroup, error) { + return aciMocks.MockGetContainerGroup(ctx, resourceGroup, containerGroupName(name, namespace)) + } + podLister := NewMockPodLister(mockCtrl) + podLister.EXPECT().List(labels.Everything()).Times(2).Return([]*v1.Pod{pod}, nil) + + provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), + NewMockSecretLister(mockCtrl), podLister, nil) + if err != nil { + t.Fatal("failed to create the test provider", err) + } + + podsTracker := &PodsTracker{ + pods: podLister, + updateCb: func(pod *v1.Pod) {}, + handler: provider, + lastEventCheck: time.UnixMicro(0), + } + + fakeRecorder := record.NewFakeRecorder(2) + podsTracker.eventRecorder = fakeRecorder + podsTracker.updatePodsLoop(context.Background()) + close(fakeRecorder.Events) + broadcastEvents := make([]string, 0) + for evt := range fakeRecorder.Events { + broadcastEvents = append(broadcastEvents, evt) + } + assert.DeepEqual(t, []string{ + "Warning cg event mount failed", + "Normal cg container event container started", + }, broadcastEvents) + + fakeRecorder = record.NewFakeRecorder(2) + podsTracker.eventRecorder = fakeRecorder + now = time.Now().Add(10 * time.Second) + cg.Properties.InstanceView.Events = []*azaciv2.Event{ + testsutil.CreateContainerGroupEvent(1, now, now, "mount failed 2", "cg event", "InvalidType"), + } + cg.Properties.Containers[0].Properties.InstanceView.Events = []*azaciv2.Event{ + testsutil.CreateContainerGroupEvent(1, now, now, "container started 2", "cg container event", "Normal"), + } + podsTracker.updatePodsLoop(context.Background()) + close(fakeRecorder.Events) + broadcastEvents = broadcastEvents[0:0] + for evt := range fakeRecorder.Events { + broadcastEvents = append(broadcastEvents, evt) + } + assert.DeepEqual(t, []string{ + "Warning cg event mount failed 2", + "Normal cg container event container started 2", + }, broadcastEvents) +} diff --git a/pkg/provider/aci_volumes_test.go b/pkg/provider/aci_volumes_test.go index 21b6abc8..bd633eca 100644 --- a/pkg/provider/aci_volumes_test.go +++ b/pkg/provider/aci_volumes_test.go @@ -44,7 +44,7 @@ func TestCreatedPodWithAzureFilesVolume(t *testing.T) { aciMocks := createNewACIMock() provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - mockSecretLister, NewMockPodLister(mockCtrl)) + mockSecretLister, NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("Unable to create test provider", err) } @@ -306,7 +306,7 @@ func TestCreatePodWithProjectedVolume(t *testing.T) { pod.Spec.Volumes = fakeVolumes provider, err := createTestProvider(aciMocks, configMapLister, - secretLister, NewMockPodLister(mockCtrl)) + secretLister, NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("Unable to create test provider", err) } @@ -476,7 +476,7 @@ func TestCreatePodWithCSIVolume(t *testing.T) { pod.Spec.Volumes = tc.volumes provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - mockSecretLister, NewMockPodLister(mockCtrl)) + mockSecretLister, NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("Unable to create test provider", err) } @@ -588,7 +588,7 @@ func TestGetVolumesForSecretVolume(t *testing.T) { pod.Spec.Volumes = fakePodVolumes provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl), - mockSecretLister, NewMockPodLister(mockCtrl)) + mockSecretLister, NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("Unable to create test provider", err) } @@ -702,7 +702,7 @@ func TestGetVolumesForConfigMapVolume(t *testing.T) { pod.Spec.Volumes = fakePodVolumes provider, err := createTestProvider(aciMocks, mockConfigMapLister, - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("Unable to create test provider", err) } diff --git a/pkg/provider/containergroup_to_pod_test.go b/pkg/provider/containergroup_to_pod_test.go index 18797dc9..0ca66c21 100644 --- a/pkg/provider/containergroup_to_pod_test.go +++ b/pkg/provider/containergroup_to_pod_test.go @@ -29,7 +29,7 @@ func TestContainerGroupToPodStatus(t *testing.T) { defer mockCtrl.Finish() provider, err := createTestProvider(createNewACIMock(), NewMockConfigMapLister(mockCtrl), - NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl)) + NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil) if err != nil { t.Fatal("failed to create the test provider", err) } diff --git a/pkg/provider/podsTracker.go b/pkg/provider/podsTracker.go index 1c104b25..eed82056 100644 --- a/pkg/provider/podsTracker.go +++ b/pkg/provider/podsTracker.go @@ -14,7 +14,10 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/record/util" ) const ( @@ -35,6 +38,7 @@ type PodIdentifier struct { type PodsTrackerHandler interface { ListActivePods(ctx context.Context) ([]PodIdentifier, error) FetchPodStatus(ctx context.Context, ns, name string) (*v1.PodStatus, error) + FetchPodEvents(ctx context.Context, pod *v1.Pod, evtSink func(timestamp *time.Time, object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})) error CleanupPod(ctx context.Context, ns, name string) error } @@ -42,6 +46,9 @@ type PodsTracker struct { pods corev1listers.PodLister updateCb func(*v1.Pod) handler PodsTrackerHandler + + lastEventCheck time.Time + eventRecorder record.EventRecorder } // StartTracking starts the background tracking for created pods. @@ -148,6 +155,23 @@ func (pt *PodsTracker) processPodUpdates(ctx context.Context, pod *v1.Pod) bool ctx, span := trace.StartSpan(ctx, "PodsTracker.processPodUpdates") defer span.End() + lastEventCheck := pt.lastEventCheck + err := pt.handler.FetchPodEvents(ctx, pod, func(timestamp *time.Time, object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { + if timestamp == nil || timestamp.After(pt.lastEventCheck) { + if !util.ValidateEventType(eventtype) { + eventtype = v1.EventTypeWarning + } + if timestamp != nil && timestamp.After(lastEventCheck) { + lastEventCheck = *timestamp + } + pt.eventRecorder.Eventf(object, eventtype, reason, messageFmt, args...) + } + }) + pt.lastEventCheck = lastEventCheck + if err != nil { + log.G(ctx).WithError(err).Warnf("cannot fetch aci events for pod %s in namespace %s", pod.Name, pod.Namespace) + } + if pt.shouldSkipPodStatusUpdate(pod) { log.G(ctx).Infof("pod %s will skip pod status update", pod.Name) return false diff --git a/pkg/tests/utils.go b/pkg/tests/utils.go index bb5c033e..5d48f3e4 100644 --- a/pkg/tests/utils.go +++ b/pkg/tests/utils.go @@ -370,3 +370,14 @@ func CreatePodsList(podNames []string, podNameSpace string) []*corev1.Pod { } return result } + +func CreateContainerGroupEvent(count int32, firstTimestamp time.Time, lastTimestamp time.Time, message, name, eventType string) *azaciv2.Event { + return &azaciv2.Event{ + Count: &count, + FirstTimestamp: &firstTimestamp, + LastTimestamp: &lastTimestamp, + Message: &message, + Name: &name, + Type: &eventType, + } +}