Skip to content

Commit

Permalink
Using ListNode to get nodes entering and leaving the cluster.
Browse files Browse the repository at this point in the history
* Remove route controller again

* Print out clusterName for new nodes

* tag new nodes when it comes online

* only process a node once

* check taggNodes size

* add debugging

* use node name as key

* delete k,v from taggedNodes if node no longer exists

* log if delete is done

* Get a list of nodes and tag them if havent

* get instance IDs of the nodes that need tagging

* use MapToAWSInstanceID instead

* restored v1 aws

* restored from master

* tag instance with a random tag

* add klog

* tag and untag node resources

* Prepare for pr

* Initialize nodeMap
  • Loading branch information
nguyenkndinh authored and saurav-agarwalla committed May 11, 2022
1 parent 08f6bd6 commit e19546c
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 8 deletions.
1 change: 1 addition & 0 deletions cmd/aws-cloud-controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"k8s.io/component-base/logs"
_ "k8s.io/component-base/metrics/prometheus/clientgo" // for client metric registration
_ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration
"k8s.io/klog/v2"

cloudcontrollerconfig "k8s.io/cloud-provider/app/config"

Expand Down
26 changes: 24 additions & 2 deletions pkg/controllers/aws_controller_manager.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
Expand All @@ -8,6 +21,7 @@ import (
cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
genericcontrollermanager "k8s.io/controller-manager/app"
"k8s.io/controller-manager/controller"
"k8s.io/klog/v2"
)

const (
Expand All @@ -29,6 +43,9 @@ func BuildControllerInitializers() map[string]app.ControllerInitFuncConstructor

controllerInitializers[TaggingControllerKey] = taggingControllerConstructor

// TODO: remove the following line to enable the route controller
delete(controllerInitializers, "route")

return controllerInitializers
}

Expand All @@ -41,13 +58,18 @@ func startTaggingControllerWrapper(initContext app.ControllerInitContext, comple

func startTaggingController(ctx context.Context, initContext app.ControllerInitContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) (controller.Interface, bool, error) {
// Start the TaggingController
taggingcontroller, err := taggingcontroller.NewTaggingController()
taggingcontroller, err := taggingcontroller.NewTaggingController(
completedConfig.SharedInformers.Core().V1().Nodes(),
// cloud node lifecycle controller uses existing cluster role from node-controller
completedConfig.ClientBuilder.ClientOrDie(initContext.ClientName),
cloud,
completedConfig.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration)
if err != nil {
klog.Warningf("failed to start tagging controller: %s", err)
return nil, false, nil
}

go taggingcontroller.Run(ctx.Done())
go taggingcontroller.Run(ctx)

return nil, true, nil
}
143 changes: 137 additions & 6 deletions pkg/controllers/tagging/tagging_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,158 @@ limitations under the License.
package tagging

import (
"context"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
v1lister "k8s.io/client-go/listers/core/v1"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog/v2"
"time"
)

// TaggingController is the controller implementation for tagging cluster resources
// TaggingController is the controller implementation for tagging cluster resources.
// It periodically check for Node events (creating/deleting) to apply appropriate
// tags to resources.
type TaggingController struct {
kubeClient clientset.Interface
nodeLister v1lister.NodeLister

cloud cloudprovider.Interface

// Value controlling TaggingController monitoring period, i.e. how often does TaggingController
// check node list. This value should be lower than nodeMonitorGracePeriod
// set in controller-manager
nodeMonitorPeriod time.Duration

// A map presenting the node and whether it currently exists
taggedNodes map[string]bool

// A map representing the nodes that were ever in the cluster
nodeMap map[string]*v1.Node
}

// NewTaggingController creates a NewTaggingController object
func NewTaggingController() (*TaggingController, error) {
func NewTaggingController(
nodeInformer coreinformers.NodeInformer,
kubeClient clientset.Interface,
cloud cloudprovider.Interface,
nodeMonitorPeriod time.Duration) (*TaggingController, error) {

tc := &TaggingController{
kubeClient: kubeClient,
nodeLister: nodeInformer.Lister(),
cloud: cloud,
nodeMonitorPeriod: nodeMonitorPeriod,
taggedNodes: make(map[string]bool),
nodeMap: make(map[string]*v1.Node),
}

return tc, nil
}

// Run will start the controller to tag resources attached to a cluster
// and untag resources detached from a cluster.
func (tc *TaggingController) Run(stopCh <-chan struct{}) {
func (tc *TaggingController) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
klog.Infof("Running the TaggingController")

<-stopCh
}
wait.UntilWithContext(ctx, tc.monitorNodes, tc.nodeMonitorPeriod)
}

func (tc *TaggingController) monitorNodes(ctx context.Context) {
nodes, err := tc.nodeLister.List(labels.Everything())
if err != nil {
klog.Errorf("error listing nodes from cache: %s", err)
return
}

for k := range tc.taggedNodes {
tc.taggedNodes[k] = false
}

var nodesToTag []*v1.Node
for _, node := range nodes {
if _, ok := tc.taggedNodes[node.GetName()]; !ok {
nodesToTag = append(nodesToTag, node)
}

tc.nodeMap[node.GetName()] = node
tc.taggedNodes[node.GetName()] = true
}
tc.tagNodesResources(nodesToTag)

var nodesToUntag []*v1.Node
for nodeName, existed := range tc.taggedNodes {
if existed == false {
nodesToUntag = append(nodesToUntag, tc.nodeMap[nodeName])
}
}
tc.untagNodeResources(nodesToUntag)

tc.syncDeletedNodesToTaggedNodes()
}

// tagNodesResources tag node resources from a list of node
// If we want to tag more resources, modify this function appropriately
func (tc *TaggingController) tagNodesResources(nodes []*v1.Node) {
for _, node := range nodes {
klog.Infof("Tagging resources for node %s.", node.GetName())
}
}

func (tc *TaggingController) untagNodeResources(nodes []*v1.Node) {
for _, node := range nodes {
klog.Infof("Untagging resources for node %s.", node.GetName())
}
}

// syncDeletedNodes delete (k, v) from taggedNodes
// if it doesn't exist
func (tc *TaggingController) syncDeletedNodesToTaggedNodes() {
for k, v := range tc.taggedNodes {
if v == false {
delete(tc.taggedNodes, k)
}
}
}

//// tagEc2Instances applies the provided tags to each EC2 instances in
//// the cluster.
//func (tc *TaggingController) tagEc2Instances(nodes []*v1.Node) {
// var instanceIds []*string
// for _, node := range nodes {
// instanceId, _ := awsv1.KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()
// instanceIds = append(instanceIds, aws.String(string(instanceId)))
// }
//
// tc.tagResources(instanceIds)
//}

//func (tc *TaggingController) tagResources(resourceIds []*string) {
// request := &ec2.CreateTagsInput{
// Resources: resourceIds,
// Tags: tc.getTagsFromInputs(),
// }
//
// _, error := awsv1.awsSdkEC2.CreateTags(request)
// awsv1.Cloud.TagResoures(request)
//
// if error != nil {
// klog.Errorf("Error occurred trying to tag resources, %s", error)
// }
//}
//
//// Sample function demonstrating that we'll get the tag list from user
//func (tc *TaggingController) getTagsFromInputs() []*ec2.Tag {
// var awsTags []*ec2.Tag
// tag := &ec2.Tag{
// Key: aws.String("Sample Key"),
// Value: aws.String("Sample value"),
// }
// awsTags = append(awsTags, tag)
//
// return awsTags
//}

0 comments on commit e19546c

Please sign in to comment.