Skip to content

Commit

Permalink
Record ACI events back to kubernetes (virtual-kubelet#470)
Browse files Browse the repository at this point in the history
Signed-off-by: JACQUES Francois <Francois.JACQUES@murex.com>
  • Loading branch information
JACQUES Francois authored and JACQUES Francois committed Feb 27, 2023
1 parent bfe2b34 commit 1b86dca
Show file tree
Hide file tree
Showing 11 changed files with 225 additions and 29 deletions.
10 changes: 9 additions & 1 deletion cmd/virtual-kubelet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ import (
"github.com/virtual-kubelet/node-cli/provider"
"github.com/virtual-kubelet/virtual-kubelet/log"
logruslogger "github.com/virtual-kubelet/virtual-kubelet/log/logrus"
"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
"github.com/virtual-kubelet/virtual-kubelet/trace"
"github.com/virtual-kubelet/virtual-kubelet/trace/opencensus"

"k8s.io/client-go/kubernetes"
)

var (
Expand Down Expand Up @@ -73,6 +76,7 @@ func main() {
}

var azACIAPIs *client.AzClientsAPIs
var kubeClient kubernetes.Interface
azConfig := auth.Config{}

if vkVersion {
Expand All @@ -86,14 +90,18 @@ func main() {
if err != nil {
log.G(ctx).Fatal(err)
}
kubeClient, err = nodeutil.ClientsetFromEnv(o.KubeConfigPath)
if err != nil {
log.G(ctx).Fatal(err)
}
}
run := func(ctx context.Context) error {
node, err := cli.New(ctx,
cli.WithBaseOpts(o),
cli.WithCLIVersion(buildVersion, buildTime),
cli.WithProvider("azure", func(cfg provider.InitConfig) (provider.Provider, error) {
if vkVersion {
return azproviderv2.NewACIProvider(ctx, cfg.ConfigPath, azConfig, azACIAPIs, cfg.ResourceManager, cfg.NodeName, cfg.OperatingSystem, cfg.InternalIP, cfg.DaemonPort, cfg.KubeClusterDomain)
return azproviderv2.NewACIProvider(ctx, cfg.ConfigPath, azConfig, azACIAPIs, cfg.ResourceManager, cfg.NodeName, cfg.OperatingSystem, cfg.InternalIP, cfg.DaemonPort, cfg.KubeClusterDomain, kubeClient)
} else {
return azproviderv1.NewACIProvider(cfg.ConfigPath, cfg.ResourceManager, cfg.NodeName, cfg.OperatingSystem, cfg.InternalIP, cfg.DaemonPort, cfg.KubeClusterDomain)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ require (
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/spdystream v0.0.0-20170912183627-bc6354cbbc29 // indirect
github.com/evanphx/json-patch v4.9.0+incompatible // indirect
github.com/go-logr/logr v0.3.0 // indirect
github.com/go-openapi/jsonpointer v0.19.3 // indirect
github.com/go-openapi/jsonreference v0.19.3 // indirect
Expand Down
6 changes: 5 additions & 1 deletion pkg/featureflag/feature_flag.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ import (
)

const (
InitContainerFeature = "init-container"
InitContainerFeature = "init-container"
ConfidentialComputeFeature = "confidential-compute"

// Events : support ACI to K8s event translation and broadcasting
Events = "events"
)

var enabledFeatures = []string{
InitContainerFeature,
ConfidentialComputeFeature,
Events,
}

type FlagIdentifier struct {
Expand Down
63 changes: 59 additions & 4 deletions pkg/provider/aci.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@ import (
"github.com/virtual-kubelet/virtual-kubelet/trace"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
)

const (
Expand Down Expand Up @@ -75,6 +81,7 @@ type ACIProvider struct {
containerGroupExtensions []*azaciv2.DeploymentExtensionSpec
enabledFeatures *featureflag.FlagIdentifier
providernetwork network.ProviderNetwork
eventRecorder record.EventRecorder

resourceGroup string
region string
Expand Down Expand Up @@ -165,7 +172,7 @@ func isValidACIRegion(region string) bool {
}

// NewACIProvider creates a new ACIProvider.
func NewACIProvider(ctx context.Context, config string, azConfig auth.Config, azAPIs client.AzClientsInterface, rm *manager.ResourceManager, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32, clusterDomain string) (*ACIProvider, error) {
func NewACIProvider(ctx context.Context, config string, azConfig auth.Config, azAPIs client.AzClientsInterface, rm *manager.ResourceManager, nodeName, operatingSystem string, internalIP string, daemonEndpointPort int32, clusterDomain string, kubeClient kubernetes.Interface) (*ACIProvider, error) {
var p ACIProvider
var err error

Expand Down Expand Up @@ -198,6 +205,11 @@ func NewACIProvider(ctx context.Context, config string, azConfig auth.Config, az
p.providernetwork.VnetResourceGroup = azConfig.AKSCredential.VNetResourceGroup
}

providerEB := record.NewBroadcaster()
providerEB.StartLogging(log.G(ctx).Infof)
providerEB.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: kubeClient.CoreV1().Events(v1.NamespaceAll)})
p.eventRecorder = providerEB.NewRecorder(scheme.Scheme, v1.EventSource{Component: "virtual kubelet"})

if p.providernetwork.VnetResourceGroup == "" {
p.providernetwork.VnetResourceGroup = p.resourceGroup
}
Expand Down Expand Up @@ -714,9 +726,11 @@ func (p *ACIProvider) NotifyPods(ctx context.Context, notifierCb func(*v1.Pod))

// Capture the notifier to be used for communicating updates to VK
p.tracker = &PodsTracker{
rm: p.resourceManager,
updateCb: notifierCb,
handler: p,
rm: p.resourceManager,
updateCb: notifierCb,
handler: p,
lastEventCheck: time.UnixMicro(0),
eventRecorder: p.eventRecorder,
}

go p.tracker.StartTracking(ctx)
Expand Down Expand Up @@ -753,6 +767,47 @@ func (p *ACIProvider) FetchPodStatus(ctx context.Context, ns, name string) (*v1.
return p.GetPodStatus(ctx, ns, name)
}

func (p *ACIProvider) FetchPodEvents(ctx context.Context, pod *v1.Pod, evtSink func(timestamp *time.Time, object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})) error {
ctx, span := trace.StartSpan(ctx, "ACIProvider.FetchPodEvents")
defer span.End()
if !p.enabledFeatures.IsEnabled(ctx, featureflag.Events) {
return nil
}

ctx = addAzureAttributes(ctx, span, p)
cgName := containerGroupName(pod.Namespace, pod.Name)
cg, err := p.azClientsAPIs.GetContainerGroup(ctx, p.resourceGroup, cgName)
if err != nil {
log.G(ctx).Errorf("error getting container group %s", cgName)
return err
}
if *cg.Tags["NodeName"] != p.nodeName {
return errors.Wrapf(err, "container group %s found with mismatching node", cgName)
}

if cg.Properties != nil && cg.Properties.InstanceView != nil && cg.Properties.InstanceView.Events != nil {
for _, evt := range cg.Properties.InstanceView.Events {
evtSink(evt.LastTimestamp, pod, *evt.Type, *evt.Name, *evt.Message)
}
}

if cg.Properties != nil && cg.Properties.Containers != nil {
for _, container := range cg.Properties.Containers {
if container.Properties != nil && container.Properties.InstanceView != nil && container.Properties.InstanceView.Events != nil {
for _, evt := range container.Properties.InstanceView.Events {
podReference, err := reference.GetReference(scheme.Scheme, pod)
if err != nil {
log.G(ctx).WithError(err).Warnf("cannot get k8s object reference from pod %s in namespace %s", pod.Name, pod.Namespace)
}
podReference.FieldPath = fmt.Sprintf("spec.containers{%s}", *container.Name)
evtSink(evt.LastTimestamp, podReference, *evt.Type, *evt.Name, *evt.Message)
}
}
}
}
return nil
}

// CleanupPod interface impl
func (p *ACIProvider) CleanupPod(ctx context.Context, ns, name string) error {
ctx, span := trace.StartSpan(ctx, "ACIProvider.CleanupPod")
Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/aci_confidential_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestCreatePodWithConfidentialComputeProperties(t *testing.T) {
t.Fatal("Unable to prepare the mocks for resourceManager", err)
}

provider, err := createTestProvider(aciMocks, resourceManager)
provider, err := createTestProvider(aciMocks, resourceManager, nil)
if err != nil {
t.Fatal("Unable to create test provider", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/aci_init_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func TestCreatePodWithInitContainers(t *testing.T) {
t.Fatal("Unable to prepare the mocks for resourceManager", err)
}

provider, err := createTestProvider(aciMocks, resourceManager)
provider, err := createTestProvider(aciMocks, resourceManager, nil)
if err != nil {
t.Fatal("Unable to create test provider", err)
}
Expand Down
Loading

0 comments on commit 1b86dca

Please sign in to comment.