Skip to content

Commit

Permalink
feat(metaserver): support get pod by uid
Browse files Browse the repository at this point in the history
Signed-off-by: linzhecheng <linzhecheng@bytedance.com>
  • Loading branch information
cheney-lin committed Apr 25, 2023
1 parent 4960db0 commit c4478b9
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 12 deletions.
6 changes: 6 additions & 0 deletions pkg/metaserver/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"

Expand Down Expand Up @@ -68,6 +69,7 @@ func constructPodFetcher(names []string) pod.PodFetcher {
pods = append(pods, &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
UID: types.UID(name + "-uid"),
},
})
}
Expand All @@ -94,6 +96,10 @@ func TestFetcher(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 2, len(podObjList))

pod1, err := agent.GetPod(ctx, "test-pod-1-uid")
assert.NoError(t, err)
assert.Equal(t, "test-pod-1", pod1.Name)

nodeObj, err := agent.GetNode(ctx)
assert.NoError(t, err)
assert.Equal(t, "test-node-1", nodeObj.Name)
Expand Down
18 changes: 13 additions & 5 deletions pkg/metaserver/agent/pod/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,26 @@ type KubeletPodFetcher interface {
// kubeletPodFetcherImpl use kubelet 10255 pods interface to get pod directly without cache.
type kubeletPodFetcherImpl struct{}

// GetPodList get pods from kubelet 10255/pods api, and the returned slice does not
// contain pods that don't pass `podFilter`
func (k *kubeletPodFetcherImpl) GetPodList(_ context.Context, podFilter func(*v1.Pod) bool) ([]*v1.Pod, error) {
func getPodsByKubeletAPI() (v1.PodList, error) {
const podsApi = "http://localhost:10255/pods"

var podList v1.PodList
err := process.GetAndUnmarshal(podsApi, &podList)
if err != nil {
return []*v1.Pod{}, fmt.Errorf("failed to get pod list, error: %v", err)
return podList, fmt.Errorf("failed to get pod list, error: %v", err)
} else if len(podList.Items) == 0 {
// kubelet should at least contain current pod as an item
return []*v1.Pod{}, fmt.Errorf("kubelet returns empty pod list")
return podList, fmt.Errorf("kubelet returns empty pod list")
}
return podList, nil
}

// GetPodList get pods from kubelet 10255/pods api, and the returned slice does not
// contain pods that don't pass `podFilter`
func (k *kubeletPodFetcherImpl) GetPodList(_ context.Context, podFilter func(*v1.Pod) bool) ([]*v1.Pod, error) {
podList, err := getPodsByKubeletAPI()
if err != nil {
return nil, err
}

var pods []*v1.Pod
Expand Down
15 changes: 14 additions & 1 deletion pkg/metaserver/agent/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ type PodFetcher interface {
// GetContainerID & GetContainerSpec are used to parse running container info
GetContainerID(podUID, containerName string) (string, error)
GetContainerSpec(podUID, containerName string) (*v1.Container, error)
// GetPod returns Pod by UID
GetPod(ctx context.Context, podUID string) (*v1.Pod, error)
}

type podFetcherImpl struct {
kubeletPodFetcher KubeletPodFetcher
runtimePodFetcher *RuntimePodFetcher
runtimePodFetcher RuntimePodFetcher

kubeletPodsCache map[string]*v1.Pod
kubeletPodsCacheLock sync.RWMutex
Expand Down Expand Up @@ -197,6 +199,17 @@ func (w *podFetcherImpl) GetPodList(ctx context.Context, podFilter func(*v1.Pod)
return res, nil
}

func (w *podFetcherImpl) GetPod(ctx context.Context, podUID string) (*v1.Pod, error) {
kubeletPodsCache, err := w.getKubeletPodsCache(ctx)
if err != nil {
return nil, fmt.Errorf("getKubeletPodsCache failed with error: %v", err)
}
if pod, ok := kubeletPodsCache[podUID]; ok {
return pod, nil
}
return nil, fmt.Errorf("failed to find pod by uid %v", podUID)
}

func (w *podFetcherImpl) getKubeletPodsCache(ctx context.Context) (map[string]*v1.Pod, error) {
// if current kubelet pod cache is nil or enforce bypass, we sync cache first
w.kubeletPodsCacheLock.RLock()
Expand Down
11 changes: 11 additions & 0 deletions pkg/metaserver/agent/pod/pod_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ func (p *PodFetcherStub) GetPodList(_ context.Context, podFilter func(*v1.Pod) b
return pods, nil
}

func (p *PodFetcherStub) GetPod(_ context.Context, podUID string) (*v1.Pod, error) {
p.mutex.Lock()
defer p.mutex.Unlock()
for _, pod := range p.PodList {
if string(pod.UID) == podUID {
return pod, nil
}
}
return nil, fmt.Errorf("failed to find pod by uid %v", podUID)
}

func (p *PodFetcherStub) Run(_ context.Context) {}

func (p *PodFetcherStub) GetContainerID(podUID, containerName string) (string, error) {
Expand Down
16 changes: 10 additions & 6 deletions pkg/metaserver/agent/pod/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,27 @@ type labeledContainerInfo struct {
PodUID kubetypes.UID
}

type RuntimePodFetcher struct {
type RuntimePodFetcher interface {
GetPods(all bool) ([]*RuntimePod, error)
}

type RuntimePodFetcherImpl struct {
runtimeService cri.RuntimeService
}

func NewRuntimePodFetcher(conf *config.Configuration) (*RuntimePodFetcher, error) {
func NewRuntimePodFetcher(conf *config.Configuration) (*RuntimePodFetcherImpl, error) {
runtimeService, err := remote.NewRemoteRuntimeService(conf.RemoteRuntimeEndpoint, 2*time.Minute)
if err != nil {
return nil, fmt.Errorf("create remote runtime service failed %s", err)
}

return &RuntimePodFetcher{runtimeService: runtimeService}, nil
return &RuntimePodFetcherImpl{runtimeService: runtimeService}, nil
}

// GetPods returns a list of containers grouped by pods. The boolean parameter
// specifies whether the runtime returns all containers including those already
// exited and dead containers (used for garbage collection).
func (r *RuntimePodFetcher) GetPods(all bool) ([]*RuntimePod, error) {
func (r *RuntimePodFetcherImpl) GetPods(all bool) ([]*RuntimePod, error) {
pods := make(map[kubetypes.UID]*RuntimePod)
sandboxes, err := r.getKubeletSandboxes(all)
if err != nil {
Expand Down Expand Up @@ -129,7 +133,7 @@ func (r *RuntimePodFetcher) GetPods(all bool) ([]*RuntimePod, error) {
}

// getKubeletSandboxes lists all (or just the running) sandboxes managed by kubelet.
func (r *RuntimePodFetcher) getKubeletSandboxes(all bool) ([]*runtimeapi.PodSandbox, error) {
func (r *RuntimePodFetcherImpl) getKubeletSandboxes(all bool) ([]*runtimeapi.PodSandbox, error) {
var filter *runtimeapi.PodSandboxFilter
if !all {
readyState := runtimeapi.PodSandboxState_SANDBOX_READY
Expand All @@ -152,7 +156,7 @@ func (r *RuntimePodFetcher) getKubeletSandboxes(all bool) ([]*runtimeapi.PodSand
// getKubeletContainers lists containers managed by kubelet.
// The boolean parameter specifies whether returns all containers including
// those already exited and dead containers (used for garbage collection).
func (r *RuntimePodFetcher) getKubeletContainers(allContainers bool) ([]*runtimeapi.Container, error) {
func (r *RuntimePodFetcherImpl) getKubeletContainers(allContainers bool) ([]*runtimeapi.Container, error) {
filter := &runtimeapi.ContainerFilter{}
if !allContainers {
filter.State = &runtimeapi.ContainerStateValue{
Expand Down

0 comments on commit c4478b9

Please sign in to comment.