Skip to content

Commit

Permalink
container state
Browse files Browse the repository at this point in the history
Signed-off-by: Starnop <starnop@163.com>
  • Loading branch information
starnop committed Jun 11, 2018
1 parent f742cf2 commit 85c19c1
Show file tree
Hide file tree
Showing 103 changed files with 15,504 additions and 44 deletions.
7 changes: 7 additions & 0 deletions cri/config/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
package config

const (
// DefaultNameSpace the default namespace of containerd
DefaultNameSpace = "default"
// CadvisorNameSpace the default namespace of the cadvisor will collect
CadvisorNameSpace = "k8s.io"
)

// Config defines the CRI configuration.
type Config struct {
// Listen is the listening address which servers CRI.
Expand Down
15 changes: 8 additions & 7 deletions cri/criservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import (
servicev1alpha1 "github.com/alibaba/pouch/cri/v1alpha1/service"
criv1alpha2 "github.com/alibaba/pouch/cri/v1alpha2"
servicev1alpha2 "github.com/alibaba/pouch/cri/v1alpha2/service"
"github.com/alibaba/pouch/ctrd"
"github.com/alibaba/pouch/daemon/config"
"github.com/alibaba/pouch/daemon/mgr"

"github.com/sirupsen/logrus"
)

// RunCriService start cri service if pouchd is specified with --enable-cri.
func RunCriService(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, stopCh chan error) {
func RunCriService(daemonconfig *config.Config, client ctrd.APIClient, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr, stopCh chan error) {
var err error

defer func() {
Expand All @@ -26,19 +27,19 @@ func RunCriService(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, i
}
switch daemonconfig.CriConfig.CriVersion {
case "v1alpha1":
err = runv1alpha1(daemonconfig, containerMgr, imageMgr)
err = runv1alpha1(daemonconfig, client, containerMgr, imageMgr)
case "v1alpha2":
err = runv1alpha2(daemonconfig, containerMgr, imageMgr)
err = runv1alpha2(daemonconfig, client, containerMgr, imageMgr)
default:
err = fmt.Errorf("invalid CRI version,failed to start CRI service")
}
return
}

// Start CRI service with CRI version: v1alpha1
func runv1alpha1(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr) error {
func runv1alpha1(daemonconfig *config.Config, client ctrd.APIClient, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr) error {
logrus.Infof("Start CRI service with CRI version: v1alpha1")
criMgr, err := criv1alpha1.NewCriManager(daemonconfig, containerMgr, imageMgr)
criMgr, err := criv1alpha1.NewCriManager(daemonconfig, client, containerMgr, imageMgr)
if err != nil {
return fmt.Errorf("failed to get CriManager with error: %v", err)
}
Expand Down Expand Up @@ -76,9 +77,9 @@ func runv1alpha1(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, ima
}

// Start CRI service with CRI version: v1alpha2
func runv1alpha2(daemonconfig *config.Config, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr) error {
func runv1alpha2(daemonconfig *config.Config, client ctrd.APIClient, containerMgr mgr.ContainerMgr, imageMgr mgr.ImageMgr) error {
logrus.Infof("Start CRI service with CRI version: v1alpha2")
criMgr, err := criv1alpha2.NewCriManager(daemonconfig, containerMgr, imageMgr)
criMgr, err := criv1alpha2.NewCriManager(daemonconfig, client, containerMgr, imageMgr)
if err != nil {
return fmt.Errorf("failed to get CriManager with error: %v", err)
}
Expand Down
105 changes: 101 additions & 4 deletions cri/v1alpha1/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

apitypes "github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/ctrd"
"github.com/alibaba/pouch/daemon/config"
"github.com/alibaba/pouch/daemon/mgr"
"github.com/alibaba/pouch/pkg/errtypes"
Expand Down Expand Up @@ -60,6 +61,15 @@ const (

// resolvConfPath is the abs path of resolv.conf on host or container.
resolvConfPath = "/etc/resolv.conf"

// statsCollectPeriod is the time duration we sync stats from containerd.
statsCollectPeriod = 10

// defaultSnapshotterName is the default Snapshotter name.
defaultSnapshotterName = "overlayfs"

// snapshotPlugin implements a snapshotter.
snapshotPlugin = "io.containerd.snapshotter.v1"
)

var (
Expand All @@ -85,6 +95,9 @@ type CriManager struct {
ImageMgr mgr.ImageMgr
CniMgr CniMgr

// Client is used to interact with containerd.
Client ctrd.APIClient

// StreamServer is the stream server of CRI serves container streaming request.
StreamServer Server

Expand All @@ -95,10 +108,16 @@ type CriManager struct {
SandboxImage string
// SandboxStore stores the configuration of sandboxes.
SandboxStore *meta.Store

// SnapshotStore stores information of all snapshots.
SnapshotStore *mgr.SnapshotStore

// ImageFSUUID is the device uuid of image filesystem.
ImageFSUUID string
}

// NewCriManager creates a brand new cri manager.
func NewCriManager(config *config.Config, ctrMgr mgr.ContainerMgr, imgMgr mgr.ImageMgr) (CriMgr, error) {
func NewCriManager(config *config.Config, cli ctrd.APIClient, ctrMgr mgr.ContainerMgr, imgMgr mgr.ImageMgr) (CriMgr, error) {
streamServer, err := newStreamServer(ctrMgr, streamServerAddress, streamServerPort)
if err != nil {
return nil, fmt.Errorf("failed to create stream server for cri manager: %v", err)
Expand All @@ -108,9 +127,11 @@ func NewCriManager(config *config.Config, ctrMgr mgr.ContainerMgr, imgMgr mgr.Im
ContainerMgr: ctrMgr,
ImageMgr: imgMgr,
CniMgr: NewCniManager(&config.CriConfig),
Client: cli,
StreamServer: streamServer,
SandboxBaseDir: path.Join(config.HomeDir, "sandboxes"),
SandboxImage: config.CriConfig.SandboxImage,
SnapshotStore: mgr.NewSnapshotStore(),
}

c.SandboxStore, err = meta.NewStore(meta.Config{
Expand All @@ -127,6 +148,19 @@ func NewCriManager(config *config.Config, ctrMgr mgr.ContainerMgr, imgMgr mgr.Im
return nil, fmt.Errorf("failed to create sandbox meta store: %v", err)
}

imageFSPath := imageFSPath(path.Join(config.HomeDir, "containerd/root"), defaultSnapshotterName)
c.ImageFSUUID, err = getDeviceUUID(imageFSPath)
if err != nil {
return nil, fmt.Errorf("failed to get imagefs uuid of %q: %v", imageFSPath, err)
}

snapshotsSyncer := mgr.NewSnapshotsSyncer(
c.SnapshotStore,
c.Client,
time.Duration(statsCollectPeriod)*time.Second,
)
snapshotsSyncer.Start()

return NewCriWrapper(c), nil
}

Expand Down Expand Up @@ -679,12 +713,55 @@ func (c *CriManager) ContainerStatus(ctx context.Context, r *runtime.ContainerSt
// ContainerStats returns stats of the container. If the container does not
// exist, the call returns an error.
func (c *CriManager) ContainerStats(ctx context.Context, r *runtime.ContainerStatsRequest) (*runtime.ContainerStatsResponse, error) {
return nil, fmt.Errorf("ContainerStats Not Implemented Yet")
containerID := r.GetContainerId()

container, err := c.ContainerMgr.Get(ctx, containerID)
if err != nil {
return nil, fmt.Errorf("failed to get container %q with error: %v", containerID, err)
}

stats, err := c.ContainerMgr.Stats(ctx, containerID)
if err != nil {
return nil, fmt.Errorf("failed to get stats of container %q with error: %v", containerID, err)
}

cs, err := c.getContainerMetrics(container, stats)
if err != nil {
return nil, fmt.Errorf("failed to decode container metrics: %v", err)
}

return &runtime.ContainerStatsResponse{Stats: cs}, nil
}

// ListContainerStats returns stats of all running containers.
func (c *CriManager) ListContainerStats(ctx context.Context, r *runtime.ListContainerStatsRequest) (*runtime.ListContainerStatsResponse, error) {
return nil, fmt.Errorf("ListContainerStats Not Implemented Yet")
opts := &mgr.ContainerListOption{All: true}
filter := func(c *mgr.Container) bool {
return true
}
containers, err := c.ContainerMgr.List(ctx, filter, opts)
if err != nil {
return nil, fmt.Errorf("failed to list containers: %v", err)
}

result := &runtime.ListContainerStatsResponse{}
for _, container := range containers {
stats, err := c.ContainerMgr.Stats(ctx, container.ID)
if err != nil {
logrus.Errorf("failed to get stats of container %q: %v", container.ID, err)
continue
}

cs, err := c.getContainerMetrics(container, stats)
if err != nil {
logrus.Errorf("failed to decode metrics of container %q: %v", container.ID, err)
continue
}

result.Stats = append(result.Stats, cs)
}

return result, nil
}

// UpdateContainerResources updates ContainerConfig of the container.
Expand Down Expand Up @@ -907,5 +984,25 @@ func (c *CriManager) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequ

// ImageFsInfo returns information of the filesystem that is used to store images.
func (c *CriManager) ImageFsInfo(ctx context.Context, r *runtime.ImageFsInfoRequest) (*runtime.ImageFsInfoResponse, error) {
return nil, fmt.Errorf("ImageFsInfo Not Implemented Yet")
snapshots := c.SnapshotStore.List()
timestamp := time.Now().UnixNano()
var usedBytes, inodesUsed uint64
for _, sn := range snapshots {
// Use the oldest timestamp as the timestamp of imagefs info.
if sn.Timestamp < timestamp {
timestamp = sn.Timestamp
}
usedBytes += sn.Size
inodesUsed += sn.Inodes
}
return &runtime.ImageFsInfoResponse{
ImageFilesystems: []*runtime.FilesystemUsage{
{
Timestamp: timestamp,
StorageId: &runtime.StorageIdentifier{Uuid: c.ImageFSUUID},
UsedBytes: &runtime.UInt64Value{Value: usedBytes},
InodesUsed: &runtime.UInt64Value{Value: inodesUsed},
},
},
}, nil
}
Loading

0 comments on commit 85c19c1

Please sign in to comment.