Skip to content

Commit

Permalink
Merge pull request #39 from cheney-lin/dev/get_pod_by_uid
Browse files Browse the repository at this point in the history
feat(metaserver): support get pod by uid
  • Loading branch information
waynepeking348 authored Apr 27, 2023
2 parents 4960db0 + 6113f3d commit 240d1dc
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 7 deletions.
6 changes: 6 additions & 0 deletions pkg/metaserver/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"

Expand Down Expand Up @@ -68,6 +69,7 @@ func constructPodFetcher(names []string) pod.PodFetcher {
pods = append(pods, &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
UID: types.UID(name + "-uid"),
},
})
}
Expand All @@ -94,6 +96,10 @@ func TestFetcher(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 2, len(podObjList))

pod1, err := agent.GetPod(ctx, "test-pod-1-uid")
assert.NoError(t, err)
assert.Equal(t, "test-pod-1", pod1.Name)

nodeObj, err := agent.GetNode(ctx)
assert.NoError(t, err)
assert.Equal(t, "test-node-1", nodeObj.Name)
Expand Down
15 changes: 14 additions & 1 deletion pkg/metaserver/agent/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ type PodFetcher interface {
// GetContainerID & GetContainerSpec are used to parse running container info
GetContainerID(podUID, containerName string) (string, error)
GetContainerSpec(podUID, containerName string) (*v1.Container, error)
// GetPod returns Pod by UID
GetPod(ctx context.Context, podUID string) (*v1.Pod, error)
}

type podFetcherImpl struct {
kubeletPodFetcher KubeletPodFetcher
runtimePodFetcher *RuntimePodFetcher
runtimePodFetcher RuntimePodFetcher

kubeletPodsCache map[string]*v1.Pod
kubeletPodsCacheLock sync.RWMutex
Expand Down Expand Up @@ -197,6 +199,17 @@ func (w *podFetcherImpl) GetPodList(ctx context.Context, podFilter func(*v1.Pod)
return res, nil
}

func (w *podFetcherImpl) GetPod(ctx context.Context, podUID string) (*v1.Pod, error) {
kubeletPodsCache, err := w.getKubeletPodsCache(ctx)
if err != nil {
return nil, fmt.Errorf("getKubeletPodsCache failed with error: %v", err)
}
if pod, ok := kubeletPodsCache[podUID]; ok {
return pod, nil
}
return nil, fmt.Errorf("failed to find pod by uid %v", podUID)
}

func (w *podFetcherImpl) getKubeletPodsCache(ctx context.Context) (map[string]*v1.Pod, error) {
// if current kubelet pod cache is nil or enforce bypass, we sync cache first
w.kubeletPodsCacheLock.RLock()
Expand Down
11 changes: 11 additions & 0 deletions pkg/metaserver/agent/pod/pod_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ func (p *PodFetcherStub) GetPodList(_ context.Context, podFilter func(*v1.Pod) b
return pods, nil
}

func (p *PodFetcherStub) GetPod(_ context.Context, podUID string) (*v1.Pod, error) {
p.mutex.Lock()
defer p.mutex.Unlock()
for _, pod := range p.PodList {
if string(pod.UID) == podUID {
return pod, nil
}
}
return nil, fmt.Errorf("failed to find pod by uid %v", podUID)
}

func (p *PodFetcherStub) Run(_ context.Context) {}

func (p *PodFetcherStub) GetContainerID(podUID, containerName string) (string, error) {
Expand Down
16 changes: 10 additions & 6 deletions pkg/metaserver/agent/pod/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,27 @@ type labeledContainerInfo struct {
PodUID kubetypes.UID
}

type RuntimePodFetcher struct {
type RuntimePodFetcher interface {
GetPods(all bool) ([]*RuntimePod, error)
}

type runtimePodFetcherImpl struct {
runtimeService cri.RuntimeService
}

func NewRuntimePodFetcher(conf *config.Configuration) (*RuntimePodFetcher, error) {
func NewRuntimePodFetcher(conf *config.Configuration) (RuntimePodFetcher, error) {
runtimeService, err := remote.NewRemoteRuntimeService(conf.RemoteRuntimeEndpoint, 2*time.Minute)
if err != nil {
return nil, fmt.Errorf("create remote runtime service failed %s", err)
}

return &RuntimePodFetcher{runtimeService: runtimeService}, nil
return &runtimePodFetcherImpl{runtimeService: runtimeService}, nil
}

// GetPods returns a list of containers grouped by pods. The boolean parameter
// specifies whether the runtime returns all containers including those already
// exited and dead containers (used for garbage collection).
func (r *RuntimePodFetcher) GetPods(all bool) ([]*RuntimePod, error) {
func (r *runtimePodFetcherImpl) GetPods(all bool) ([]*RuntimePod, error) {
pods := make(map[kubetypes.UID]*RuntimePod)
sandboxes, err := r.getKubeletSandboxes(all)
if err != nil {
Expand Down Expand Up @@ -129,7 +133,7 @@ func (r *RuntimePodFetcher) GetPods(all bool) ([]*RuntimePod, error) {
}

// getKubeletSandboxes lists all (or just the running) sandboxes managed by kubelet.
func (r *RuntimePodFetcher) getKubeletSandboxes(all bool) ([]*runtimeapi.PodSandbox, error) {
func (r *runtimePodFetcherImpl) getKubeletSandboxes(all bool) ([]*runtimeapi.PodSandbox, error) {
var filter *runtimeapi.PodSandboxFilter
if !all {
readyState := runtimeapi.PodSandboxState_SANDBOX_READY
Expand All @@ -152,7 +156,7 @@ func (r *RuntimePodFetcher) getKubeletSandboxes(all bool) ([]*runtimeapi.PodSand
// getKubeletContainers lists containers managed by kubelet.
// The boolean parameter specifies whether returns all containers including
// those already exited and dead containers (used for garbage collection).
func (r *RuntimePodFetcher) getKubeletContainers(allContainers bool) ([]*runtimeapi.Container, error) {
func (r *runtimePodFetcherImpl) getKubeletContainers(allContainers bool) ([]*runtimeapi.Container, error) {
filter := &runtimeapi.ContainerFilter{}
if !allContainers {
filter.State = &runtimeapi.ContainerStateValue{
Expand Down

0 comments on commit 240d1dc

Please sign in to comment.