Skip to content

Commit

Permalink
Merge pull request #501 from jwsui/remove-mediator
Browse files Browse the repository at this point in the history
Remove Subnet/SubnetPort service from mediator
  • Loading branch information
jwsui authored Feb 1, 2024
2 parents 0764a93 + e79139f commit e5cdcd6
Show file tree
Hide file tree
Showing 16 changed files with 217 additions and 318 deletions.
18 changes: 9 additions & 9 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/vmware-tanzu/nsx-operator/pkg/apis/v1alpha1"
"github.com/vmware-tanzu/nsx-operator/pkg/apis/v1alpha2"
"github.com/vmware-tanzu/nsx-operator/pkg/config"
commonctl "github.com/vmware-tanzu/nsx-operator/pkg/controllers/common"
ippool2 "github.com/vmware-tanzu/nsx-operator/pkg/controllers/ippool"
namespacecontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/namespace"
networkpolicycontroller "github.com/vmware-tanzu/nsx-operator/pkg/controllers/networkpolicy"
Expand All @@ -41,6 +40,7 @@ import (
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/nsxserviceaccount"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/staticroute"
subnetservice "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/subnet"
subnetportservice "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/subnetport"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/vpc"
)

Expand Down Expand Up @@ -118,10 +118,7 @@ func StartVPCController(mgr ctrl.Manager, vpcService *vpc.VPCService) {
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}

vpcReconciler.Service = vpcService
commonctl.ServiceMediator.VPCService = vpcService

