diff --git a/pkg/metaserver/agent/agent_test.go b/pkg/metaserver/agent/agent_test.go index 16ed9297f..28487b72f 100644 --- a/pkg/metaserver/agent/agent_test.go +++ b/pkg/metaserver/agent/agent_test.go @@ -25,6 +25,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/fake" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -68,6 +69,7 @@ func constructPodFetcher(names []string) pod.PodFetcher { pods = append(pods, &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, + UID: types.UID(name + "-uid"), }, }) } @@ -94,6 +96,10 @@ func TestFetcher(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 2, len(podObjList)) + pod1, err := agent.GetPod(ctx, "test-pod-1-uid") + assert.NoError(t, err) + assert.Equal(t, "test-pod-1", pod1.Name) + nodeObj, err := agent.GetNode(ctx) assert.NoError(t, err) assert.Equal(t, "test-node-1", nodeObj.Name) diff --git a/pkg/metaserver/agent/pod/pod.go b/pkg/metaserver/agent/pod/pod.go index bd26f73ed..074ce7c81 100644 --- a/pkg/metaserver/agent/pod/pod.go +++ b/pkg/metaserver/agent/pod/pod.go @@ -57,11 +57,13 @@ type PodFetcher interface { // GetContainerID & GetContainerSpec are used to parse running container info GetContainerID(podUID, containerName string) (string, error) GetContainerSpec(podUID, containerName string) (*v1.Container, error) + // GetPod returns Pod by UID + GetPod(ctx context.Context, podUID string) (*v1.Pod, error) } type podFetcherImpl struct { kubeletPodFetcher KubeletPodFetcher - runtimePodFetcher *RuntimePodFetcher + runtimePodFetcher RuntimePodFetcher kubeletPodsCache map[string]*v1.Pod kubeletPodsCacheLock sync.RWMutex @@ -197,6 +199,17 @@ func (w *podFetcherImpl) GetPodList(ctx context.Context, podFilter func(*v1.Pod) return res, nil } +func (w *podFetcherImpl) GetPod(ctx context.Context, podUID string) (*v1.Pod, error) { + kubeletPodsCache, err := w.getKubeletPodsCache(ctx) + if err != nil { + return nil, fmt.Errorf("getKubeletPodsCache failed with error: %v", err) + } + if pod, ok := kubeletPodsCache[podUID]; ok { + return pod, nil + } + return nil, fmt.Errorf("failed to find pod by uid %v", podUID) +} + func (w *podFetcherImpl) getKubeletPodsCache(ctx context.Context) (map[string]*v1.Pod, error) { // if current kubelet pod cache is nil or enforce bypass, we sync cache first w.kubeletPodsCacheLock.RLock() diff --git a/pkg/metaserver/agent/pod/pod_stub.go b/pkg/metaserver/agent/pod/pod_stub.go index db9a4f8be..2fc2b7704 100644 --- a/pkg/metaserver/agent/pod/pod_stub.go +++ b/pkg/metaserver/agent/pod/pod_stub.go @@ -46,6 +46,17 @@ func (p *PodFetcherStub) GetPodList(_ context.Context, podFilter func(*v1.Pod) b return pods, nil } +func (p *PodFetcherStub) GetPod(_ context.Context, podUID string) (*v1.Pod, error) { + p.mutex.Lock() + defer p.mutex.Unlock() + for _, pod := range p.PodList { + if string(pod.UID) == podUID { + return pod, nil + } + } + return nil, fmt.Errorf("failed to find pod by uid %v", podUID) +} + func (p *PodFetcherStub) Run(_ context.Context) {} func (p *PodFetcherStub) GetContainerID(podUID, containerName string) (string, error) { diff --git a/pkg/metaserver/agent/pod/runtime.go b/pkg/metaserver/agent/pod/runtime.go index 19ffc835a..870501221 100644 --- a/pkg/metaserver/agent/pod/runtime.go +++ b/pkg/metaserver/agent/pod/runtime.go @@ -55,23 +55,27 @@ type labeledContainerInfo struct { PodUID kubetypes.UID } -type RuntimePodFetcher struct { +type RuntimePodFetcher interface { + GetPods(all bool) ([]*RuntimePod, error) +} + +type runtimePodFetcherImpl struct { runtimeService cri.RuntimeService } -func NewRuntimePodFetcher(conf *config.Configuration) (*RuntimePodFetcher, error) { +func NewRuntimePodFetcher(conf *config.Configuration) (RuntimePodFetcher, error) { runtimeService, err := remote.NewRemoteRuntimeService(conf.RemoteRuntimeEndpoint, 2*time.Minute) if err != nil { return nil, fmt.Errorf("create remote runtime service failed %s", err) } - return &RuntimePodFetcher{runtimeService: runtimeService}, nil + return &runtimePodFetcherImpl{runtimeService: runtimeService}, nil } // GetPods returns a list of containers grouped by pods. The boolean parameter // specifies whether the runtime returns all containers including those already // exited and dead containers (used for garbage collection). -func (r *RuntimePodFetcher) GetPods(all bool) ([]*RuntimePod, error) { +func (r *runtimePodFetcherImpl) GetPods(all bool) ([]*RuntimePod, error) { pods := make(map[kubetypes.UID]*RuntimePod) sandboxes, err := r.getKubeletSandboxes(all) if err != nil { @@ -129,7 +133,7 @@ func (r *RuntimePodFetcher) GetPods(all bool) ([]*RuntimePod, error) { } // getKubeletSandboxes lists all (or just the running) sandboxes managed by kubelet. -func (r *RuntimePodFetcher) getKubeletSandboxes(all bool) ([]*runtimeapi.PodSandbox, error) { +func (r *runtimePodFetcherImpl) getKubeletSandboxes(all bool) ([]*runtimeapi.PodSandbox, error) { var filter *runtimeapi.PodSandboxFilter if !all { readyState := runtimeapi.PodSandboxState_SANDBOX_READY @@ -152,7 +156,7 @@ func (r *RuntimePodFetcher) getKubeletSandboxes(all bool) ([]*runtimeapi.PodSand // getKubeletContainers lists containers managed by kubelet. // The boolean parameter specifies whether returns all containers including // those already exited and dead containers (used for garbage collection). -func (r *RuntimePodFetcher) getKubeletContainers(allContainers bool) ([]*runtimeapi.Container, error) { +func (r *runtimePodFetcherImpl) getKubeletContainers(allContainers bool) ([]*runtimeapi.Container, error) { filter := &runtimeapi.ContainerFilter{} if !allContainers { filter.State = &runtimeapi.ContainerStateValue{