Skip to content

Commit

Permalink
1. patch the pod ip after allocated the eip
Browse files Browse the repository at this point in the history
2. add the flag --patch-podip which deside wheather patch podIP to kube-apiserver
  • Loading branch information
zhiguang.jzg committed Dec 6, 2019
1 parent 3a8cd9a commit b4392ce
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 6 deletions.
17 changes: 14 additions & 3 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type networkService struct {
//networkResourceMgr ResourceManager
mgrForResource map[string]ResourceManager
pendingPods map[string]interface{}
patchPodIP bool
pendingPodsLock sync.RWMutex
sync.RWMutex
}
Expand Down Expand Up @@ -198,6 +199,7 @@ func (networkService *networkService) AllocIP(grpcContext context.Context, r *rp
}

// 3. Allocate network resource for pod
var podIP string
switch podinfo.PodNetworkType {
case podNetworkTypeENIMultiIP:
var eniMultiIP *types.ENIIP
Expand All @@ -219,7 +221,7 @@ func (networkService *networkService) AllocIP(grpcContext context.Context, r *rp
if err != nil {
return nil, errors.Wrapf(err, "error put resource into store")
}

podIP = eniMultiIP.SecAddress.String()
allocIPReply.IPType = rpc.IPType_TypeENIMultiIP
allocIPReply.Success = true
allocIPReply.NetworkInfo = &rpc.AllocIPReply_ENIMultiIP{
Expand Down Expand Up @@ -259,6 +261,7 @@ func (networkService *networkService) AllocIP(grpcContext context.Context, r *rp
if err != nil {
return nil, errors.Wrapf(err, "error put resource into store")
}
podIP = vpcEni.Address.IP.String()
allocIPReply.IPType = rpc.IPType_TypeVPCENI
allocIPReply.Success = true
allocIPReply.NetworkInfo = &rpc.AllocIPReply_VpcEni{
Expand Down Expand Up @@ -320,7 +323,14 @@ func (networkService *networkService) AllocIP(grpcContext context.Context, r *rp
return nil, errors.Wrapf(err, "error on grpc connection")
}

// 4. return allocate result
// 4. patch podIP to kube-apiserver
if allocIPReply.Success && podIP != "" && networkService.patchPodIP {
if _, err := networkService.k8s.PatchAllocatedPodIP(podinfo, podIP); err != nil {
return nil, errors.Wrapf(err, fmt.Sprintf("failed to patch podIP to pod %s.", podInfoKey(podinfo.Namespace, podinfo.Name)))
}
}

// 5. return allocate result
return allocIPReply, err
}

Expand Down Expand Up @@ -540,9 +550,10 @@ func (networkService *networkService) startGarbageCollectionLoop() {
}()
}

func newNetworkService(configFilePath, kubeconfig, master, daemonMode string) (rpc.TerwayBackendServer, error) {
func newNetworkService(configFilePath, kubeconfig, master, daemonMode string, patchPodIP bool) (rpc.TerwayBackendServer, error) {
log.Debugf("start network service with: %s, %s", configFilePath, daemonMode)
netSrv := &networkService{
patchPodIP: patchPodIP,
pendingPods: map[string]interface{}{},
pendingPodsLock: sync.RWMutex{},
}
Expand Down
16 changes: 16 additions & 0 deletions daemon/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Kubernetes interface {
GetServiceCidr() *net.IPNet
GetNodeCidr() *net.IPNet
SetNodeAllocatablePod(count int) error
PatchAllocatedPodIP(podInfo *podInfo, podIP string) (*corev1.Pod, error)
}

type k8s struct {
Expand Down Expand Up @@ -371,6 +372,21 @@ func (k *k8s) GetLocalPods() ([]*podInfo, error) {

return ret, nil
}

func (k *k8s) PatchAllocatedPodIP(podInfo *podInfo, podIP string) (*corev1.Pod, error) {
pod, err := k.client.CoreV1().Pods(podInfo.Namespace).Get(podInfo.Name, metav1.GetOptions{
ResourceVersion: "0",
})
if err != nil || pod == nil {
return nil, err
}
if pod.Status.PodIP != "" {
return nil, err
}
pod.Status.PodIP = podIP
return k.client.CoreV1().Pods(podInfo.Namespace).UpdateStatus(pod)
}

func (k *k8s) GetServiceCidr() *net.IPNet {
return k.svcCidr
}
Expand Down
4 changes: 2 additions & 2 deletions daemon/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func stackTriger() {
}

// Run terway daemon
func Run(pidFilePath, socketFilePath, debugSocketListen, configFilePath, kubeconfig, master, daemonMode, logLevel string) error {
func Run(pidFilePath, socketFilePath, debugSocketListen, configFilePath, kubeconfig, master, daemonMode, logLevel string, patchPodIP bool) error {
level, err := log.ParseLevel(logLevel)
if err != nil {
return errors.Wrapf(err, "error set log level: %s", logLevel)
Expand Down Expand Up @@ -84,7 +84,7 @@ func Run(pidFilePath, socketFilePath, debugSocketListen, configFilePath, kubecon
return fmt.Errorf("error listen at %s: %v", socketFilePath, err)
}

networkService, err := newNetworkService(configFilePath, kubeconfig, master, daemonMode)
networkService, err := newNetworkService(configFilePath, kubeconfig, master, daemonMode, patchPodIP)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var (
readonlyListen string
kubeconfig string
master string
patchPodIP bool
)

func init() {
Expand All @@ -27,13 +28,14 @@ func init() {
flag.StringVar(&readonlyListen, "readonly-listen", debugSocketPath, "terway readonly listen")
flag.StringVar(&master, "master", "", "The address of the Kubernetes API server (overrides any value in kubeconfig).")
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to kubeconfig file with authorization and master location information.")
flag.BoolVar(&patchPodIP, "patch-podip", false, "Whether patch the podIP to kube-apiserver after setting up the pod network.")

}

func main() {
flag.Parse()
log.Infof("Starting terway of version: %s", gitVer)
if err := daemon.Run(defaultPidPath, defaultSocketPath, readonlyListen, defaultConfigPath, kubeconfig, master, daemonMode, logLevel); err != nil {
if err := daemon.Run(defaultPidPath, defaultSocketPath, readonlyListen, defaultConfigPath, kubeconfig, master, daemonMode, logLevel, patchPodIP); err != nil {
log.Fatal(err)
}
}

0 comments on commit b4392ce

Please sign in to comment.