Skip to content

Commit

Permalink
refine docker center code
Browse files Browse the repository at this point in the history
  • Loading branch information
linrunqi08 committed Dec 9, 2024
1 parent bf1d12d commit e78ef73
Show file tree
Hide file tree
Showing 6 changed files with 315 additions and 61 deletions.
1 change: 1 addition & 0 deletions pkg/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ require (
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/tchap/go-patricia v2.3.0+incompatible // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 // indirect
Expand Down
13 changes: 8 additions & 5 deletions pkg/helper/container_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,21 +191,24 @@ func SplitRegexFromMap(input map[string]string) (staticResult map[string]string,
return staticResult, regexResult, nil
}

func CreateDockerClient(opt ...docker.Opt) (client *docker.Client, err error) {
func CreateDockerClient(opt ...docker.Opt) (*DockerClientWrapper, error) {
opt = append(opt, docker.FromEnv)
client, err = docker.NewClientWithOpts(opt...)
rawClient, err := docker.NewClientWithOpts(opt...)
if err != nil {
return nil, err
}
// add dockerClient connectivity tests
pingCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
ping, err := client.Ping(pingCtx)
ping, err := rawClient.Ping(pingCtx)
if err != nil {
return nil, err
}
client.NegotiateAPIVersionPing(ping)
return
rawClient.NegotiateAPIVersionPing(ping)

return &DockerClientWrapper{
client: rawClient,
}, nil
}

func RegisterDockerEventListener(c chan events.Message) {
Expand Down
103 changes: 65 additions & 38 deletions pkg/helper/docker_center.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"hash/fnv"
"io"
"path"
"regexp"
"runtime"
Expand Down Expand Up @@ -458,7 +459,7 @@ type DockerCenter struct {
// For the CRI scenario, the container list only contains the real containers and excludes the sandbox containers. But the
// sandbox meta would be saved to its bound container.
containerMap map[string]*DockerInfoDetail // all containers will in this map
client *docker.Client
client DockerCenterClientInterface
lastErrMu sync.Mutex
lastErr error
lock sync.RWMutex
Expand All @@ -471,6 +472,43 @@ type DockerCenter struct {
initStaticContainerInfoSuccess bool
}

type DockerCenterClientInterface interface {
ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error)
ImageInspectWithRaw(ctx context.Context, imageID string) (types.ImageInspect, []byte, error)
ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error)
Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error)
ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error)
ContainerProcessAlive(pid int) bool
}

type DockerClientWrapper struct {
client *docker.Client
}

func (r *DockerClientWrapper) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) {
return r.client.ContainerList(ctx, options)
}

func (r *DockerClientWrapper) ImageInspectWithRaw(ctx context.Context, imageID string) (types.ImageInspect, []byte, error) {
return r.client.ImageInspectWithRaw(ctx, imageID)
}

func (r *DockerClientWrapper) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) {
return r.client.ContainerInspect(ctx, containerID)
}

func (r *DockerClientWrapper) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) {
return r.client.Events(ctx, options)
}

func (r *DockerClientWrapper) ContainerProcessAlive(pid int) bool {
return ContainerProcessAlive(pid)
}

func (r *DockerClientWrapper) ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
return r.client.ContainerLogs(ctx, container, options)
}

func getIPByHosts(hostFileName, hostname string) string {
lines, err := util.ReadLines(hostFileName)
if err != nil {
Expand Down Expand Up @@ -1023,35 +1061,6 @@ func (dc *DockerCenter) updateContainer(id string, container *DockerInfoDetail)
dc.refreshLastUpdateMapTime()
}

func (dc *DockerCenter) inspectOneContainer(containerID string) (types.ContainerJSON, error) {
var err error
var containerDetail types.ContainerJSON
for idx := 0; idx < 3; idx++ {
if containerDetail, err = dc.client.ContainerInspect(context.Background(), containerID); err == nil {
break
}
time.Sleep(time.Second * 5)
}
if err != nil {
dc.setLastError(err, "inspect container error "+containerID)
return types.ContainerJSON{}, err
}
if !ContainerProcessAlive(containerDetail.State.Pid) {
containerDetail.State.Status = ContainerStatusExited
finishedAt := containerDetail.State.FinishedAt
finishedAtTime, _ := time.Parse(time.RFC3339, finishedAt)
now := time.Now()
duration := now.Sub(finishedAtTime)
if duration >= ContainerInfoDeletedTimeout {
errMsg := "inspect time out container " + containerID
err = errors.New(errMsg)
dc.setLastError(err, errMsg)
return types.ContainerJSON{}, err
}
}
return containerDetail, nil
}

func (dc *DockerCenter) fetchAll() error {
dc.containerStateLock.Lock()
defer dc.containerStateLock.Unlock()
Expand All @@ -1065,21 +1074,37 @@ func (dc *DockerCenter) fetchAll() error {

for _, container := range containers {
var containerDetail types.ContainerJSON
containerDetail, err = dc.inspectOneContainer(container.ID)
for idx := 0; idx < 3; idx++ {
if containerDetail, err = dc.client.ContainerInspect(context.Background(), container.ID); err == nil {
break
}
time.Sleep(time.Second * 5)
}
if err == nil {
if !dc.client.ContainerProcessAlive(containerDetail.State.Pid) {
continue
}
containerMap[container.ID] = dc.CreateInfoDetail(containerDetail, envConfigPrefix, false)
} else {
dc.setLastError(err, "inspect container error "+container.ID)
}
}
dc.updateContainers(containerMap)

return nil
}

func (dc *DockerCenter) fetchOne(containerID string, tryFindSandbox bool) error {
dc.containerStateLock.Lock()
defer dc.containerStateLock.Unlock()
containerDetail, err := dc.inspectOneContainer(containerID)
containerDetail, err := dc.client.ContainerInspect(context.Background(), containerID)
if err != nil {
dc.setLastError(err, "inspect container error "+containerID)
return err
}
if !dc.client.ContainerProcessAlive(containerDetail.State.Pid) {
errMsg := "inspect time out container " + containerID
err = errors.New(errMsg)
dc.setLastError(err, errMsg)
return err
}
// docker 场景下
Expand All @@ -1093,7 +1118,7 @@ func (dc *DockerCenter) fetchOne(containerID string, tryFindSandbox bool) error
if err != nil {
dc.setLastError(err, "inspect sandbox container error "+id)
} else {
if containerDetail.State.Status == ContainerStatusRunning && !ContainerProcessAlive(containerDetail.State.Pid) {
if containerDetail.State.Status == ContainerStatusRunning && !dc.client.ContainerProcessAlive(containerDetail.State.Pid) {
containerDetail.State.Status = ContainerStatusExited
}
dc.updateContainer(id, dc.CreateInfoDetail(containerDetail, envConfigPrefix, false))
Expand Down Expand Up @@ -1171,10 +1196,12 @@ func dockerCenterRecover() {

func (dc *DockerCenter) initClient() error {
var err error
// DockerCenterTimeout should only be used by context.WithTimeout() in specific methods
if dc.client, err = CreateDockerClient(); err != nil {
dc.setLastError(err, "init docker client from env error")
return err
// do not CreateDockerClient multi times
if dc.client == nil {
if dc.client, err = CreateDockerClient(); err != nil {
dc.setLastError(err, "init docker client from env error")
return err
}
}
return nil
}
Expand Down
Loading

0 comments on commit e78ef73

Please sign in to comment.