From acd996245564a963f25324c4f96509da29e56b54 Mon Sep 17 00:00:00 2001
From: Naiming Shen
Date: Wed, 30 Oct 2024 11:38:26 -0700
Subject: [PATCH] Implement Clustering and App-related functions

- change VMI to VMI ReplicaSet for kubernetes
- change Pod to Pod ReplicaSet for containers
- change functions handling replicaset names in services
- subscribe EdgeNodeInfo in domainmgr and zedmanager to get node-name for cluster
- add Designated Node ID to several structs for App
- do not delete domain from kubernetes if not the Designated App node
- parse config for EdgeNodeClusterConfig in zedagent
- handle ENClusterAppStatus publication in zedmanager in multi-node clustering case
- include ENClusterAppStatus in zedmanager effective-activation handling
- kubevirt hypervisor changes to handle VMI/Pod ReplicaSets

Signed-off-by: Naiming Shen
---
 pkg/pillar/base/kubevirt.go                   |  28 +-
 pkg/pillar/cipher/cipher.go                   |   1 +
 pkg/pillar/cmd/domainmgr/domainmgr.go         | 139 ++-
 pkg/pillar/cmd/zedagent/parseconfig.go        |  58 ++
 pkg/pillar/cmd/zedagent/zedagent.go           |  13 +
 pkg/pillar/cmd/zedmanager/handleclusterapp.go |  43 +
 pkg/pillar/cmd/zedmanager/handledomainmgr.go  |   9 +-
 pkg/pillar/cmd/zedmanager/handlezedrouter.go  |   2 +-
 pkg/pillar/cmd/zedmanager/updatestatus.go     |  30 +-
 pkg/pillar/cmd/zedmanager/zedmanager.go       | 139 ++-
 pkg/pillar/cmd/zedrouter/cni.go               |   4 +
 pkg/pillar/docs/zedkube.md                    |  10 +
 pkg/pillar/hypervisor/kubevirt.go             | 803 ++++++++++++------
 pkg/pillar/hypervisor/kubevirt_test.go        | 108 +++
 pkg/pillar/kubeapi/kubeapi.go                 |   4 +-
 pkg/pillar/types/clustertypes.go              |   2 +
 pkg/pillar/types/domainmgrtypes.go            |   5 +
 pkg/pillar/types/zedmanagertypes.go           |   3 +-
 18 files changed, 1139 insertions(+), 262 deletions(-)
 create mode 100644 pkg/pillar/cmd/zedmanager/handleclusterapp.go
 create mode 100644 pkg/pillar/hypervisor/kubevirt_test.go

diff --git a/pkg/pillar/base/kubevirt.go b/pkg/pillar/base/kubevirt.go
index 3c75eb95ae..5d250fc10b 100644
--- a/pkg/pillar/base/kubevirt.go
+++ b/pkg/pillar/base/kubevirt.go
@@ -67,15 +67,37 @@ func GetAppKubeName(displayName string, uuid uuid.UUID) string {
 }
 
 // GetVMINameFromVirtLauncher : get VMI name from the corresponding Kubevirt
-// launcher pod name.
+// launcher pod name for a replicaset-generated VMI.
 func GetVMINameFromVirtLauncher(podName string) (vmiName string, isVirtLauncher bool) {
 	if !strings.HasPrefix(podName, VMIPodNamePrefix) {
 		return "", false
 	}
 	vmiName = strings.TrimPrefix(podName, VMIPodNamePrefix)
 	lastSep := strings.LastIndex(vmiName, "-")
-	if lastSep != -1 {
-		vmiName = vmiName[:lastSep]
+	if lastSep == -1 || lastSep < 5 {
+		return "", false
 	}
+
+	// Check that the last part (the launcher pod suffix) is 5 bytes long
+	if len(vmiName[lastSep+1:]) != 5 {
+		return "", false
+	}
+
+	// Also cut the 5 bytes before the last separator to drop the suffix
+	// added by the replicaset and recover the VMI name
+	vmiName = vmiName[:lastSep-5]
 	return vmiName, true
 }
+
+// GetReplicaPodName : get the app name from the pod name for replica pods.
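+// A ReplicaSet-managed pod is named by appending "-" plus a 5-character
+// kubernetes-generated suffix to the app kube name. If podName matches
+// that pattern, return the base kube name; otherwise report that this is
+// not a replica pod.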
+func GetReplicaPodName(displayName, podName string, uuid uuid.UUID) (kubeName string, isReplicaPod bool) { + kubeName = GetAppKubeName(displayName, uuid) + if !strings.HasPrefix(podName, kubeName) { + return "", false + } + suffix := strings.TrimPrefix(podName, kubeName) + if strings.HasPrefix(suffix, "-") && len(suffix[1:]) == 5 { + return kubeName, true + } + return "", false +} diff --git a/pkg/pillar/cipher/cipher.go b/pkg/pillar/cipher/cipher.go index 42b486e5e1..db7ff4a3e8 100644 --- a/pkg/pillar/cipher/cipher.go +++ b/pkg/pillar/cipher/cipher.go @@ -27,6 +27,7 @@ func getEncryptionBlock( decBlock.CellularNetUsername = zconfigDecBlockPtr.CellularNetUsername decBlock.CellularNetPassword = zconfigDecBlockPtr.CellularNetPassword decBlock.ProtectedUserData = zconfigDecBlockPtr.ProtectedUserData + decBlock.ClusterToken = zconfigDecBlockPtr.ClusterToken return decBlock } diff --git a/pkg/pillar/cmd/domainmgr/domainmgr.go b/pkg/pillar/cmd/domainmgr/domainmgr.go index 906db44751..502e9706d7 100644 --- a/pkg/pillar/cmd/domainmgr/domainmgr.go +++ b/pkg/pillar/cmd/domainmgr/domainmgr.go @@ -91,6 +91,7 @@ type domainContext struct { pubDomainStatus pubsub.Publication subGlobalConfig pubsub.Subscription subZFSPoolStatus pubsub.Subscription + subEdgeNodeInfo pubsub.Subscription pubAssignableAdapters pubsub.Publication pubDomainMetric pubsub.Publication pubHostMemory pubsub.Publication @@ -126,6 +127,7 @@ type domainContext struct { cpuPinningSupported bool // Is it kubevirt eve hvTypeKube bool + nodeName string } // AddAgentSpecificCLIFlags adds CLI options @@ -414,9 +416,24 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar domainCtx.subZFSPoolStatus = subZFSPoolStatus subZFSPoolStatus.Activate() + // Look for edge node info + subEdgeNodeInfo, err := ps.NewSubscription(pubsub.SubscriptionOptions{ + AgentName: "zedagent", + MyAgentName: agentName, + TopicImpl: types.EdgeNodeInfo{}, + Persistent: true, + Activate: false, + }) + if err != nil { + log.Fatal(err) + } + domainCtx.subEdgeNodeInfo = subEdgeNodeInfo + _ = subEdgeNodeInfo.Activate() + // Parse any existing ConfigIntemValueMap but continue if there // is none - for !domainCtx.GCComplete { + waitEdgeNodeInfo := true + for !domainCtx.GCComplete || (domainCtx.hvTypeKube && waitEdgeNodeInfo) { log.Noticef("waiting for GCComplete") select { case change := <-subGlobalConfig.MsgChan(): @@ -425,6 +442,10 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar case <-domainCtx.publishTicker.C: publishProcessesHandler(&domainCtx) + case change := <-subEdgeNodeInfo.MsgChan(): + subEdgeNodeInfo.ProcessChange(change) + waitEdgeNodeInfo = false + case <-stillRunning.C: } ps.StillRunning(agentName, warningTime, errorTime) @@ -513,6 +534,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar case change := <-subZFSPoolStatus.MsgChan(): subZFSPoolStatus.ProcessChange(change) + case change := <-subEdgeNodeInfo.MsgChan(): + subEdgeNodeInfo.ProcessChange(change) + case <-domainCtx.publishTicker.C: publishProcessesHandler(&domainCtx) @@ -651,6 +675,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar case change := <-subPhysicalIOAdapter.MsgChan(): subPhysicalIOAdapter.ProcessChange(change) + case change := <-subEdgeNodeInfo.MsgChan(): + subEdgeNodeInfo.ProcessChange(change) + case <-domainCtx.publishTicker.C: start := time.Now() err = domainCtx.cipherMetrics.Publish(log, cipherMetricsPub, "global") @@ -977,12 +1004,15 @@ func 
verifyStatus(ctx *domainContext, status *types.DomainStatus) { status.SetErrorDescription(errDescription) } - //cleanup app instance tasks - if err := hyper.Task(status).Delete(status.DomainName); err != nil { - log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err) - } - if err := hyper.Task(status).Cleanup(status.DomainName); err != nil { - log.Errorf("failed to cleanup domain: %s (%v)", status.DomainName, err) + // in cluster mode, we can not delete the pod due to failing to get app info + if !ctx.hvTypeKube { + //cleanup app instance tasks + if err := hyper.Task(status).Delete(status.DomainName); err != nil { + log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err) + } + if err := hyper.Task(status).Cleanup(status.DomainName); err != nil { + log.Errorf("failed to cleanup domain: %s (%v)", status.DomainName, err) + } } } status.DomainId = 0 @@ -1071,6 +1101,14 @@ func maybeRetryBoot(ctx *domainContext, status *types.DomainStatus) { if !status.BootFailed { return } + + err := ctx.retrieveNodeNameAndUUID() + if err != nil { + log.Errorf("maybeRetryBoot(%s) retrieveNodeNameAndUUID failed: %s", + status.Key(), err) + return + } + if status.Activated && status.BootFailed { log.Functionf("maybeRetryBoot(%s) clearing bootFailed since Activated", status.Key()) @@ -1138,6 +1176,11 @@ func maybeRetryBoot(ctx *domainContext, status *types.DomainStatus) { log.Errorf("Failed to setup vTPM for %s: %s", status.DomainName, err) } + // pass nodeName to hypervisor call Setup + if status.NodeName == "" { + status.NodeName = ctx.nodeName + } + if err := hyper.Task(status).Setup(*status, *config, ctx.assignableAdapters, nil, file); err != nil { //it is retry, so omit error log.Errorf("Failed to create DomainStatus from %+v: %s", @@ -1322,6 +1365,7 @@ func handleCreate(ctx *domainContext, key string, config *types.DomainConfig) { State: types.INSTALLED, VmConfig: config.VmConfig, Service: config.Service, + IsDNidNode: config.IsDNidNode, } status.VmConfig.CPUs = make([]int, 0) @@ -1531,6 +1575,13 @@ func doActivate(ctx *domainContext, config types.DomainConfig, log.Functionf("doActivate(%v) for %s", config.UUIDandVersion, config.DisplayName) + err := ctx.retrieveNodeNameAndUUID() + if err != nil { + log.Errorf("doActivate(%s) retrieveNodeNameAndUUID failed: %s", + status.Key(), err) + return + } + if ctx.cpuPinningSupported { if err := assignCPUs(ctx, &config, status); err != nil { log.Warnf("failed to assign CPUs for %s", config.DisplayName) @@ -1684,6 +1735,11 @@ func doActivate(ctx *domainContext, config types.DomainConfig, log.Errorf("Failed to setup vTPM for %s: %s", status.DomainName, err) } + // pass nodeName to hypervisor call Setup + if status.NodeName == "" { + status.NodeName = ctx.nodeName + } + globalConfig := agentlog.GetGlobalConfig(log, ctx.subGlobalConfig) if err := hyper.Task(status).Setup(*status, config, ctx.assignableAdapters, globalConfig, file); err != nil { log.Errorf("Failed to create DomainStatus from %+v: %s", @@ -1751,6 +1807,18 @@ func doActivateTail(ctx *domainContext, status *types.DomainStatus, log.Errorf("domain start for %s: %s", status.DomainName, err) status.SetErrorNow(err.Error()) + // HvKube case + if ctx.hvTypeKube && !status.IsDNidNode { + log.Noticef("doActivateTail(%v) we are not DNiD, skip delete app", status.DomainName) + return + } + + // Only send delete if DomainConfig is not deleted + // detail see the zedkube.md section 'Handling Domain Deletion in Domainmgr' + if ctx.hvTypeKube && !status.DomainConfigDeleted { + 
log.Noticef("doActivateTail(%v) DomainConfig exists, skip delete app", status.DomainName) + return + } // Delete if err := hyper.Task(status).Delete(status.DomainName); err != nil { log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err) @@ -1780,6 +1848,18 @@ func doActivateTail(ctx *domainContext, status *types.DomainStatus, status.SetErrorNow(err.Error()) log.Errorf("doActivateTail(%v) failed for %s: %s", status.UUIDandVersion, status.DisplayName, err) + + if ctx.hvTypeKube && !status.IsDNidNode { + log.Noticef("doActivateTail(%v) we are not DNiD, skip delete app", status.DomainName) + return + } + // Only send delete if DomainConfig is not deleted + // detail see the zedkube.md section 'Handling Domain Deletion in Domainmgr' + if ctx.hvTypeKube && !status.DomainConfigDeleted { + log.Noticef("doActivateTail(%v) DomainConfig exists, skip delete app", status.DomainName) + return + } + // Delete if err := hyper.Task(status).Delete(status.DomainName); err != nil { log.Errorf("failed to delete domain: %s (%v)", status.DomainName, err) @@ -1844,7 +1924,7 @@ func doInactivate(ctx *domainContext, status *types.DomainStatus, impatient bool if doShutdown { // If the Shutdown fails we don't wait; assume failure // was due to no PV tools - if err := DomainShutdown(*status, false); err != nil { + if err := DomainShutdown(ctx, *status, false); err != nil { log.Errorf("DomainShutdown %s failed: %s", status.DomainName, err) } else { @@ -1864,7 +1944,7 @@ func doInactivate(ctx *domainContext, status *types.DomainStatus, impatient bool // the domain is already on the way down. // In case of errors we proceed directly to deleting the task, // and after that we waitForDomainGone - if err := DomainShutdown(*status, true); err != nil { + if err := DomainShutdown(ctx, *status, true); err != nil { log.Warnf("DomainShutdown -F %s failed: %s", status.DomainName, err) } else { @@ -1881,6 +1961,17 @@ func doInactivate(ctx *domainContext, status *types.DomainStatus, impatient bool } if status.DomainId != 0 { + if ctx.hvTypeKube && !status.IsDNidNode { + log.Noticef("doInactivate(%v) we are not DNiD, skip delete app", status.DomainName) + return + } + // Only send delete if DomainConfig is not deleted + // detail see the zedkube.md section 'Handling Domain Deletion in Domainmgr' + if ctx.hvTypeKube && !status.DomainConfigDeleted { + log.Noticef("doInactivate(%v) DomainConfig exists, skip delete app", status.DomainName) + return + } + if err := hyper.Task(status).Delete(status.DomainName); err != nil { log.Errorf("Failed to delete domain %s (%v)", status.DomainName, err) } else { @@ -2468,6 +2559,16 @@ func handleDelete(ctx *domainContext, key string, status *types.DomainStatus) { // No point in publishing metrics any more ctx.pubDomainMetric.Unpublish(status.Key()) + if ctx.hvTypeKube && !status.IsDNidNode { + log.Noticef("handleDelete(%v) we are not DNiD, skip delete app", status.DomainName) + return + } + + // set the DomainConfigDeleted for kubernetes to remove the domain + // detail see the zedkube.md section 'Handling Domain Deletion in Domainmgr' + status.DomainConfigDeleted = true + log.Noticef("handleDelete(%v) DomainConfigDeleted", status.DomainName) + err := hyper.Task(status).Delete(status.DomainName) if err != nil { log.Errorln(err) @@ -2508,13 +2609,18 @@ func DomainCreate(ctx *domainContext, status types.DomainStatus) (int, error) { } // DomainShutdown is a wrapper for domain shutdown -func DomainShutdown(status types.DomainStatus, force bool) error { +func DomainShutdown(ctx 
*domainContext, status types.DomainStatus, force bool) error { var err error log.Functionf("DomainShutdown force-%v %s %d", force, status.DomainName, status.DomainId) // Stop the domain log.Functionf("Stopping domain - %s", status.DomainName) + + if ctx.hvTypeKube && !status.IsDNidNode { + log.Noticef("DomainShutdown(%v) we are not DNiD, skip delete app", status.DomainName) + return nil + } err = hyper.Task(&status).Stop(status.DomainName, force) return err @@ -3603,3 +3709,16 @@ func lookupCapabilities(ctx *domainContext) (*types.Capabilities, error) { } return &capabilities, nil } + +func (ctx *domainContext) retrieveNodeNameAndUUID() error { + if ctx.nodeName == "" { + NodeInfo, err := ctx.subEdgeNodeInfo.Get("global") + if err != nil { + log.Errorf("retrieveNodeNameAndUUID: can't get edgeNodeInfo %v", err) + return err + } + enInfo := NodeInfo.(types.EdgeNodeInfo) + ctx.nodeName = strings.ToLower(enInfo.DeviceName) + } + return nil +} diff --git a/pkg/pillar/cmd/zedagent/parseconfig.go b/pkg/pillar/cmd/zedagent/parseconfig.go index be8ccad25e..2c7d5e21a3 100644 --- a/pkg/pillar/cmd/zedagent/parseconfig.go +++ b/pkg/pillar/cmd/zedagent/parseconfig.go @@ -151,6 +151,10 @@ func parseConfig(getconfigCtx *getconfigContext, config *zconfig.EdgeDevConfig, if source != fromBootstrap { activateNewBaseOS := parseBaseOS(getconfigCtx, config) + parseEdgeNodeClusterConfig(getconfigCtx, config) + + // Parse EdgeNode Cluster configuration + parseNetworkInstanceConfig(getconfigCtx, config) parseContentInfoConfig(getconfigCtx, config) parseVolumeConfig(getconfigCtx, config) @@ -764,6 +768,10 @@ func parseAppInstanceConfig(getconfigCtx *getconfigContext, // Add config submitted via local profile server. addLocalAppConfig(getconfigCtx, &appInstance) + // XXX add Designated ID to the appInstance + // XXX Keep this here for now to allow the kubevirt single-node working, the later PR to EVE main will remove this + appInstance.DesignatedNodeID = devUUID + // Verify that it fits and if not publish with error checkAndPublishAppInstanceConfig(getconfigCtx, appInstance) } @@ -3199,3 +3207,53 @@ func handleDeviceOperation(ctxPtr *zedagentContext, op types.DeviceOperation) { shutdownAppsGlobal(ctxPtr) // nothing else to be done } + +func parseEdgeNodeClusterConfig(getconfigCtx *getconfigContext, + config *zconfig.EdgeDevConfig) { + + ctx := getconfigCtx.zedagentCtx + zcfgCluster := config.GetCluster() + if zcfgCluster == nil { + log.Functionf("parseEdgeNodeClusterConfig: No EdgeNodeClusterConfig, Unpublishing") + pub := ctx.pubEdgeNodeClusterConfig + items := pub.GetAll() + if len(items) > 0 { + log.Functionf("parseEdgeNodeClusterConfig: Unpublishing EdgeNodeClusterConfig") + ctx.pubEdgeNodeClusterConfig.Unpublish("global") + } + return + } + ipAddr, ipNet, err := net.ParseCIDR(zcfgCluster.GetClusterIpPrefix()) + if err != nil { + log.Errorf("parseEdgeNodeClusterConfig: ParseCIDR failed %s", err) + return + } + ipNet.IP = ipAddr + + joinServerIP := net.ParseIP(zcfgCluster.GetJoinServerIp()) + var isJoinNode bool + // deduce the bootstrap node status from clusterIPPrefix and joinServerIP + if ipAddr.Equal(joinServerIP) { // deduce the bootstrap node status from + isJoinNode = true + } + + id, err := uuid.FromString(zcfgCluster.GetClusterId()) + if err != nil { + log.Errorf("parseEdgeNodeClusterConfig: failed to parse UUID: %v", err) + return + } + enClusterConfig := types.EdgeNodeClusterConfig{ + ClusterName: zcfgCluster.GetClusterName(), + ClusterID: types.UUIDandVersion{UUID: id}, + ClusterInterface: 
zcfgCluster.GetClusterInterface(), + ClusterIPPrefix: ipNet, + IsWorkerNode: zcfgCluster.GetIsWorkerNode(), + JoinServerIP: joinServerIP, + BootstrapNode: isJoinNode, + // XXX EncryptedClusterToken is only for gcp config + } + enClusterConfig.CipherToken = parseCipherBlock(getconfigCtx, + enClusterConfig.Key(), zcfgCluster.GetEncryptedClusterToken()) + log.Functionf("parseEdgeNodeClusterConfig: ENCluster API, Config %+v, %v", zcfgCluster, enClusterConfig) + ctx.pubEdgeNodeClusterConfig.Publish("global", enClusterConfig) +} diff --git a/pkg/pillar/cmd/zedagent/zedagent.go b/pkg/pillar/cmd/zedagent/zedagent.go index 70e88b7a70..62d4fc2df1 100644 --- a/pkg/pillar/cmd/zedagent/zedagent.go +++ b/pkg/pillar/cmd/zedagent/zedagent.go @@ -229,6 +229,9 @@ type zedagentContext struct { // Is Kubevirt eve hvTypeKube bool + // EN cluster config + pubEdgeNodeClusterConfig pubsub.Publication + // Netdump netDumper *netdump.NetDumper // nil if netdump is disabled netdumpInterval time.Duration @@ -1103,6 +1106,16 @@ func initPublications(zedagentCtx *zedagentContext) { } getconfigCtx.pubZedAgentStatus.ClearRestarted() + zedagentCtx.pubEdgeNodeClusterConfig, err = ps.NewPublication(pubsub.PublicationOptions{ + AgentName: agentName, + Persistent: true, + TopicType: types.EdgeNodeClusterConfig{}, + }) + if err != nil { + log.Fatal(err) + } + zedagentCtx.pubEdgeNodeClusterConfig.ClearRestarted() + getconfigCtx.pubPhysicalIOAdapters, err = ps.NewPublication(pubsub.PublicationOptions{ AgentName: agentName, TopicType: types.PhysicalIOAdapterList{}, diff --git a/pkg/pillar/cmd/zedmanager/handleclusterapp.go b/pkg/pillar/cmd/zedmanager/handleclusterapp.go new file mode 100644 index 0000000000..83a94dfa67 --- /dev/null +++ b/pkg/pillar/cmd/zedmanager/handleclusterapp.go @@ -0,0 +1,43 @@ +// Copyright (c) 2024 Zededa, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +package zedmanager + +import "github.com/lf-edge/eve/pkg/pillar/types" + +func handleENClusterAppStatusCreate(ctxArg interface{}, key string, configArg interface{}) { + log.Functionf("handleENClusterAppStatusCreate(%s)", key) + ctx := ctxArg.(*zedmanagerContext) + status := configArg.(types.ENClusterAppStatus) + handleENClusterAppStatusImpl(ctx, key, &status) +} + +func handleENClusterAppStatusModify(ctxArg interface{}, key string, configArg interface{}, oldConfigArg interface{}) { + log.Functionf("handleENClusterAppStatusModify(%s)", key) + ctx := ctxArg.(*zedmanagerContext) + status := configArg.(types.ENClusterAppStatus) + handleENClusterAppStatusImpl(ctx, key, &status) +} + +func handleENClusterAppStatusDelete(ctxArg interface{}, key string, configArg interface{}) { + log.Functionf("handleENClusterAppStatusDelete(%s)", key) + ctx := ctxArg.(*zedmanagerContext) + //status := configArg.(types.ENClusterAppStatus) + handleENClusterAppStatusImpl(ctx, key, nil) +} + +func handleENClusterAppStatusImpl(ctx *zedmanagerContext, key string, status *types.ENClusterAppStatus) { + + log.Functionf("handleENClusterAppStatusImpl(%s) for app-status %v", key, status) + pub := ctx.pubAppInstanceStatus + items := pub.GetAll() + for _, st := range items { + aiStatus := st.(types.AppInstanceStatus) + if aiStatus.UUIDandVersion.UUID.String() == key { + log.Functionf("handleENClusterAppStatusImpl(%s) found ai status, update", key) + + updateAIStatusUUID(ctx, aiStatus.UUIDandVersion.UUID.String()) + break + } + } +} diff --git a/pkg/pillar/cmd/zedmanager/handledomainmgr.go b/pkg/pillar/cmd/zedmanager/handledomainmgr.go index 35c3e53e69..33ccb70b37 100644 --- a/pkg/pillar/cmd/zedmanager/handledomainmgr.go +++ b/pkg/pillar/cmd/zedmanager/handledomainmgr.go @@ -11,6 +11,7 @@ import ( "strings" "github.com/lf-edge/eve/pkg/pillar/types" + uuid "github.com/satori/go.uuid" ) const ( @@ -46,7 +47,12 @@ func MaybeAddDomainConfig(ctx *zedmanagerContext, if ns != nil { AppNum = ns.AppNum } - effectiveActivate := effectiveActivateCurrentProfile(aiConfig, ctx.currentProfile) + + isDNiDnode := false + if aiConfig.DesignatedNodeID != uuid.Nil && aiConfig.DesignatedNodeID == ctx.nodeUUID { + isDNiDnode = true + } + effectiveActivate := effectiveActivateCombined(aiConfig, ctx) dc := types.DomainConfig{ UUIDandVersion: aiConfig.UUIDandVersion, DisplayName: aiConfig.DisplayName, @@ -60,6 +66,7 @@ func MaybeAddDomainConfig(ctx *zedmanagerContext, MetaDataType: aiConfig.MetaDataType, Service: aiConfig.Service, CloudInitVersion: aiConfig.CloudInitVersion, + IsDNidNode: isDNiDnode, } dc.DiskConfigList = make([]types.DiskConfig, 0, len(aiStatus.VolumeRefStatusList)) diff --git a/pkg/pillar/cmd/zedmanager/handlezedrouter.go b/pkg/pillar/cmd/zedmanager/handlezedrouter.go index 176e54688c..9a04791779 100644 --- a/pkg/pillar/cmd/zedmanager/handlezedrouter.go +++ b/pkg/pillar/cmd/zedmanager/handlezedrouter.go @@ -19,7 +19,7 @@ func MaybeAddAppNetworkConfig(ctx *zedmanagerContext, log.Functionf("MaybeAddAppNetworkConfig for %s displayName %s", key, displayName) - effectiveActivate := effectiveActivateCurrentProfile(aiConfig, ctx.currentProfile) + effectiveActivate := effectiveActivateCombined(aiConfig, ctx) changed := false m := lookupAppNetworkConfig(ctx, key) diff --git a/pkg/pillar/cmd/zedmanager/updatestatus.go b/pkg/pillar/cmd/zedmanager/updatestatus.go index 3c2a265b87..0a3da0c25c 100644 --- a/pkg/pillar/cmd/zedmanager/updatestatus.go +++ b/pkg/pillar/cmd/zedmanager/updatestatus.go @@ -202,7 
+202,7 @@ func doUpdate(ctx *zedmanagerContext, return changed } - effectiveActivate := effectiveActivateCurrentProfile(config, ctx.currentProfile) + effectiveActivate := effectiveActivateCombined(config, ctx) if !effectiveActivate { if status.Activated || status.ActivateInprogress { @@ -729,6 +729,34 @@ func doActivate(ctx *zedmanagerContext, uuidStr string, // if the VM already active or in restarting/purging state - continue with the doActivate logic } + // delay this if referencename is not set + if ctx.hvTypeKube && config.FixedResources.VirtualizationMode == types.NOHYPER { + var findcontainer bool + for _, vrc := range config.VolumeRefConfigList { + vrs := lookupVolumeRefStatus(ctx, vrc.Key()) + if vrs == nil || !vrs.IsContainer() { + continue + } + findcontainer = true + if vrs.ReferenceName == "" { + log.Noticef("doActivate: waiting for referencename ") + if status.State != types.START_DELAYED { + status.State = types.START_DELAYED + return true + } + return changed + } + } + if !findcontainer { + log.Noticef("doActivate: no container found, wait") + if status.State != types.START_DELAYED { + status.State = types.START_DELAYED + return true + } + return changed + } + } + // Make sure we have a DomainConfig // We modify it below and then publish it dc, err := MaybeAddDomainConfig(ctx, config, *status, ns) diff --git a/pkg/pillar/cmd/zedmanager/zedmanager.go b/pkg/pillar/cmd/zedmanager/zedmanager.go index 1fc2e6b661..abe4872800 100644 --- a/pkg/pillar/cmd/zedmanager/zedmanager.go +++ b/pkg/pillar/cmd/zedmanager/zedmanager.go @@ -26,6 +26,7 @@ import ( "github.com/lf-edge/eve/pkg/pillar/types" fileutils "github.com/lf-edge/eve/pkg/pillar/utils/file" "github.com/lf-edge/eve/pkg/pillar/utils/wait" + uuid "github.com/satori/go.uuid" "github.com/sirupsen/logrus" ) @@ -51,9 +52,11 @@ type zedmanagerContext struct { subAppNetworkStatus pubsub.Subscription pubDomainConfig pubsub.Publication subDomainStatus pubsub.Subscription + subENClusterAppStatus pubsub.Subscription subGlobalConfig pubsub.Subscription subHostMemory pubsub.Subscription subZedAgentStatus pubsub.Subscription + subEdgeNodeInfo pubsub.Subscription pubVolumesSnapConfig pubsub.Publication subVolumesSnapStatus pubsub.Subscription subAssignableAdapters pubsub.Subscription @@ -71,6 +74,7 @@ type zedmanagerContext struct { assignableAdapters *types.AssignableAdapters // Is it kubevirt eve hvTypeKube bool + nodeUUID uuid.UUID } // AddAgentSpecificCLIFlags adds CLI options @@ -375,6 +379,24 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar ctx.subVolumesSnapStatus = subVolumesSnapshotStatus _ = subVolumesSnapshotStatus.Activate() + subENClusterAppStatus, err := ps.NewSubscription(pubsub.SubscriptionOptions{ + AgentName: "zedkube", + MyAgentName: agentName, + TopicImpl: types.ENClusterAppStatus{}, + Activate: false, + Ctx: &ctx, + CreateHandler: handleENClusterAppStatusCreate, + ModifyHandler: handleENClusterAppStatusModify, + DeleteHandler: handleENClusterAppStatusDelete, + WarningTime: warningTime, + ErrorTime: errorTime, + }) + if err != nil { + log.Fatal(err) + } + ctx.subENClusterAppStatus = subENClusterAppStatus + _ = subENClusterAppStatus.Activate() + ctx.subAssignableAdapters, err = ps.NewSubscription(pubsub.SubscriptionOptions{ AgentName: "domainmgr", MyAgentName: agentName, @@ -391,6 +413,20 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar log.Fatal(err) } + // Look for edge node info + subEdgeNodeInfo, err := ps.NewSubscription(pubsub.SubscriptionOptions{ 
+ AgentName: "zedagent", + MyAgentName: agentName, + TopicImpl: types.EdgeNodeInfo{}, + Persistent: true, + Activate: false, + }) + if err != nil { + log.Fatal(err) + } + ctx.subEdgeNodeInfo = subEdgeNodeInfo + _ = subEdgeNodeInfo.Activate() + // Pick up debug aka log level before we start real work for !ctx.GCInitialized { log.Functionf("waiting for GCInitialized") @@ -441,6 +477,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar case change := <-subVolumesSnapshotStatus.MsgChan(): subVolumesSnapshotStatus.ProcessChange(change) + case change := <-subENClusterAppStatus.MsgChan(): + subENClusterAppStatus.ProcessChange(change) + case change := <-ctx.subAssignableAdapters.MsgChan(): ctx.subAssignableAdapters.ProcessChange(change) @@ -455,6 +494,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar ctx.checkFreedResources = false } + case change := <-subEdgeNodeInfo.MsgChan(): + subEdgeNodeInfo.ProcessChange(change) + case <-delayedStartTicker.C: checkDelayedStartApps(&ctx) @@ -659,7 +701,7 @@ func publishAppInstanceSummary(ctxPtr *zedmanagerContext) { effectiveActivate := false config := lookupAppInstanceConfig(ctxPtr, status.Key(), true) if config != nil { - effectiveActivate = effectiveActivateCurrentProfile(*config, ctxPtr.currentProfile) + effectiveActivate = effectiveActivateCombined(*config, ctxPtr) } // Only condition we did not count is EffectiveActive = true and Activated = false. // That means customer either halted his app or did not activate it yet. @@ -689,6 +731,17 @@ func publishAppInstanceStatus(ctx *zedmanagerContext, key := status.Key() log.Tracef("publishAppInstanceStatus(%s)", key) pub := ctx.pubAppInstanceStatus + if ctx.hvTypeKube { + sub := ctx.subENClusterAppStatus + st, _ := sub.Get(key) + if st != nil { + clusterStatus := st.(types.ENClusterAppStatus) + if !clusterStatus.ScheduledOnThisNode { + log.Functionf("publishAppInstanceStatus(%s) not scheduled on this node, skip", key) + return + } + } + } pub.Publish(key, *status) } @@ -1177,7 +1230,7 @@ func handleModify(ctxArg interface{}, key string, updateSnapshotsInAIStatus(status, config) - effectiveActivate := effectiveActivateCurrentProfile(config, ctx.currentProfile) + effectiveActivate := effectiveActivateCombined(config, ctx) publishAppInstanceStatus(ctx, status) @@ -1545,8 +1598,9 @@ func updateBasedOnProfile(ctx *zedmanagerContext, oldProfile string) { if localConfig := lookupLocalAppInstanceConfig(ctx, config.Key()); localConfig != nil { config = *localConfig } - effectiveActivate := effectiveActivateCurrentProfile(config, ctx.currentProfile) - effectiveActivateOld := effectiveActivateCurrentProfile(config, oldProfile) + effectiveActivate := effectiveActivateCombined(config, ctx) + effectiveActivateOldTemp := effectiveActivateCurrentProfile(config, oldProfile) + effectiveActivateOld := getKubeAppActivateStatus(ctx, config, effectiveActivateOldTemp) if effectiveActivateOld == effectiveActivate { // no changes in effective activate continue @@ -1563,6 +1617,14 @@ func updateBasedOnProfile(ctx *zedmanagerContext, oldProfile string) { } // returns effective Activate status based on Activate from app instance config and current profile +func effectiveActivateCombined(config types.AppInstanceConfig, ctx *zedmanagerContext) bool { + effectiveActivate := effectiveActivateCurrentProfile(config, ctx.currentProfile) + // Add cluster login in the activate state + combined := getKubeAppActivateStatus(ctx, config, effectiveActivate) + 
log.Functionf("effectiveActivateCombined: effectiveActivate %t, combined %t", effectiveActivate, combined) + return combined +} + func effectiveActivateCurrentProfile(config types.AppInstanceConfig, currentProfile string) bool { if currentProfile == "" { log.Functionf("effectiveActivateCurrentProfile(%s): empty current", config.Key()) @@ -1586,3 +1648,72 @@ func effectiveActivateCurrentProfile(config types.AppInstanceConfig, currentProf config.Key(), currentProfile) return false } + +func getKubeAppActivateStatus(ctx *zedmanagerContext, aiConfig types.AppInstanceConfig, effectiveActivate bool) bool { + + if !ctx.hvTypeKube || aiConfig.DesignatedNodeID == uuid.Nil { + return effectiveActivate + } + + if ctx.nodeUUID == uuid.Nil { + err := getnodeNameAndUUID(ctx) + if err != nil { + log.Errorf("getKubeAppActivateStatus: can't get nodeUUID %v", err) + return false + } + } + sub := ctx.subENClusterAppStatus + items := sub.GetAll() + + // 1) if the dnid is on this node + // a) if the pod is not on this node, and the pod is running, return false + // b) otherwise, return true + // 2) if the dnid is not on this node + // a) if the pod is on this node, and status is running, return true + // b) otherwise, return false + var onTheDevice bool + var statusRunning bool + for _, item := range items { + status := item.(types.ENClusterAppStatus) + if status.AppUUID == aiConfig.UUIDandVersion.UUID { + statusRunning = status.StatusRunning + if status.IsDNidNode { + onTheDevice = true + break + } else if status.ScheduledOnThisNode { + onTheDevice = true + break + } + } + } + + log.Functionf("getKubeAppActivateStatus: ai %s, node %s, onTheDevice %v, statusRunning %v", + aiConfig.DesignatedNodeID.String(), ctx.nodeUUID, onTheDevice, statusRunning) + if aiConfig.DesignatedNodeID == ctx.nodeUUID { + if statusRunning && !onTheDevice { + return false + } + return effectiveActivate + } else { + // the pod is on this node, but it will not be in running state, unless + // zedmanager make this app activate and zedrouter CNI has the network status + // for this App. So, not in running state is ok. 
+ if onTheDevice { + return effectiveActivate + } + return false + } +} + +func getnodeNameAndUUID(ctx *zedmanagerContext) error { + if ctx.nodeUUID == uuid.Nil { + NodeInfo, err := ctx.subEdgeNodeInfo.Get("global") + if err != nil { + log.Errorf("getnodeNameAndUUID: can't get edgeNodeInfo %v", err) + return err + } + enInfo := NodeInfo.(types.EdgeNodeInfo) + ctx.nodeUUID = enInfo.DeviceID + } + return nil +} diff --git a/pkg/pillar/cmd/zedrouter/cni.go b/pkg/pillar/cmd/zedrouter/cni.go index ec069cbc4a..56bc3c2a32 100644 --- a/pkg/pillar/cmd/zedrouter/cni.go +++ b/pkg/pillar/cmd/zedrouter/cni.go @@ -333,6 +333,10 @@ func (z *zedrouter) getAppByPodName( for _, item := range z.pubAppNetworkStatus.GetAll() { appStatus := item.(types.AppNetworkStatus) appUUID := appStatus.UUIDandVersion.UUID + repPodName, isReplicaPod := base.GetReplicaPodName(appStatus.DisplayName, podName, appUUID) + if isReplicaPod { + appKubeName = repPodName + } if base.GetAppKubeName(appStatus.DisplayName, appUUID) == appKubeName { appConfig := z.lookupAppNetworkConfig(appStatus.Key()) if appConfig == nil { diff --git a/pkg/pillar/docs/zedkube.md b/pkg/pillar/docs/zedkube.md index 7aad723a52..822773ba2d 100644 --- a/pkg/pillar/docs/zedkube.md +++ b/pkg/pillar/docs/zedkube.md @@ -45,6 +45,16 @@ Any given node could be hosting one or more longhorn volume replicas and thus co A drain operation should be performed before any Node Operation / Node Command which can cause an extended outage of a node such as a reboot, shutdown, reset. kubenodeop handles NodeDrainRequest objects which zedkube subscribes to, initiates the drain, and publishes NodeDrainStatus objects. +## Applications under Kubevirt Mode + +### Handling Domain Deletion in Domainmgr + +In normal cases of EVE application launching and running, the domainmgr handles the configuration creation, starts the domain, and monitors the domain's running status. If the starting and monitoring status is not in the running state, then there is something wrong with the runtime process, and the domain is normally stopped and deleted by the domainmgr. Domainmgr keeps a timer, usually 10 minutes, to retry starting the domain again later. + +When the application is launched and managed in KubeVirt mode, the Kubernetes cluster is provisioned for this application, being a VMI (Virtual Machine Instance) replicaSet object or a Pod replicaSet object. It uses a declarative approach to manage the desired state of the applications. The configurations are saved in the Kubernetes database for the Kubernetes controller to use to ensure the objects eventually achieve the correct state if possible. Any particular VMI/Pod state of a domain may not be in working condition at the time when EVE domainmgr checks. In the domainmgr code running in KubeVirt mode, it normally skips the hyper.Task().Delete() or hyper.Task().Stop() in domainmgr.go, and lets the Kubernetes cluster have a chance to work its way to bring up the application to the running state. + +The exception to the above is in the case of the application itself being removed from the AppInstanceConfig, in which case, the DomainStatus of this application will be deleted, and we have a new boolean DomainConfigDeleted to be set if the DomainStatus is pending for deletion. When the DomainStatus of DomainConfigDeleted is set, the code in domainmgr will allow the Stop() or Delete() operations for Kubernetes to remove the replicaSet of the application. 
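+
+A simplified sketch of that gating (illustrative only; the actual checks live in
+doActivateTail(), doInactivate(), handleDelete() and DomainShutdown() in
+domainmgr.go):
+
+```go
+// shouldDeleteKubeDomain is an illustrative helper, not the literal domainmgr code.
+func shouldDeleteKubeDomain(hvTypeKube, isDNidNode, domainConfigDeleted bool) bool {
+	if !hvTypeKube {
+		return true // non-kubevirt EVE keeps the existing delete/cleanup behavior
+	}
+	if !isDNidNode {
+		return false // only the Designated Node acts on the kubernetes objects
+	}
+	// Let the kubernetes controller keep reconciling the replicaSet unless the
+	// application itself has been removed from AppInstanceConfig.
+	return domainConfigDeleted
+}
+```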
+ ## Kubernetes Node Draining ### kubeapi diff --git a/pkg/pillar/hypervisor/kubevirt.go b/pkg/pillar/hypervisor/kubevirt.go index 378584edbc..41fae4a6b8 100644 --- a/pkg/pillar/hypervisor/kubevirt.go +++ b/pkg/pillar/hypervisor/kubevirt.go @@ -27,6 +27,7 @@ import ( netattdefv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1" "github.com/lf-edge/eve/pkg/pillar/kubeapi" "github.com/sirupsen/logrus" + appsv1 "k8s.io/api/apps/v1" k8sv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -34,6 +35,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" metricsv "k8s.io/metrics/pkg/client/clientset/versioned" + "k8s.io/utils/pointer" v1 "kubevirt.io/api/core/v1" "kubevirt.io/client-go/kubecli" ) @@ -45,17 +47,30 @@ const ( eveLabelKey = "App-Domain-Name" waitForPodCheckCounter = 5 // Check 5 times waitForPodCheckTime = 15 // Check every 15 seconds, don't wait for too long to cause watchdog + tolerateSec = 30 // Pod/VMI reschedule delay after node unreachable seconds +) + +// MetaDataType is a type for different Domain types +// We only support ReplicaSet for VMI and Pod for now. +type MetaDataType int + +// Constants representing different resource types. +const ( + IsMetaVmi MetaDataType = iota + IsMetaPod + IsMetaReplicaVMI + IsMetaReplicaPod ) // VM instance meta data structure. type vmiMetaData struct { - vmi *v1.VirtualMachineInstance // Handle to the VM instance - pod *k8sv1.Pod // Handle to the pod container - domainID int // DomainID understood by domainmgr in EVE - isPod bool // switch on is Pod or is VMI - name string // Display-Name(all lower case) + first 5 bytes of domainName - cputotal uint64 // total CPU in NS so far - maxmem uint32 // total Max memory usage in bytes so far + repPod *appsv1.ReplicaSet // Handle to the replicaSetof pod + repVMI *v1.VirtualMachineInstanceReplicaSet // Handle to the replicaSet of VMI + domainID int // DomainID understood by domainmgr in EVE + mtype MetaDataType // switch on is ReplicaSet, Pod or is VMI + name string // Display-Name(all lower case) + first 5 bytes of domainName + cputotal uint64 // total CPU in NS so far + maxmem uint32 // total Max memory usage in bytes so far } type kubevirtContext struct { @@ -66,12 +81,14 @@ type kubevirtContext struct { virthandlerIPAddr string prevDomainMetric map[string]types.DomainMetric kubeConfig *rest.Config + nodeNameMap map[string]string // to pass nodeName between methods without pointer receiver } // Use few states for now var stateMap = map[string]types.SwState{ "Paused": types.PAUSED, "Running": types.RUNNING, + "NonLocal": types.RUNNING, "shutdown": types.HALTING, "suspended": types.PAUSED, "Pending": types.PENDING, @@ -131,6 +148,7 @@ func newKubevirt() Hypervisor { devicemodel: "virt", vmiList: make(map[string]*vmiMetaData), prevDomainMetric: make(map[string]types.DomainMetric), + nodeNameMap: make(map[string]string), } case "amd64": return kubevirtContext{ @@ -138,6 +156,7 @@ func newKubevirt() Hypervisor { devicemodel: "pc-q35-3.1", vmiList: make(map[string]*vmiMetaData), prevDomainMetric: make(map[string]types.DomainMetric), + nodeNameMap: make(map[string]string), } } return nil @@ -197,13 +216,15 @@ func (ctx kubevirtContext) Setup(status types.DomainStatus, config types.DomainC "for app %s", config.DisplayName) } + getMyNodeUUID(&ctx, status.NodeName) + if config.VirtualizationMode == types.NOHYPER { - if err := ctx.CreatePodConfig(domainName, config, status, diskStatusList, aa, 
file); err != nil { - return logError("failed to build kube pod config: %v", err) + if err := ctx.CreateReplicaPodConfig(domainName, config, status, diskStatusList, aa, file); err != nil { + return logError("failed to build kube replicaset config: %v", err) } } else { - // Take eve domain config and convert to VMI config - if err := ctx.CreateVMIConfig(domainName, config, status, diskStatusList, aa, file); err != nil { + // Take eve domain config and convert to VMI Replicaset config + if err := ctx.CreateReplicaVMIConfig(domainName, config, status, diskStatusList, aa, file); err != nil { return logError("failed to build kube config: %v", err) } } @@ -212,12 +233,12 @@ func (ctx kubevirtContext) Setup(status types.DomainStatus, config types.DomainC } -// Kubevirt VMI config spec is updated with the domain config/status of the app. +// Kubevirt VMI ReplicaSet config spec is updated with the domain config/status of the app. // The details and the struct of the spec can be found at: // https://kubevirt.io/api-reference/v1.0.0/definitions.html -func (ctx kubevirtContext) CreateVMIConfig(domainName string, config types.DomainConfig, status types.DomainStatus, +func (ctx kubevirtContext) CreateReplicaVMIConfig(domainName string, config types.DomainConfig, status types.DomainStatus, diskStatusList []types.DiskStatus, aa *types.AssignableAdapters, file *os.File) error { - logrus.Debugf("CreateVMIConfig called for Domain: %s", domainName) + logrus.Debugf("CreateReplicaVMIConfig called for Domain: %s", domainName) err := getConfig(&ctx) if err != nil { @@ -225,12 +246,15 @@ func (ctx kubevirtContext) CreateVMIConfig(domainName string, config types.Domai } kvClient, err := kubecli.GetKubevirtClientFromRESTConfig(ctx.kubeConfig) - if err != nil { logrus.Errorf("couldn't get the kubernetes client API config: %v", err) return err } + nodeName, ok := ctx.nodeNameMap["nodename"] + if !ok { + return logError("Failed to get nodeName") + } kubeName := base.GetAppKubeName(config.DisplayName, config.UUIDandVersion.UUID) // Get a VirtualMachineInstance object and populate the values from DomainConfig vmi := v1.NewVMIReferenceFromNameWithNS(kubeapi.EVEKubeNameSpace, kubeName) @@ -242,7 +266,7 @@ func (ctx kubevirtContext) CreateVMIConfig(domainName string, config types.Domai // Set memory mem := v1.Memory{} - m, err := resource.ParseQuantity(convertToKubernetesFormat(config.Memory * 1024)) // To bytes from KB + m, err := resource.ParseQuantity(convertToKubernetesFormat(config.Memory * 1024)) if err != nil { logrus.Errorf("Could not parse the memory value %v", err) return err @@ -290,7 +314,6 @@ func (ctx kubevirtContext) CreateVMIConfig(domainName string, config types.Domai vols := make([]v1.Volume, len(diskStatusList)) ndisks := len(diskStatusList) for i, ds := range diskStatusList { - diskName := "disk" + strconv.Itoa(i+1) // Domainmgr sets devtype 9P for container images. 
Though in kubevirt container image is @@ -336,7 +359,6 @@ func (ctx kubevirtContext) CreateVMIConfig(domainName string, config types.Domai }, } } else { - pvcName, err := ds.GetPVCNameFromVolumeKey() if err != nil { return logError("Failed to fetch PVC Name from volumekey %v", ds.VolumeKey) @@ -361,7 +383,6 @@ func (ctx kubevirtContext) CreateVMIConfig(domainName string, config types.Domai }, } } - } vmi.Spec.Domain.Devices.Disks = disks[0:ndisks] vmi.Spec.Volumes = vols[0:ndisks] @@ -398,7 +419,6 @@ func (ctx kubevirtContext) CreateVMIConfig(domainName string, config types.Domai tap := pciDevice{pciLong: ib.PciLong, ioType: ib.Type} pciAssignments = addNoDuplicatePCI(pciAssignments, tap) } - } } @@ -414,23 +434,58 @@ func (ctx kubevirtContext) CreateVMIConfig(domainName string, config types.Domai } } + // Set the affinity to this node the VMI is preferred to run on + affinity := setKubeAffinity(nodeName) + + // Set tolerations to handle node conditions + tolerations := setKubeToleration(int64(tolerateSec)) + + vmi.Spec.Affinity = affinity + vmi.Spec.Tolerations = tolerations vmi.Labels = make(map[string]string) vmi.Labels[eveLabelKey] = domainName + // Create a VirtualMachineInstanceReplicaSet + replicaSet := &v1.VirtualMachineInstanceReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: kubeName, + Namespace: kubeapi.EVEKubeNameSpace, + }, + Spec: v1.VirtualMachineInstanceReplicaSetSpec{ + Replicas: pointer.Int32Ptr(1), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + eveLabelKey: domainName, + }, + }, + Template: &v1.VirtualMachineInstanceTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + eveLabelKey: domainName, + }, + }, + Spec: vmi.Spec, + }, + }, + } + + logrus.Infof("CreateReplicaVMIConfig: VirtualMachineInstanceReplicaSet: %+v", replicaSet) + // Now we have VirtualMachine Instance object, save it to config file for debug purposes // and save it in context which will be used to start VM in Start() call // dispName is for vmi name/handle on kubernetes meta := vmiMetaData{ - vmi: vmi, + repVMI: replicaSet, name: kubeName, + mtype: IsMetaReplicaVMI, domainID: int(rand.Uint32()), } ctx.vmiList[domainName] = &meta - vmiStr := fmt.Sprintf("%+v", vmi) + repvmiStr := fmt.Sprintf("%+v", replicaSet) // write to config file - file.WriteString(vmiStr) + file.WriteString(repvmiStr) return nil } @@ -438,56 +493,64 @@ func (ctx kubevirtContext) CreateVMIConfig(domainName string, config types.Domai func (ctx kubevirtContext) Start(domainName string) error { logrus.Debugf("Starting Kubevirt domain %s", domainName) + nodeName, ok := ctx.nodeNameMap["nodename"] + if !ok { + return logError("Failed to get nodeName") + } + err := getConfig(&ctx) if err != nil { return err } kubeconfig := ctx.kubeConfig + logrus.Infof("Starting Kubevirt domain %s, devicename nodename %d", domainName, len(ctx.nodeNameMap)) vmis, ok := ctx.vmiList[domainName] if !ok { return logError("start domain %s failed to get vmlist", domainName) } - if vmis.isPod { - err := StartPodContainer(kubeconfig, ctx.vmiList[domainName].pod) + + // Start the Pod ReplicaSet + if vmis.mtype == IsMetaReplicaPod { + err := StartReplicaPodContiner(ctx, ctx.vmiList[domainName].repPod) return err + } else if vmis.mtype != IsMetaReplicaVMI { + return logError("Start domain %s wrong type", domainName) } - vmi := vmis.vmi + // Start the VMI ReplicaSet + repvmi := vmis.repVMI virtClient, err := kubecli.GetKubevirtClientFromRESTConfig(kubeconfig) - if err != nil { logrus.Errorf("couldn't get the kubernetes 
client API config: %v", err) return err } - // Create the VM + // Create the VMI ReplicaSet i := 5 for { - _, err = virtClient.VirtualMachineInstance(kubeapi.EVEKubeNameSpace).Create(context.Background(), vmi) + _, err = virtClient.ReplicaSet(kubeapi.EVEKubeNameSpace).Create(repvmi) if err != nil { if strings.Contains(err.Error(), "dial tcp 127.0.0.1:6443") && i <= 0 { - logrus.Infof("Start VM failed %v\n", err) + logrus.Infof("Start VMI replicaset failed %v\n", err) return err } time.Sleep(10 * time.Second) - logrus.Infof("Start VM failed, retry (%d) err %v", i, err) + logrus.Infof("Start VMI replicaset failed, retry (%d) err %v", i, err) } else { break } i = i - 1 } + logrus.Infof("Started Kubevirt domain replicaset %s, VMI replicaset %s", domainName, vmis.name) - logrus.Infof("Started Kubevirt domain %s", domainName) - - err = waitForVMI(vmis.name, true) + err = waitForVMI(vmis.name, nodeName, true) if err != nil { logrus.Errorf("couldn't start VMI %v", err) return err } return nil - } // Create is no-op for kubevirt, just return the domainID we already have. @@ -508,22 +571,16 @@ func (ctx kubevirtContext) Stop(domainName string, force bool) error { if !ok { return logError("domain %s failed to get vmlist", domainName) } - if vmis.isPod { - err := StopPodContainer(kubeconfig, vmis.name) - return err + if vmis.mtype == IsMetaReplicaPod { + err = StopReplicaPodContainer(kubeconfig, vmis.name) + } else if vmis.mtype == IsMetaReplicaVMI { + err = StopReplicaVMI(kubeconfig, vmis.name) } else { - virtClient, err := kubecli.GetKubevirtClientFromRESTConfig(kubeconfig) - if err != nil { - logrus.Errorf("couldn't get the kubernetes client API config: %v", err) - return err - } + return logError("Stop domain %s wrong type", domainName) + } - // Stop the VM - err = virtClient.VirtualMachineInstance(kubeapi.EVEKubeNameSpace).Delete(context.Background(), vmis.name, &metav1.DeleteOptions{}) - if err != nil { - logrus.Errorf("Stop error %v\n", err) - return err - } + if err != nil { + return err } delete(ctx.vmiList, domainName) @@ -545,27 +602,16 @@ func (ctx kubevirtContext) Delete(domainName string) (result error) { if !ok { return logError("delete domain %s failed to get vmlist", domainName) } - if vmis.isPod { - err := StopPodContainer(kubeconfig, vmis.name) - return err + if vmis.mtype == IsMetaReplicaPod { + err = StopReplicaPodContainer(kubeconfig, vmis.name) + } else if vmis.mtype == IsMetaReplicaVMI { + err = StopReplicaVMI(kubeconfig, vmis.name) } else { - virtClient, err := kubecli.GetKubevirtClientFromRESTConfig(kubeconfig) - - if err != nil { - logrus.Errorf("couldn't get the kubernetes client API config: %v", err) - return err - } - - // Stop the VM - err = virtClient.VirtualMachineInstance(kubeapi.EVEKubeNameSpace).Delete(context.Background(), vmis.name, &metav1.DeleteOptions{}) + return logError("delete domain %s wrong type", domainName) + } - // May be already deleted during Stop action, so its not an error if does not exist - if errors.IsNotFound(err) { - logrus.Infof("Domain already deleted: %v", domainName) - } else { - fmt.Printf("Delete error %v\n", err) - return err - } + if err != nil { + return err } // Delete the state dir @@ -584,9 +630,33 @@ func (ctx kubevirtContext) Delete(domainName string) (result error) { return nil } +// StopReplicaVMI stops the VMI ReplicaSet +func StopReplicaVMI(kubeconfig *rest.Config, repVmiName string) error { + virtClient, err := kubecli.GetKubevirtClientFromRESTConfig(kubeconfig) + if err != nil { + logrus.Errorf("couldn't get the kubernetes 
client API config: %v", err) + return err + } + + // Stop the VMI ReplicaSet + err = virtClient.ReplicaSet(kubeapi.EVEKubeNameSpace).Delete(repVmiName, &metav1.DeleteOptions{}) + if errors.IsNotFound(err) { + logrus.Infof("Stop VMI Replicaset, Domain already deleted: %v", repVmiName) + } else { + logrus.Errorf("Stop VMI Replicaset error %v\n", err) + return err + } + + return nil +} + func (ctx kubevirtContext) Info(domainName string) (int, types.SwState, error) { logrus.Debugf("Info called for Domain: %s", domainName) + nodeName, ok := ctx.nodeNameMap["nodename"] + if !ok { + return 0, types.BROKEN, logError("Failed to get nodeName") + } var res string var err error @@ -598,10 +668,10 @@ func (ctx kubevirtContext) Info(domainName string) (int, types.SwState, error) { if !ok { return 0, types.HALTED, logError("info domain %s failed to get vmlist", domainName) } - if vmis.isPod { - res, err = InfoPodContainer(ctx.kubeConfig, vmis.name) + if vmis.mtype == IsMetaReplicaPod { + res, err = InfoReplicaSetContainer(ctx, vmis.name) } else { - res, err = getVMIStatus(vmis.name) + res, err = getVMIStatus(vmis.name, nodeName) } if err != nil { return 0, types.BROKEN, logError("domain %s failed to get info: %v", domainName, err) @@ -622,15 +692,25 @@ func (ctx kubevirtContext) Cleanup(domainName string) error { if err := ctx.ctrdContext.Cleanup(domainName); err != nil { return fmt.Errorf("couldn't cleanup task %s: %v", domainName, err) } + nodeName, ok := ctx.nodeNameMap["nodename"] + if !ok { + return logError("Cleanup: Failed to get nodeName") + } var err error vmis, ok := ctx.vmiList[domainName] if !ok { return logError("cleanup domain %s failed to get vmlist", domainName) } - if vmis.isPod { + if vmis.mtype == IsMetaReplicaPod { + _, err = InfoReplicaSetContainer(ctx, vmis.name) + if err == nil { + err = ctx.Delete(domainName) + } + } else if vmis.mtype == IsMetaReplicaVMI { + err = waitForVMI(vmis.name, nodeName, false) } else { - err = waitForVMI(vmis.name, false) + err = logError("cleanup domain %s wrong type", domainName) } if err != nil { return fmt.Errorf("waitforvmi failed %s: %v", domainName, err) @@ -655,7 +735,7 @@ func convertToKubernetesFormat(b int) string { return fmt.Sprintf("%.1fYi", bf) } -func getVMIStatus(vmiName string) (string, error) { +func getVMIStatus(repVmiName, nodeName string) (string, error) { kubeconfig, err := kubeapi.GetKubeConfig() if err != nil { @@ -668,20 +748,42 @@ func getVMIStatus(vmiName string) (string, error) { return "", logError("couldn't get the Kube client Config: %v", err) } - // Get the VMI info - vmi, err := virtClient.VirtualMachineInstance(kubeapi.EVEKubeNameSpace).Get(context.Background(), vmiName, &metav1.GetOptions{}) - + // List VMIs with a label selector that matches the replicaset name + vmiList, err := virtClient.VirtualMachineInstance(kubeapi.EVEKubeNameSpace).List(context.Background(), &metav1.ListOptions{}) if err != nil { - return "", logError("domain %s failed to get VMI info %s", vmiName, err) + return "", logError("getVMIStatus: domain %s failed to get VMI info %s", repVmiName, err) + } + if len(vmiList.Items) == 0 { + return "", logError("getVMIStatus: No VMI found with the given replicaset name %s", repVmiName) } - res := fmt.Sprintf("%v", vmi.Status.Phase) - + // Use the first VMI in the list + var foundNonlocal bool + var targetVMI *v1.VirtualMachineInstance + for _, vmi := range vmiList.Items { + if vmi.Status.NodeName == nodeName { + if vmi.GenerateName == repVmiName { + targetVMI = &vmi + break + } + } else { + if 
vmi.GenerateName == repVmiName { + foundNonlocal = true + } + } + } + if targetVMI == nil { + if foundNonlocal { + return "NonLocal", nil + } + return "", logError("getVMIStatus: No VMI %s found with the given nodeName %s", repVmiName, nodeName) + } + res := fmt.Sprintf("%v", targetVMI.Status.Phase) return res, nil } // Inspired from kvm.go -func waitForVMI(vmiName string, available bool) error { +func waitForVMI(vmiName, nodeName string, available bool) error { maxDelay := time.Minute * 5 // 5mins ?? lets keep it for now delay := time.Second var waited time.Duration @@ -693,8 +795,7 @@ func waitForVMI(vmiName string, available bool) error { waited += delay } - state, err := getVMIStatus(vmiName) - + state, err := getVMIStatus(vmiName, nodeName) if err != nil { if available { @@ -728,17 +829,22 @@ func waitForVMI(vmiName string, available bool) error { func (ctx kubevirtContext) GetDomsCPUMem() (map[string]types.DomainMetric, error) { logrus.Debugf("GetDomsCPUMem: enter") + nodeName, ok := ctx.nodeNameMap["nodename"] + if !ok { + return nil, nil + } res := make(kubevirtMetrics, len(ctx.vmiList)) - virtIP, err := getVirtHandlerIPAddr(&ctx) + virtIP, err := getVirtHandlerIPAddr(&ctx, nodeName) if err != nil { - logrus.Errorf("GetDomsCPUMem get virthandler ip error %v", err) + logrus.Debugf("GetDomsCPUMem get virthandler ip error %v", err) return nil, err } url := "https://" + virtIP + ":8443/metrics" httpClient := &http.Client{ Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + DisableKeepAlives: true, }, } @@ -765,6 +871,7 @@ func (ctx kubevirtContext) GetDomsCPUMem() (map[string]types.DomainMetric, error return nil, err } + // It seems the virt-handler metrics only container the VMIs running on this node scanner := bufio.NewScanner(strings.NewReader(string(body))) for scanner.Scan() { line := scanner.Text() @@ -809,7 +916,7 @@ func (ctx kubevirtContext) GetDomsCPUMem() (map[string]types.DomainMetric, error var domainName string for n, vmis := range ctx.vmiList { - if vmis.name == vmiName { + if strings.HasPrefix(vmiName, vmis.name) { // handle the VMI ReplicaSet domainName = n if _, ok := res[domainName]; !ok { res[domainName] = types.DomainMetric{ @@ -852,15 +959,13 @@ func (ctx kubevirtContext) GetDomsCPUMem() (map[string]types.DomainMetric, error } hasEmptyRes := len(ctx.vmiList) - len(res) - if hasEmptyRes > 0 { - // check and get the kubernetes pod's metrics - checkPodMetrics(ctx, res, hasEmptyRes) - } + checkReplicaPodMetrics(ctx, res, hasEmptyRes) + logrus.Debugf("GetDomsCPUMem: %d VMs: %+v, podnum %d", len(ctx.vmiList), res, hasEmptyRes) return res, nil } -func getVirtHandlerIPAddr(ctx *kubevirtContext) (string, error) { +func getVirtHandlerIPAddr(ctx *kubevirtContext, nodeName string) (string, error) { if ctx.virthandlerIPAddr != "" { return ctx.virthandlerIPAddr, nil } @@ -879,6 +984,9 @@ func getVirtHandlerIPAddr(ctx *kubevirtContext) (string, error) { var vmiPod *k8sv1.Pod for _, pod := range pods.Items { + if nodeName != pod.Spec.NodeName { + continue + } if strings.HasPrefix(pod.ObjectMeta.Name, "virt-handler-") { vmiPod = &pod break @@ -886,7 +994,7 @@ func getVirtHandlerIPAddr(ctx *kubevirtContext) (string, error) { } if vmiPod == nil { - return "", fmt.Errorf("can not find virt-handler pod") + return "", fmt.Errorf("getVirtHandlerIPAddr: can not find virt-handler pod") } ctx.virthandlerIPAddr = vmiPod.Status.PodIP return ctx.virthandlerIPAddr, nil @@ -919,17 +1027,23 @@ func 
assignToInt64(parsedValue interface{}) int64 { return intValue } -func (ctx kubevirtContext) CreatePodConfig(domainName string, config types.DomainConfig, status types.DomainStatus, +func (ctx kubevirtContext) CreateReplicaPodConfig(domainName string, config types.DomainConfig, status types.DomainStatus, diskStatusList []types.DiskStatus, aa *types.AssignableAdapters, file *os.File) error { kubeName := base.GetAppKubeName(config.DisplayName, config.UUIDandVersion.UUID) if config.KubeImageName == "" { err := fmt.Errorf("domain config kube image name empty") - logrus.Errorf("CreatePodConfig: %v", err) + logrus.Errorf("CreateReplicaPodConfig: %v", err) return err } ociName := config.KubeImageName + logrus.Infof("CreateReplicaPodConfig: domainName %s, kubeName %s, nodeName %d", domainName, kubeName, len(ctx.nodeNameMap)) + nodeName, ok := ctx.nodeNameMap["nodename"] + if !ok { + return logError("Failed to get nodeName") + } + var netSelections []netattdefv1.NetworkSelectionElement for _, vif := range config.VifList { netSelections = append(netSelections, netattdefv1.NetworkSelectionElement{ @@ -950,7 +1064,7 @@ func (ctx kubevirtContext) CreatePodConfig(domainName string, config types.Domai // Check if the NAD is created in the cluster, return error if not err := kubeapi.CheckEtherPassThroughNAD(nadName) if err != nil { - logrus.Errorf("CreatePodConfig: check ether NAD failed, %v", err) + logrus.Errorf("CreateReplicaPodConfig: check ether NAD failed, %v", err) return err } } @@ -961,131 +1075,219 @@ func (ctx kubevirtContext) CreatePodConfig(domainName string, config types.Domai annotations = map[string]string{ "k8s.v1.cni.cncf.io/networks": encodeSelections(netSelections), } - logrus.Infof("CreatePodConfig: annotations %+v", annotations) + logrus.Infof("CreateReplicaPodConfig: annotations %+v", annotations) } else { - err := fmt.Errorf("CreatePodConfig: no network selections, exit") + err := fmt.Errorf("CreateReplicaPodConfig: no network selections, exit") return err } - vcpus := strconv.Itoa(config.VCpus*1000) + "m" + //vcpus := strconv.Itoa(config.VCpus*1000) + "m" // FixedResources.Memory is in Kbytes - memoryLimit := strconv.Itoa(config.Memory * 1000) - memoryRequest := strconv.Itoa(config.Memory * 1000) + //memoryLimit := "100Mi" // convertToKubernetesFormat(config.Memory * 1000) + //memoryRequest := memoryLimit - pod := &k8sv1.Pod{ + var replicaNum int32 + replicaNum = 1 + repNum := &replicaNum + replicaSet := &appsv1.ReplicaSet{ ObjectMeta: metav1.ObjectMeta{ - Name: kubeName, - Namespace: kubeapi.EVEKubeNameSpace, - Annotations: annotations, + Name: kubeName, + Namespace: kubeapi.EVEKubeNameSpace, }, - Spec: k8sv1.PodSpec{ - Containers: []k8sv1.Container{ - { - Name: kubeName, - Image: ociName, - ImagePullPolicy: k8sv1.PullNever, - SecurityContext: &k8sv1.SecurityContext{ - Privileged: &[]bool{true}[0], + Spec: appsv1.ReplicaSetSpec{ + Replicas: repNum, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": kubeName, + }, + }, + Template: k8sv1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": kubeName, }, - Resources: k8sv1.ResourceRequirements{ - Limits: k8sv1.ResourceList{ - k8sv1.ResourceCPU: resource.MustParse(vcpus), - k8sv1.ResourceMemory: resource.MustParse(memoryLimit), - }, - Requests: k8sv1.ResourceList{ - k8sv1.ResourceCPU: resource.MustParse(vcpus), - k8sv1.ResourceMemory: resource.MustParse(memoryRequest), + Annotations: annotations, + }, + Spec: k8sv1.PodSpec{ + Tolerations: 
setKubeToleration(int64(tolerateSec)), + Affinity: setKubeAffinity(nodeName), + Containers: []k8sv1.Container{ + { + Name: kubeName, + Image: ociName, + ImagePullPolicy: k8sv1.PullNever, + SecurityContext: &k8sv1.SecurityContext{ + Privileged: &[]bool{true}[0], + }, }, }, + RestartPolicy: k8sv1.RestartPolicyAlways, + DNSConfig: &k8sv1.PodDNSConfig{ + Nameservers: []string{"8.8.8.8", "1.1.1.1"}, // XXX, temp, Add your desired DNS servers here + }, }, }, - DNSConfig: &k8sv1.PodDNSConfig{ - Nameservers: []string{"8.8.8.8", "1.1.1.1"}, // XXX, temp, Add your desired DNS servers here - }, }, } - pod.Labels = make(map[string]string) - pod.Labels[eveLabelKey] = domainName - logrus.Infof("CreatePodConfig: pod setup %+v", pod) + logrus.Infof("CreateReplicaPodConfig: replicaset %+v", replicaSet) + + // Add pod non-image volume disks + if len(diskStatusList) > 1 { + length := len(diskStatusList) - 1 + for _, ds := range diskStatusList[1:] { + if ds.Devtype == "9P" { // skip 9P volume type + if length > 0 { + length-- + } else { + break + } + } + } + if length > 0 { + volumes := make([]k8sv1.Volume, length) + mounts := make([]k8sv1.VolumeMount, length) + + i := 0 + for _, ds := range diskStatusList[1:] { + if ds.Devtype == "9P" { + continue + } + voldispName := strings.ToLower(ds.DisplayName) + //voldevs[i] = k8sv1.VolumeDevice{ + // Name: voldispName, + // DevicePath: ds.MountDir, + //} + mounts[i] = k8sv1.VolumeMount{ + Name: voldispName, + MountPath: ds.MountDir, + } + + volumes[i].Name = voldispName + volumes[i].VolumeSource = k8sv1.VolumeSource{ + PersistentVolumeClaim: &k8sv1.PersistentVolumeClaimVolumeSource{ + ClaimName: strings.ToLower(ds.DisplayName), + //ClaimName: ds.VolumeKey, + }, + } + logrus.Infof("CreateReplicaPodConfig:(%d) mount[i] %+v, volumes[i] %+v", i, mounts[i], volumes[i]) + i++ + } + replicaSet.Spec.Template.Spec.Containers[0].VolumeMounts = mounts + replicaSet.Spec.Template.Spec.Volumes = volumes + } + } + logrus.Infof("CreateReplicaPodConfig: replicaset setup %+v", replicaSet) // Now we have VirtualMachine Instance object, save it to config file for debug purposes // and save it in context which will be used to start VM in Start() call meta := vmiMetaData{ - pod: pod, - isPod: true, + repPod: replicaSet, + mtype: IsMetaReplicaPod, name: kubeName, domainID: int(rand.Uint32()), } ctx.vmiList[domainName] = &meta - podStr := fmt.Sprintf("%+v", pod) + repStr := fmt.Sprintf("%+v", replicaSet) // write to config file - file.WriteString(podStr) + file.WriteString(repStr) return nil } -func encodeSelections(selections []netattdefv1.NetworkSelectionElement) string { - bytes, err := json.Marshal(selections) - if err != nil { - logrus.Errorf("encodeSelections %v", err) - return "" +func setKubeAffinity(nodeName string) *k8sv1.Affinity { + affinity := &k8sv1.Affinity{ + NodeAffinity: &k8sv1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []k8sv1.PreferredSchedulingTerm{ + { + Preference: k8sv1.NodeSelectorTerm{ + MatchExpressions: []k8sv1.NodeSelectorRequirement{ + { + Key: "kubernetes.io/hostname", + Operator: "In", + Values: []string{nodeName}, + }, + }, + }, + Weight: 100, + }, + }, + }, } - return string(bytes) + return affinity } -// StartPodContainer : Starts container as kubernetes pod -func StartPodContainer(kubeconfig *rest.Config, pod *k8sv1.Pod) error { +func setKubeToleration(timeOutSec int64) []k8sv1.Toleration { + tolerations := []k8sv1.Toleration{ + { + Key: "node.kubernetes.io/unreachable", + Operator: k8sv1.TolerationOpExists, + Effect: 
k8sv1.TaintEffectNoExecute,
+			TolerationSeconds: pointer.Int64Ptr(timeOutSec),
+		},
+		{
+			Key:               "node.kubernetes.io/not-ready",
+			Operator:          k8sv1.TolerationOpExists,
+			Effect:            k8sv1.TaintEffectNoExecute,
+			TolerationSeconds: pointer.Int64Ptr(timeOutSec),
+		},
+	}
+	return tolerations
+}
 
-	clientset, err := kubernetes.NewForConfig(kubeconfig)
+// StartReplicaPodContiner starts the ReplicaSet pod
+func StartReplicaPodContiner(ctx kubevirtContext, rep *appsv1.ReplicaSet) error {
+	err := getConfig(&ctx)
+	if err != nil {
+		return err
+	}
+	clientset, err := kubernetes.NewForConfig(ctx.kubeConfig)
 	if err != nil {
-		logrus.Errorf("StartPodContainer: can't get clientset %v", err)
+		logrus.Errorf("StartReplicaPodContiner: can't get clientset %v", err)
 		return err
 	}
 
 	opStr := "created"
-	_, err = clientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).Create(context.TODO(), pod, metav1.CreateOptions{})
+	result, err := clientset.AppsV1().ReplicaSets(kubeapi.EVEKubeNameSpace).Create(context.TODO(), rep, metav1.CreateOptions{})
 	if err != nil {
 		if !errors.IsAlreadyExists(err) {
-			// TODO: update
-			logrus.Errorf("StartPodContainer: pod create filed: %v", err)
+			logrus.Errorf("StartReplicaPodContiner: replicaset create failed: %v", err)
 			return err
 		} else {
 			opStr = "already exists"
 		}
 	}
-	logrus.Infof("StartPodContainer: Pod %s %s with nad %+v", pod.ObjectMeta.Name, opStr, pod.Annotations)
+	logrus.Infof("StartReplicaPodContiner: Rep %s %s, result %v", rep.ObjectMeta.Name, opStr, result)
 
-	err = checkForPod(kubeconfig, pod.ObjectMeta.Name)
+	err = checkForReplicaPod(ctx, rep.ObjectMeta.Name)
 	if err != nil {
-		logrus.Errorf("StartPodContainer: check for pod status error %v", err)
+		logrus.Errorf("StartReplicaPodContiner: check for pod status error %v", err)
 		return err
 	}
-	logrus.Infof("StartPodContainer: Pod %s running", pod.ObjectMeta.Name)
+	logrus.Infof("StartReplicaPodContiner: Pod %s running", rep.ObjectMeta.Name)
 	return nil
 }
 
-func checkForPod(kubeconfig *rest.Config, podName string) error {
+func checkForReplicaPod(ctx kubevirtContext, repName string) error {
 	var i int
 	var status string
 	var err error
-	// wait for pod to be in running state, sometimes can take long, but we only wait for
-	// about a minute in order not to cause watchdog action
+	// wait for the ReplicaSet pod to reach Running (or NonLocal when scheduled on
+	// another cluster node), but bound the wait so we do not trigger watchdog action
 	for {
 		i++
-		logrus.Infof("checkForPod: check(%d) wait 15 sec, %v", i, podName)
+		logrus.Infof("checkForReplicaPod: check(%d) wait 15 sec, %v", i, repName)
-		time.Sleep(waitForPodCheckTime * time.Second)
+		time.Sleep(15 * time.Second)
 
-		status, err = InfoPodContainer(kubeconfig, podName)
+		status, err = InfoReplicaSetContainer(ctx, repName)
 		if err != nil {
-			logrus.Infof("checkForPod: podName %s, %v", podName, err)
+			logrus.Infof("checkForReplicaPod: repName %s, %v", repName, err)
 		} else {
-			if status == "Running" {
+			if status == "Running" || status == "NonLocal" {
+				logrus.Infof("checkForReplicaPod: (%d) status %s, good", i, status)
 				return nil
 			} else {
-				logrus.Errorf("checkForPod: get podName info status %v (not running)", status)
+				logrus.Errorf("checkForReplicaPod(%d): get podName info status %v (not running)", i, status)
 			}
 		}
 		if i > waitForPodCheckCounter {
@@ -1093,64 +1295,70 @@ func checkForPod(kubeconfig *rest.Config, podName string) error {
 		}
 	}
 
-	return fmt.Errorf("checkForPod: timed out, statuus %s, err %v", status, err)
+	return fmt.Errorf("checkForReplicaPod: timed out, status %s, err %v", status, err)
 }
 
-// StopPodContainer : Stops the running kubernetes pod
-func StopPodContainer(kubeconfig *rest.Config, podName string) error {
+// 
InfoReplicaSetContainer gets the status of the ReplicaSet pod +func InfoReplicaSetContainer(ctx kubevirtContext, repName string) (string, error) { - clientset, err := kubernetes.NewForConfig(kubeconfig) + err := getConfig(&ctx) if err != nil { - logrus.Errorf("StopPodContainer: can't get clientset %v", err) - return err + return "", err } - - err = clientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).Delete(context.TODO(), podName, metav1.DeleteOptions{}) + podclientset, err := kubernetes.NewForConfig(ctx.kubeConfig) if err != nil { - // Handle error - logrus.Errorf("StopPodContainer: deleting pod: %v", err) - return err + return "", logError("InfoReplicaSetContainer: couldn't get the pod Config: %v", err) } - logrus.Infof("StopPodContainer: Pod %s deleted", podName) - return nil -} - -// InfoPodContainer : Get the pod information -func InfoPodContainer(kubeconfig *rest.Config, podName string) (string, error) { - - podclientset, err := kubernetes.NewForConfig(kubeconfig) - if err != nil { - return "", logError("InfoPodContainer: couldn't get the pod Config: %v", err) + nodeName, ok := ctx.nodeNameMap["nodename"] + if !ok { + return "", logError("Failed to get nodeName") } - - pod, err := podclientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).Get(context.TODO(), podName, metav1.GetOptions{}) + pods, err := podclientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).List(context.TODO(), metav1.ListOptions{ + LabelSelector: fmt.Sprintf("app=%s", repName), + }) if err != nil { - return "", logError("InfoPodContainer: couldn't get the pod: %v", err) + return "", logError("InfoReplicaSetContainer: couldn't get the pods: %v", err) } - var res string - // https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/ - switch pod.Status.Phase { - case k8sv1.PodPending: - res = "Pending" - case k8sv1.PodRunning: - res = "Running" - case k8sv1.PodSucceeded: - res = "Running" - case k8sv1.PodFailed: - res = "Failed" - case k8sv1.PodUnknown: - res = "Scheduling" - default: - res = "Scheduling" - } - logrus.Infof("InfoPodContainer: pod %s, status %s", podName, res) + var foundNonlocal bool + for _, pod := range pods.Items { + if nodeName != pod.Spec.NodeName { + foundNonlocal = true + logrus.Infof("InfoReplicaSetContainer: rep %s, nodeName %v differ w/ hostname", repName, pod.Spec.NodeName) + continue + } - return res, nil + var res string + // https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/ + switch pod.Status.Phase { + case k8sv1.PodPending: + res = "Pending" + case k8sv1.PodRunning: + res = "Running" + case k8sv1.PodSucceeded: + res = "Running" + case k8sv1.PodFailed: + res = "Failed" + case k8sv1.PodUnknown: + res = "Scheduling" + default: + res = "Scheduling" + } + logrus.Infof("InfoReplicaSetContainer: rep %s, nodeName %v, status %s", pod.ObjectMeta.Name, pod.Spec.NodeName, res) + if pod.Status.Phase != k8sv1.PodRunning { + continue + } + + return res, nil + } + if foundNonlocal { + return "NonLocal", nil + } + return "", logError("InfoReplicaSetContainer: pod not ready") } -func checkPodMetrics(ctx kubevirtContext, res map[string]types.DomainMetric, emptySlot int) { +func checkReplicaPodMetrics(ctx kubevirtContext, res map[string]types.DomainMetric, emptySlot int) { err := getConfig(&ctx) if err != nil { @@ -1159,74 +1367,187 @@ func checkPodMetrics(ctx kubevirtContext, res map[string]types.DomainMetric, emp kubeconfig := ctx.kubeConfig podclientset, err := kubernetes.NewForConfig(kubeconfig) if err != nil { - logrus.Errorf("checkPodMetrics: can not get pod client %v", err) + 
logrus.Errorf("checkReplicaPodMetrics: can not get pod client %v", err) return } clientset, err := metricsv.NewForConfig(kubeconfig) if err != nil { - logrus.Errorf("checkPodMetrics: can't get clientset %v", err) + logrus.Errorf("checkReplicaPodMetrics: can't get clientset %v", err) + return + } + + nodeName, ok := ctx.nodeNameMap["nodename"] + if !ok { + logrus.Errorf("checkReplicaPodMetrics: can't get node name") // XXX may remove return } count := 0 for n, vmis := range ctx.vmiList { - if !vmis.isPod { + if vmis.mtype != IsMetaReplicaPod { continue } count++ - podName := vmis.name - pod, err := podclientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).Get(context.TODO(), podName, metav1.GetOptions{}) + repName := vmis.name + pods, err := podclientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).List(context.TODO(), metav1.ListOptions{ + LabelSelector: fmt.Sprintf("app=%s", repName), + }) if err != nil { - logrus.Errorf("checkPodMetrics: can't get pod %v", err) + logrus.Errorf("checkReplicaPodMetrics: can't get pod %v", err) continue } - memoryLimits := pod.Spec.Containers[0].Resources.Limits.Memory() - metrics, err := clientset.MetricsV1beta1().PodMetricses(kubeapi.EVEKubeNameSpace).Get(context.TODO(), podName, metav1.GetOptions{}) - if err != nil { - logrus.Errorf("checkPodMetrics: get pod metrics error %v", err) - continue + for _, pod := range pods.Items { + dm := getPodMetrics(clientset, pod, vmis, nodeName, res) + if dm != nil { + if count <= emptySlot { + res[n] = *dm + } + logrus.Infof("checkReplicaPodMetrics: dm %+v, res %v", dm, res) + + ctx.vmiList[n] = vmis // update for the last seen metrics + } } + } - cpuTotalNs := metrics.Containers[0].Usage[k8sv1.ResourceCPU] - cpuTotalNsAsFloat64 := cpuTotalNs.AsApproximateFloat64() * float64(time.Second) // get nanoseconds - totalCPU := uint64(cpuTotalNsAsFloat64) + logrus.Infof("checkReplicaPodMetrics: done with vmiList") +} + +func getPodMetrics(clientset *metricsv.Clientset, pod k8sv1.Pod, vmis *vmiMetaData, + nodeName string, res map[string]types.DomainMetric) *types.DomainMetric { + if pod.Status.Phase != k8sv1.PodRunning { + return nil + } + if nodeName != pod.Spec.NodeName { // cluster, pod from other nodes + return nil + } + podName := pod.ObjectMeta.Name + memoryLimits := pod.Spec.Containers[0].Resources.Limits.Memory() - //allocatedMemory := metrics.Containers[0].Usage[k8sv1.ResourceMemory] - usedMemory := metrics.Containers[0].Usage[k8sv1.ResourceMemory] - maxMemory := uint32(usedMemory.Value()) + metrics, err := clientset.MetricsV1beta1().PodMetricses(kubeapi.EVEKubeNameSpace).Get(context.TODO(), podName, metav1.GetOptions{}) + if err != nil { + logrus.Errorf("getPodMetrics: get pod metrics error %v", err) + return nil + } + + cpuTotalNs := metrics.Containers[0].Usage[k8sv1.ResourceCPU] + cpuTotalNsAsFloat64 := cpuTotalNs.AsApproximateFloat64() * float64(time.Second) // get nanoseconds + totalCPU := uint64(cpuTotalNsAsFloat64) + + //allocatedMemory := metrics.Containers[0].Usage[k8sv1.ResourceMemory] + usedMemory := metrics.Containers[0].Usage[k8sv1.ResourceMemory] + maxMemory := uint32(usedMemory.Value()) + if vmis != nil { if vmis.maxmem < maxMemory { vmis.maxmem = maxMemory } else { maxMemory = vmis.maxmem } + } - available := uint32(memoryLimits.Value()) - if uint32(usedMemory.Value()) < available { - available = available - uint32(usedMemory.Value()) - } - usedMemoryPercent := calculateMemoryUsagePercent(usedMemory.Value(), memoryLimits.Value()) - BytesInMegabyte := uint32(1024 * 1024) + available := uint32(memoryLimits.Value()) 
+ if uint32(usedMemory.Value()) < available { + available = available - uint32(usedMemory.Value()) + } + usedMemoryPercent := calculateMemoryUsagePercent(usedMemory.Value(), memoryLimits.Value()) + BytesInMegabyte := uint32(1024 * 1024) - realCPUTotal := vmis.cputotal + totalCPU + var realCPUTotal uint64 + if vmis != nil { + realCPUTotal = vmis.cputotal + totalCPU vmis.cputotal = realCPUTotal - dm := types.DomainMetric{ - CPUTotalNs: realCPUTotal, - CPUScaled: 1, - AllocatedMB: uint32(memoryLimits.Value()) / BytesInMegabyte, - UsedMemory: uint32(usedMemory.Value()) / BytesInMegabyte, - MaxUsedMemory: maxMemory / BytesInMegabyte, - AvailableMemory: available / BytesInMegabyte, - UsedMemoryPercent: usedMemoryPercent, - } - if count <= emptySlot { - res[n] = dm - } - logrus.Infof("checkPodMetrics: dm %+v, res %v", dm, res) + } + dm := &types.DomainMetric{ + CPUTotalNs: realCPUTotal, + CPUScaled: 1, + AllocatedMB: uint32(memoryLimits.Value()) / BytesInMegabyte, + UsedMemory: uint32(usedMemory.Value()) / BytesInMegabyte, + MaxUsedMemory: maxMemory / BytesInMegabyte, + AvailableMemory: available / BytesInMegabyte, + UsedMemoryPercent: usedMemoryPercent, + NodeName: pod.Spec.NodeName, + } + logrus.Infof("getPodMetrics: dm %+v, res %v", dm, res) + return dm +} + +// StopReplicaPodContainer stops the ReplicaSet pod +func StopReplicaPodContainer(kubeconfig *rest.Config, repName string) error { + + clientset, err := kubernetes.NewForConfig(kubeconfig) + if err != nil { + logrus.Errorf("StopReplicaPodContainer: can't get clientset %v", err) + return err + } + + err = clientset.AppsV1().ReplicaSets(kubeapi.EVEKubeNameSpace).Delete(context.TODO(), repName, metav1.DeleteOptions{}) + if err != nil { + // Handle error + logrus.Errorf("StopReplicaPodContainer: deleting pod: %v", err) + return err + } + + logrus.Infof("StopReplicaPodContainer: Pod %s deleted", repName) + return nil +} - ctx.vmiList[n] = vmis // update for the last seen metrics +func encodeSelections(selections []netattdefv1.NetworkSelectionElement) string { + bytes, err := json.Marshal(selections) + if err != nil { + logrus.Errorf("encodeSelections %v", err) + return "" } + return string(bytes) +} + +// InfoPodContainer : Get the pod information +func InfoPodContainer(ctx kubevirtContext, podName string) (string, error) { + err := getConfig(&ctx) + if err != nil { + return "", err + } + podclientset, err := kubernetes.NewForConfig(ctx.kubeConfig) + if err != nil { + return "", logError("InfoPodContainer: couldn't get the pod Config: %v", err) + } + + nodeName, ok := ctx.nodeNameMap["nodename"] + if !ok { + return "", logError("Failed to get nodeName") + } + + pod, err := podclientset.CoreV1().Pods(kubeapi.EVEKubeNameSpace).Get(context.TODO(), podName, metav1.GetOptions{}) + if err != nil { + return "", logError("InfoPodContainer: couldn't get the pod: %v", err) + } + + if nodeName != pod.Spec.NodeName { + logrus.Infof("InfoPodContainer: pod %s, nodeName %v differ w/ hostname", podName, pod.Spec.NodeName) + return "", nil + } else { + logrus.Infof("InfoPodContainer: pod %s, nodeName %v, matches the hostname uuid", podName, pod.Spec.NodeName) + } + + var res string + // https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/ + switch pod.Status.Phase { + case k8sv1.PodPending: + res = "Pending" + case k8sv1.PodRunning: + res = "Running" + case k8sv1.PodSucceeded: + res = "Running" + case k8sv1.PodFailed: + res = "Failed" + case k8sv1.PodUnknown: + res = "Scheduling" + default: + res = "Scheduling" + } + 
logrus.Infof("InfoPodContainer: pod %s, nodeName %v, status %s", podName, pod.Spec.NodeName, res) + + return res, nil } // Helper function to calculate the memory usage percentage @@ -1378,3 +1699,9 @@ func (ctx kubevirtContext) VirtualTPMTerminate(domainName string, wp *types.Watc func (ctx kubevirtContext) VirtualTPMTeardown(domainName string, wp *types.WatchdogParam) error { return fmt.Errorf("not implemented") } + +func getMyNodeUUID(ctx *kubevirtContext, nodeName string) { + if len(ctx.nodeNameMap) == 0 { + ctx.nodeNameMap["nodename"] = nodeName + } +} diff --git a/pkg/pillar/hypervisor/kubevirt_test.go b/pkg/pillar/hypervisor/kubevirt_test.go new file mode 100644 index 0000000000..aa136299b8 --- /dev/null +++ b/pkg/pillar/hypervisor/kubevirt_test.go @@ -0,0 +1,108 @@ +// Copyright (c) 2024 Zededa, Inc. +// SPDX-License-Identifier: Apache-2.0 +// +//go:build kubevirt + +package hypervisor + +import ( + "net" + "os" + "testing" + + "github.com/lf-edge/eve/pkg/pillar/types" + uuid "github.com/satori/go.uuid" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" +) + +// k3sPem is used for a mock k3s.yaml file +const k3sPem = `apiVersion: v1 +clusters: +- cluster: + certificate-authority-data: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJlRENDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTWpnME1UTXpOelV3SGhjTk1qUXhNREE0TVRnME9UTTFXaGNOTXpReE1EQTJNVGcwT1RNMQpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTWpnME1UTXpOelV3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFRMWVZSEVBL2JZTUdyQ3oxV2ZlaUdmR1BuVE5TNkd3SjA0enRFVzNydjYKeWd3cElvYnhTR25GRXpTYTFSbDZpT0J5SE15MkdmdWdTMGkvQzVnSzJhaStvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVXQ1MThHdnNwZjJYOEwzTUxZREx2CllKUk9mNE13Q2dZSUtvWkl6ajBFQXdJRFNRQXdSZ0loQUpxdFFob3FaV2lLUWVwYnphTTFQcmtNWVk2KzFEekkKUllIdHF4cjhPQnAzQWlFQTlrNnNOcEhOemxXRW9XMHFkbmI2Q0pnWVBXenJsdW5YcjRrUmNKWUtpRzA9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K + server: https://127.0.0.1:6443 + name: default +contexts: +- context: + cluster: default + user: default + name: default +current-context: default +kind: Config +preferences: {} +users: +- name: default + user: + client-certificate-data: 
LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJRG1FcTFOaTZGdDR3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOekk0TkRFek16YzFNQjRYRFRJME1UQXdPREU0TkRrek5Wb1hEVEkxTVRBdwpPREU0TkRrek5Wb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJGR01VS2R4VFJobVR2MlcKNm50Z2UyVm9VUXRuWFRpeWpBTUptL01STTdkbnRTa3g5OXRGWGJUNUMxSUhkQmVLMXFSZ3RXclpjMmlZcWNMYQpzTTVVTlg2alNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCUzRIZjU3TjU1Nzl6ajRyTVdqK0hvSWp0MWcyakFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlCK0d2cDMwQnJJazk4UzZUeVdXbzI0VmRNZU1JZkdheW90REhhb0NsTnFrQUloQUxZTnZQK3dwTVFFV2pHRApkMDRvTWh0ckN3akNUblZFT3pvMXRtV3lQK0ZOCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkakNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTWpnME1UTXpOelV3SGhjTk1qUXhNREE0TVRnME9UTTFXaGNOTXpReE1EQTJNVGcwT1RNMQpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTWpnME1UTXpOelV3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFRaXBVL3BoeWtINHFBYThsK3VwZmhtUk43M2tsbFI3VnhiZ1FGMU5GemcKTVRvUk5zSkxaYnFIY1NpMkk4RnMvNnBPZ1g4TlBlUHVOb01YYlFqaGJJTW9vMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVXVCMytlemVlZS9jNCtLekZvL2g2CkNJN2RZTm93Q2dZSUtvWkl6ajBFQXdJRFJ3QXdSQUlnZklMeDNVUTlwZ2Z3VmZCRmh5aEo2YUhMeGkyQk03aGQKZ0YwalNhc1U1UndDSUE0OFN6NXpkaVhoNFJpdTVlZ2NtRnBXdkpyMXRKc1dCS25PQWt3clExdFgKLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo= + client-key-data: LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSUNRK1NkV2QrdkwwaE5DOU4yeTVFMUxmMzF6a0I2SHdvTUw3UlVFTkZVTWpvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFVVl4UXAzRk5HR1pPL1picWUyQjdaV2hSQzJkZE9MS01Bd21iOHhFenQyZTFLVEgzMjBWZAp0UGtMVWdkMEY0cldwR0MxYXRsemFKaXB3dHF3emxRMWZnPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo=` + +// TestCreateReplicaPodConfig tests the CreateReplicaPodConfig function +func TestCreateReplicaPodConfig(t *testing.T) { + // Set up sample inputs + domainName := "test-domain" + uuidValue, err := uuid.NewV4() + assert.NoError(t, err) + + config := types.DomainConfig{ + DisplayName: "test-app", + UUIDandVersion: types.UUIDandVersion{UUID: uuidValue}, + KubeImageName: "test-image", + VifList: []types.VifConfig{ + {Vif: "vif0", Bridge: "br0", Mac: net.HardwareAddr{0x52, 0x54, 0x00, 0x12, 0x34, 0x56}}, + {Vif: "vif1", Bridge: "br0", Mac: net.HardwareAddr{0x52, 0x54, 0x00, 0x12, 0x34, 0x57}}, + }, + } + status := types.DomainStatus{} + diskStatusList := []types.DiskStatus{} + aa := &types.AssignableAdapters{} + file, err := os.CreateTemp("", "testfile") + assert.NoError(t, err) + defer os.Remove(file.Name()) + + // Set up kubevirtContext + ctx := kubevirtContext{ + devicemodel: "virt", + vmiList: make(map[string]*vmiMetaData), + prevDomainMetric: make(map[string]types.DomainMetric), + nodeNameMap: map[string]string{"nodename": "test-node"}, + } + + // Mock the /run/.kube/k3s/k3s.yaml file + kubeConfigPath := "/run/.kube/k3s/k3s.yaml" + err = os.MkdirAll("/run/.kube/k3s", 0755) + assert.NoError(t, err) + mockFile, err := os.Create(kubeConfigPath) + assert.NoError(t, err) + defer os.Remove(kubeConfigPath) + defer mockFile.Close() + + _, err = mockFile.WriteString(k3sPem) + assert.NoError(t, err) + + // Call the function to create the replicaSet of pod configure + err = ctx.CreateReplicaPodConfig(domainName, config, status, diskStatusList, aa, file) + assert.NoError(t, err) + + // Additional checks and assertions similar to the Start function + 
nodeName, ok := ctx.nodeNameMap["nodename"] + assert.True(t, ok, "Failed to get nodeName from map") + + err = getConfig(&ctx) + assert.NoError(t, err) + kubeconfig := ctx.kubeConfig + assert.NotNil(t, kubeconfig, "kubeConfig should not be nil") + + // Check the Pod ReplicaSet + logrus.Infof("Checking Kubevirt domain %s, nodename %s", domainName, nodeName) + vmis, ok := ctx.vmiList[domainName] + assert.True(t, ok, "check domain %s failed to get vmlist", domainName) + + assert.True(t, vmis.mtype == IsMetaReplicaPod, "check domain %s failed to get type", domainName) + + replicaPod := vmis.repPod + assert.NotNil(t, replicaPod, "replicaPod should not be nil") + + assert.Equal(t, replicaPod.ObjectMeta.Name, vmis.name, "replicaPod name %s is different from name %s", + replicaPod.ObjectMeta.Name, vmis.name) +} diff --git a/pkg/pillar/kubeapi/kubeapi.go b/pkg/pillar/kubeapi/kubeapi.go index fecfb10031..3fea0de1c3 100644 --- a/pkg/pillar/kubeapi/kubeapi.go +++ b/pkg/pillar/kubeapi/kubeapi.go @@ -282,8 +282,8 @@ func waitForNodeReady(client *kubernetes.Clientset, readyCh chan bool, devUUID s if err != nil { return err } - if len(pods.Items) < 6 { - return fmt.Errorf("kubevirt running pods less than 6") + if len(pods.Items) < 4 { // need at least 4 pods to be running + return fmt.Errorf("kubevirt running pods less than 4") } err = waitForLonghornReady(client, hostname) diff --git a/pkg/pillar/types/clustertypes.go b/pkg/pillar/types/clustertypes.go index f3a00af8e7..31794adc19 100644 --- a/pkg/pillar/types/clustertypes.go +++ b/pkg/pillar/types/clustertypes.go @@ -49,6 +49,8 @@ type ENClusterAppStatus struct { ScheduledOnThisNode bool // App is running on this device StatusRunning bool // Status of the app in "Running" state IsVolumeDetached bool // Are volumes detached after failover ? + AppIsVMI bool // Is this a VMI app, vs a Pod app + AppKubeName string // Kube name of the app, either VMI or Pod } // Key - returns the key for the config of EdgeNodeClusterConfig diff --git a/pkg/pillar/types/domainmgrtypes.go b/pkg/pillar/types/domainmgrtypes.go index cdcb5c3d9f..a4f0dfb146 100644 --- a/pkg/pillar/types/domainmgrtypes.go +++ b/pkg/pillar/types/domainmgrtypes.go @@ -327,6 +327,11 @@ type DomainStatus struct { FmlCustomResolution string // if this node is the DNiD of the App IsDNidNode bool + // handle DomainConfig Delete + // for kubevirt EVE, App is configured into the kubernetes database, + // there is no need to delete the domain if the status check fails. + // But this flag is used to handle if the domain config is deleted. + DomainConfigDeleted bool // the device name is used for kube node name // Need to pass in from domainmgr to hypervisor context commands NodeName string diff --git a/pkg/pillar/types/zedmanagertypes.go b/pkg/pillar/types/zedmanagertypes.go index fb46b5aacb..5901f2a5ed 100644 --- a/pkg/pillar/types/zedmanagertypes.go +++ b/pkg/pillar/types/zedmanagertypes.go @@ -144,8 +144,7 @@ type AppInstanceConfig struct { // allow AppInstance to discover other AppInstances attached to its network instances AllowToDiscover bool - // XXX Cluster Designated Node Id - // temp, this will be changed to bool in later PR + // Cluster Designated Node Id DesignatedNodeID uuid.UUID }