if err := vpcReconciler.Start(mgr); err != nil {
log.Error(err, "failed to create vpc controller", "controller", "VPC")
os.Exit(1)
Expand Down Expand Up @@ -188,9 +185,12 @@ func main() {
ipPoolService, err := ippool.InitializeIPPool(commonService, vpcService)
if err != nil {
log.Error(err, "failed to initialize ippool commonService", "controller", "IPPool")
}
subnetPortService, err := subnetportservice.InitializeSubnetPort(commonService)
if err != nil {
log.Error(err, "failed to initialize subnetport commonService", "controller", "SubnetPort")
os.Exit(1)
}
commonctl.ServiceMediator.VPCService = vpcService
nodeService, err := nodeservice.InitializeNode(commonService)
if err != nil {
log.Error(err, "failed to initialize node commonService", "controller", "Node")
Expand All @@ -205,17 +205,17 @@ func main() {
StartVPCController(mgr, vpcService)
StartNamespaceController(mgr, cf, vpcService)
// Start subnet/subnetset controller.
if err := subnet.StartSubnetController(mgr, subnetService); err != nil {
if err := subnet.StartSubnetController(mgr, subnetService, subnetPortService, vpcService); err != nil {
os.Exit(1)
}
if err := subnetset.StartSubnetSetController(mgr, subnetService); err != nil {
if err := subnetset.StartSubnetSetController(mgr, subnetService, subnetPortService, vpcService); err != nil {
os.Exit(1)
}

node.StartNodeController(mgr, nodeService)
staticroutecontroller.StartStaticRouteController(mgr, staticRouteService)
subnetport.StartSubnetPortController(mgr, commonService)
pod.StartPodController(mgr, commonService, nodeService)
subnetport.StartSubnetPortController(mgr, subnetPortService, subnetService, vpcService)
pod.StartPodController(mgr, subnetPortService, subnetService, vpcService, nodeService)
StartIPPoolController(mgr, ipPoolService, vpcService)
networkpolicycontroller.StartNetworkPolicyController(mgr, commonService, vpcService)
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/clean/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"k8s.io/client-go/util/retry"

"github.com/vmware-tanzu/nsx-operator/pkg/config"
commonctl "github.com/vmware-tanzu/nsx-operator/pkg/controllers/common"
"github.com/vmware-tanzu/nsx-operator/pkg/logger"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx"
"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common"
Expand Down Expand Up @@ -82,7 +81,6 @@ func InitializeCleanupService(cf *config.NSXOperatorConfig) (*CleanupService, er
NSXConfig: cf,
}
vpcService, vpcErr := vpc.InitializeVPC(commonService)
commonctl.ServiceMediator.VPCService = vpcService

// initialize all the CR services
// Use Fluent Interface to escape error check hell
Expand Down
4 changes: 0 additions & 4 deletions pkg/controllers/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"time"

ctrl "sigs.k8s.io/controller-runtime"

"github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/mediator"
)

const (
Expand Down Expand Up @@ -33,6 +31,4 @@ var (
ResultRequeueAfter10sec = ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}
// for unstable event, eg: failed to k8s resources when reconciling, may due to k8s unstable
ResultRequeueAfter5mins = ctrl.Result{Requeue: true, RequeueAfter: 5 * time.Minute}

ServiceMediator = mediator.ServiceMediator{}
)
31 changes: 25 additions & 6 deletions pkg/controllers/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,42 @@ import (
"github.com/vmware-tanzu/nsx-operator/pkg/apis/v1alpha1"
"github.com/vmware-tanzu/nsx-operator/pkg/logger"
servicecommon "github.com/vmware-tanzu/nsx-operator/pkg/nsx/services/common"
"github.com/vmware-tanzu/nsx-operator/pkg/util"
)

var (
log = logger.Log
lock = &sync.Mutex{}
)

func AllocateSubnetFromSubnetSet(subnetSet *v1alpha1.SubnetSet) (string, error) {
func AllocateSubnetFromSubnetSet(subnetSet *v1alpha1.SubnetSet, vpcService servicecommon.VPCServiceProvider, subnetService servicecommon.SubnetServiceProvider, subnetPortService servicecommon.SubnetPortServiceProvider) (string, error) {
// TODO: For now, this is a global lock. In the future, we need to narrow its scope down to improve the performance.
lock.Lock()
defer lock.Unlock()
subnetPath, err := ServiceMediator.GetAvailableSubnet(subnetSet)
if err != nil {
subnetList := subnetService.GetSubnetsByIndex(servicecommon.TagScopeSubnetSetCRUID, string(subnetSet.GetUID()))
for _, nsxSubnet := range subnetList {
portNums := len(subnetPortService.GetPortsOfSubnet(*nsxSubnet.Id))
totalIP := int(*nsxSubnet.Ipv4SubnetSize)
if len(nsxSubnet.IpAddresses) > 0 {
// totalIP will be overrided if IpAddresses are specified.
totalIP, _ = util.CalculateIPFromCIDRs(nsxSubnet.IpAddresses)
}
if portNums < totalIP-3 {
return *nsxSubnet.Path, nil
}
}
tags := subnetService.GenerateSubnetNSTags(subnetSet, subnetSet.Namespace)
if tags == nil {
return "", errors.New("failed to generate subnet tags")
}
log.Info("the existing subnets are not available, creating new subnet", "subnetList", subnetList, "subnetSet.Name", subnetSet.Name, "subnetSet.Namespace", subnetSet.Namespace)
vpcInfoList := vpcService.ListVPCInfo(subnetSet.Namespace)
if len(vpcInfoList) == 0 {
err := errors.New("no VPC found")
log.Error(err, "failed to allocate Subnet")
return "", err
}
return subnetPath, nil
return subnetService.CreateOrUpdateSubnet(subnetSet, vpcInfoList[0], tags)
}

func getSharedNamespaceAndVpcForNamespace(client k8sclient.Client, ctx context.Context, namespaceName string) (string, string, error) {
Expand Down Expand Up @@ -64,14 +83,14 @@ func GetDefaultSubnetSet(client k8sclient.Client, ctx context.Context, namespace
log.Info("namespace doesn't have shared VPC, searching the default subnetset in the current namespace", "namespace", namespace)
targetNamespace = namespace
}
subnetSet, err := getDefaultSubnetSetByNamespace(client, ctx, targetNamespace, resourceType)
subnetSet, err := getDefaultSubnetSetByNamespace(client, targetNamespace, resourceType)
if err != nil {
return nil, err
}
return subnetSet, err
}

func getDefaultSubnetSetByNamespace(client k8sclient.Client, ctx context.Context, namespace string, resourceType string) (*v1alpha1.SubnetSet, error) {
func getDefaultSubnetSetByNamespace(client k8sclient.Client, namespace string, resourceType string) (*v1alpha1.SubnetSet, error) {
subnetSetList := &v1alpha1.SubnetSetList{}
subnetSetSelector := &metav1.LabelSelector{
MatchLabels: map[string]string{
Expand Down
56 changes: 27 additions & 29 deletions pkg/controllers/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,19 @@ var (
// PodReconciler reconciles a Pod object
type PodReconciler struct {
client.Client
Scheme *apimachineryruntime.Scheme
Service *subnetport.SubnetPortService
Scheme *apimachineryruntime.Scheme

SubnetPortService *subnetport.SubnetPortService
SubnetService servicecommon.SubnetServiceProvider
VPCService servicecommon.VPCServiceProvider
NodeServiceReader servicecommon.NodeServiceReader
}

func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
pod := &v1.Pod{}
log.Info("reconciling pod", "pod", req.NamespacedName)

metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerSyncTotal, MetricResTypePod)
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerSyncTotal, MetricResTypePod)

if err := r.Client.Get(ctx, req.NamespacedName, pod); err != nil {
log.Error(err, "unable to fetch pod", "req", req.NamespacedName)
Expand All @@ -64,7 +67,7 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
}

if pod.ObjectMeta.DeletionTimestamp.IsZero() {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerUpdateTotal, MetricResTypePod)
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerUpdateTotal, MetricResTypePod)
if !controllerutil.ContainsFinalizer(pod, servicecommon.PodFinalizerName) {
controllerutil.AddFinalizer(pod, servicecommon.PodFinalizerName)
if err := r.Client.Update(ctx, pod); err != nil {
Expand All @@ -88,7 +91,7 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
return common.ResultRequeue, err
}
contextID := *node.Id
nsxSubnetPortState, err := r.Service.CreateOrUpdateSubnetPort(pod, nsxSubnetPath, contextID, &pod.ObjectMeta.Labels)
nsxSubnetPortState, err := r.SubnetPortService.CreateOrUpdateSubnetPort(pod, nsxSubnetPath, contextID, &pod.ObjectMeta.Labels)
if err != nil {
log.Error(err, "failed to create or update NSX subnet port, would retry exponentially", "pod.Name", req.NamespacedName, "pod.UID", pod.UID)
updateFail(r, &ctx, pod, &err)
Expand All @@ -106,8 +109,8 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
updateSuccess(r, &ctx, pod)
} else {
if controllerutil.ContainsFinalizer(pod, servicecommon.PodFinalizerName) {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteTotal, MetricResTypePod)
if err := r.Service.DeleteSubnetPort(pod.UID); err != nil {
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerDeleteTotal, MetricResTypePod)
if err := r.SubnetPortService.DeleteSubnetPort(pod.UID); err != nil {
log.Error(err, "deletion failed, would retry exponentially", "pod", req.NamespacedName)
deleteFail(r, &ctx, pod, &err)
return common.ResultRequeue, err
Expand Down Expand Up @@ -161,20 +164,15 @@ func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func StartPodController(mgr ctrl.Manager, commonService servicecommon.Service, nodeServiceReader servicecommon.NodeServiceReader) {
func StartPodController(mgr ctrl.Manager, subnetPortService *subnetport.SubnetPortService, subnetService servicecommon.SubnetServiceProvider, vpcService servicecommon.VPCServiceProvider, nodeService servicecommon.NodeServiceReader) {
podPortReconciler := PodReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
NodeServiceReader: nodeServiceReader,
SubnetService: subnetService,
SubnetPortService: subnetPortService,
VPCService: vpcService,
NodeServiceReader: nodeService,
}
subnetPortService, err := subnetport.InitializeSubnetPort(commonService)
if err != nil {
log.Error(err, "failed to initialize subnetport commonService", "controller", "Pod")
os.Exit(1)
}
podPortReconciler.Service = subnetPortService
common.ServiceMediator.SubnetPortService = podPortReconciler.Service

if err := podPortReconciler.Start(mgr); err != nil {
log.Error(err, "failed to create controller", "controller", "Pod")
os.Exit(1)
Expand Down Expand Up @@ -202,7 +200,7 @@ func (r *PodReconciler) GarbageCollector(cancel chan bool, timeout time.Duration
return
case <-time.After(timeout):
}
nsxSubnetPortSet := r.Service.ListNSXSubnetPortIDForPod()
nsxSubnetPortSet := r.SubnetPortService.ListNSXSubnetPortIDForPod()
if len(nsxSubnetPortSet) == 0 {
continue
}
Expand All @@ -223,45 +221,45 @@ func (r *PodReconciler) GarbageCollector(cancel chan bool, timeout time.Duration
continue
}
log.V(1).Info("GC collected Pod", "UID", elem)
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteTotal, MetricResTypePod)
err = r.Service.DeleteSubnetPort(types.UID(elem))
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerDeleteTotal, MetricResTypePod)
err = r.SubnetPortService.DeleteSubnetPort(types.UID(elem))
if err != nil {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteFailTotal, MetricResTypePod)
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerDeleteFailTotal, MetricResTypePod)
} else {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteSuccessTotal, MetricResTypePod)
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerDeleteSuccessTotal, MetricResTypePod)
}
}
}
}

func updateFail(r *PodReconciler, c *context.Context, o *v1.Pod, e *error) {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerUpdateFailTotal, MetricResTypePod)
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerUpdateFailTotal, MetricResTypePod)
}

func deleteFail(r *PodReconciler, c *context.Context, o *v1.Pod, e *error) {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteFailTotal, MetricResTypePod)
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerDeleteFailTotal, MetricResTypePod)
}

func updateSuccess(r *PodReconciler, c *context.Context, o *v1.Pod) {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerUpdateSuccessTotal, MetricResTypePod)
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerUpdateSuccessTotal, MetricResTypePod)
}

