Skip to content

Commit

Permalink
Record ACI events back to kubernetes (virtual-kubelet#470)
Browse files Browse the repository at this point in the history
Signed-off-by: JACQUES Francois <Francois.JACQUES@murex.com>
  • Loading branch information
JACQUES Francois authored and JACQUES Francois committed Apr 27, 2023
1 parent b7c2ace commit f9494c3
Show file tree
Hide file tree
Showing 12 changed files with 356 additions and 49 deletions.
13 changes: 6 additions & 7 deletions cmd/virtual-kubelet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,12 @@ func main() {
}
return nil
}

k8sClient, err := nodeutil.ClientsetFromEnv(kubeConfigPath)
if err != nil {
log.G(ctx).Fatal(err)
}
withClient := func(cfg *nodeutil.NodeConfig) error {
client, err := nodeutil.ClientsetFromEnv(kubeConfigPath)
if err != nil {
return err
}
return nodeutil.WithClient(client)(cfg)
return nodeutil.WithClient(k8sClient)(cfg)
}

run := func(ctx context.Context) error {
Expand All @@ -223,7 +222,7 @@ func main() {
}
p, err := azproviderv2.NewACIProvider(ctx, cfgPath, azConfig, azACIAPIs, cfg,
nodeName, operatingSystem, os.Getenv("VKUBELET_POD_IP"),
int32(listenPort), clusterDomain)
int32(listenPort), clusterDomain, k8sClient)
p.ConfigureNode(ctx, cfg.Node)
return p, nil, err
},
Expand Down
9 changes: 8 additions & 1 deletion pkg/featureflag/feature_flag.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@ import (
const (
InitContainerFeature = "init-container"
ConfidentialComputeFeature = "confidential-compute"

// Events : support ACI to K8s event translation and broadcasting
Events = "events"
)

var enabledFeatures = []string{
InitContainerFeature,
ConfidentialComputeFeature,
Events,
}

type FlagIdentifier struct {
Expand All @@ -30,6 +34,8 @@ func InitFeatureFlag(ctx context.Context) *FlagIdentifier {
var featureFlags FlagIdentifier
featureFlags.enabledFeatures = enabledFeatures

log.G(ctx).Infof("features %v enabled", enabledFeatures)

return &featureFlags
}

Expand All @@ -42,9 +48,10 @@ func (fi *FlagIdentifier) IsEnabled(ctx context.Context, feature string) bool {
}
for _, feat := range fi.enabledFeatures {
if feat == feature {
log.G(ctx).Infof("feature %s is enabled", feature)
log.G(ctx).Debugf("feature %s is enabled", feature)
return true
}
}
log.G(ctx).Debugf("feature %s is disabled", feature)
return false
}
5 changes: 5 additions & 0 deletions pkg/featureflag/feature_flag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ func TestIsEnabled(t *testing.T) {
feature: ConfidentialComputeFeature,
shouldEnabled: true,
},
{
description: fmt.Sprintf(" %s feature should be enabled", Events),
feature: Events,
shouldEnabled: true,
},
}
for i, tc := range cases {
t.Run(tc.description, func(t *testing.T) {
Expand Down
64 changes: 60 additions & 4 deletions pkg/provider/aci.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"io"
"os"
"path"
"reflect"
"strings"
"time"
Expand All @@ -33,10 +34,16 @@ import (
"github.com/virtual-kubelet/virtual-kubelet/trace"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
corev1listers "k8s.io/client-go/listers/core/v1"

"github.com/cpuguy83/dockercfg"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
)

const (
Expand Down Expand Up @@ -80,6 +87,7 @@ type ACIProvider struct {
podsL corev1listers.PodLister
enabledFeatures *featureflag.FlagIdentifier
providerNetwork network.ProviderNetwork
eventRecorder record.EventRecorder

resourceGroup string
region string
Expand Down Expand Up @@ -170,7 +178,7 @@ func isValidACIRegion(region string) bool {
}

// NewACIProvider creates a new ACIProvider.
func NewACIProvider(ctx context.Context, config string, azConfig auth.Config, azAPIs client.AzClientsInterface, pCfg nodeutil.ProviderConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32, clusterDomain string) (*ACIProvider, error) {
func NewACIProvider(ctx context.Context, config string, azConfig auth.Config, azAPIs client.AzClientsInterface, pCfg nodeutil.ProviderConfig, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32, clusterDomain string, kubeClient kubernetes.Interface) (*ACIProvider, error) {
var p ACIProvider
var err error

Expand Down Expand Up @@ -208,6 +216,12 @@ func NewACIProvider(ctx context.Context, config string, azConfig auth.Config, az
if p.providerNetwork.VnetResourceGroup == "" {
p.providerNetwork.VnetResourceGroup = p.resourceGroup
}

providerEB := record.NewBroadcaster()
providerEB.StartLogging(log.G(ctx).Infof)
providerEB.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: kubeClient.CoreV1().Events(v1.NamespaceAll)})
p.eventRecorder = providerEB.NewRecorder(scheme.Scheme, v1.EventSource{Component: path.Join(nodeName, "pod-controller")})

// If the log analytics file has been specified, load workspace credentials from the file
if logAnalyticsAuthFile := os.Getenv("LOG_ANALYTICS_AUTH_LOCATION"); logAnalyticsAuthFile != "" {
p.diagnostics, err = analytics.NewContainerGroupDiagnosticsFromFile(logAnalyticsAuthFile)
Expand Down Expand Up @@ -764,9 +778,11 @@ func (p *ACIProvider) NotifyPods(ctx context.Context, notifierCb func(*v1.Pod))

// Capture the notifier to be used for communicating updates to VK
p.tracker = &PodsTracker{
pods: p.podsL,
updateCb: notifierCb,
handler: p,
pods: p.podsL,
updateCb: notifierCb,
handler: p,
lastEventCheck: time.UnixMicro(0),
eventRecorder: p.eventRecorder,
}

go p.tracker.StartTracking(ctx)
Expand Down Expand Up @@ -803,6 +819,46 @@ func (p *ACIProvider) FetchPodStatus(ctx context.Context, ns, name string) (*v1.
return p.GetPodStatus(ctx, ns, name)
}

func (p *ACIProvider) FetchPodEvents(ctx context.Context, pod *v1.Pod, evtSink func(timestamp *time.Time, object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})) error {
ctx, span := trace.StartSpan(ctx, "ACIProvider.FetchPodEvents")
defer span.End()
if !p.enabledFeatures.IsEnabled(ctx, featureflag.Events) {
return nil
}

ctx = addAzureAttributes(ctx, span, p)
cgName := containerGroupName(pod.Namespace, pod.Name)
cg, err := p.azClientsAPIs.GetContainerGroup(ctx, p.resourceGroup, cgName)
if err != nil {
return err
}
if *cg.Tags["NodeName"] != p.nodeName {
return errors.Wrapf(err, "container group %s found with mismatching node", cgName)
}

if cg.Properties != nil && cg.Properties.InstanceView != nil && cg.Properties.InstanceView.Events != nil {
for _, evt := range cg.Properties.InstanceView.Events {
evtSink(evt.LastTimestamp, pod, *evt.Type, *evt.Name, *evt.Message)
}
}

if cg.Properties != nil && cg.Properties.Containers != nil {
for _, container := range cg.Properties.Containers {
if container.Properties != nil && container.Properties.InstanceView != nil && container.Properties.InstanceView.Events != nil {
for _, evt := range container.Properties.InstanceView.Events {
podReference, err := reference.GetReference(scheme.Scheme, pod)
if err != nil {
log.G(ctx).WithError(err).Warnf("cannot get k8s object reference from pod %s in namespace %s", pod.Name, pod.Namespace)
}
podReference.FieldPath = fmt.Sprintf("spec.containers{%s}", *container.Name)
evtSink(evt.LastTimestamp, podReference, *evt.Type, *evt.Name, *evt.Message)
}
}
}
}
return nil
}

// CleanupPod interface impl
func (p *ACIProvider) CleanupPod(ctx context.Context, ns, name string) error {
ctx, span := trace.StartSpan(ctx, "ACIProvider.CleanupPod")
Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/aci_confidential_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestCreatePodWithConfidentialComputeProperties(t *testing.T) {
ctx := context.TODO()

provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl),
NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl))
NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil)
if err != nil {
t.Fatal("Unable to create test provider", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/aci_init_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestCreatePodWithInitContainers(t *testing.T) {
ctx := context.TODO()

provider, err := createTestProvider(aciMocks, NewMockConfigMapLister(mockCtrl),
NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl))
NewMockSecretLister(mockCtrl), NewMockPodLister(mockCtrl), nil)
if err != nil {
t.Fatal("Unable to create test provider", err)
}
Expand Down
Loading

0 comments on commit f9494c3

Please sign in to comment.