func deleteSuccess(r *PodReconciler, _ *context.Context, _ *v1.Pod) {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteSuccessTotal, MetricResTypePod)
metrics.CounterInc(r.SubnetPortService.NSXConfig, metrics.ControllerDeleteSuccessTotal, MetricResTypePod)
}

func (r *PodReconciler) GetSubnetPathForPod(ctx context.Context, pod *v1.Pod) (string, error) {
subnetPath := r.Service.GetSubnetPathForSubnetPortFromStore(string(pod.UID))
subnetPath := r.SubnetPortService.GetSubnetPathForSubnetPortFromStore(string(pod.UID))
if len(subnetPath) > 0 {
log.V(1).Info("NSX subnet port had been created, returning the existing NSX subnet path", "pod.UID", pod.UID, "subnetPath", subnetPath)
return subnetPath, nil
}
subnetSet, err := common.GetDefaultSubnetSet(r.Service.Client, ctx, pod.Namespace, servicecommon.LabelDefaultPodSubnetSet)
subnetSet, err := common.GetDefaultSubnetSet(r.SubnetPortService.Client, ctx, pod.Namespace, servicecommon.LabelDefaultPodSubnetSet)
if err != nil {
return "", err
}
log.Info("got default subnetset for pod, allocating the NSX subnet", "subnetSet.Name", subnetSet.Name, "subnetSet.UID", subnetSet.UID, "pod.Name", pod.Name, "pod.UID", pod.UID)
subnetPath, err = common.AllocateSubnetFromSubnetSet(subnetSet)
subnetPath, err = common.AllocateSubnetFromSubnetSet(subnetSet, r.VPCService, r.SubnetService, r.SubnetPortService)
if err != nil {
return subnetPath, err
}
Expand Down
Loading

0 comments on commit e5cdcd6

Please sign in to comment.