diff --git a/container/common/fsHandler.go b/container/common/fsHandler.go index f944b2277a..a39b6b31a5 100644 --- a/container/common/fsHandler.go +++ b/container/common/fsHandler.go @@ -37,15 +37,21 @@ type FsUsage struct { InodeUsage uint64 } +type FsUsageProvider interface { + // Usage returns the fs usage + Usage() (*FsUsage, error) + // Targets returns where the fs usage metric is collected,it maybe a directory, a file or some + // information about the snapshotter(for containerd) + Targets() []string +} + type realFsHandler struct { sync.RWMutex - lastUpdate time.Time - usage FsUsage - period time.Duration - minPeriod time.Duration - rootfs string - extraDir string - fsInfo fs.FsInfo + lastUpdate time.Time + usage FsUsage + period time.Duration + minPeriod time.Duration + usageProvider FsUsageProvider // Tells the container to stop. stopChan chan struct{} } @@ -58,56 +64,33 @@ const DefaultPeriod = time.Minute var _ FsHandler = &realFsHandler{} -func NewFsHandler(period time.Duration, rootfs, extraDir string, fsInfo fs.FsInfo) FsHandler { +func NewFsHandler(period time.Duration, provider FsUsageProvider) FsHandler { return &realFsHandler{ - lastUpdate: time.Time{}, - usage: FsUsage{}, - period: period, - minPeriod: period, - rootfs: rootfs, - extraDir: extraDir, - fsInfo: fsInfo, - stopChan: make(chan struct{}, 1), + lastUpdate: time.Time{}, + usage: FsUsage{}, + period: period, + minPeriod: period, + usageProvider: provider, + stopChan: make(chan struct{}, 1), } } func (fh *realFsHandler) update() error { - var ( - rootUsage, extraUsage fs.UsageInfo - rootErr, extraErr error - ) - // TODO(vishh): Add support for external mounts. - if fh.rootfs != "" { - rootUsage, rootErr = fh.fsInfo.GetDirUsage(fh.rootfs) - } - if fh.extraDir != "" { - extraUsage, extraErr = fh.fsInfo.GetDirUsage(fh.extraDir) + usage, err := fh.usageProvider.Usage() + + if err != nil { + return err } - // Wait to handle errors until after all operartions are run. - // An error in one will not cause an early return, skipping others fh.Lock() defer fh.Unlock() fh.lastUpdate = time.Now() - if fh.rootfs != "" && rootErr == nil { - fh.usage.InodeUsage = rootUsage.Inodes - fh.usage.BaseUsageBytes = rootUsage.Bytes - fh.usage.TotalUsageBytes = rootUsage.Bytes - } - if fh.extraDir != "" && extraErr == nil { - if fh.rootfs != "" { - fh.usage.TotalUsageBytes += extraUsage.Bytes - } else { - // rootfs is empty, totalUsageBytes use extra usage bytes - fh.usage.TotalUsageBytes = extraUsage.Bytes - } - } - // Combine errors into a single error to return - if rootErr != nil || extraErr != nil { - return fmt.Errorf("rootDiskErr: %v, extraDiskErr: %v", rootErr, extraErr) - } + fh.usage.InodeUsage = usage.InodeUsage + fh.usage.BaseUsageBytes = usage.BaseUsageBytes + fh.usage.TotalUsageBytes = usage.TotalUsageBytes + return nil } @@ -130,7 +113,8 @@ func (fh *realFsHandler) trackUsage() { // if the long duration is persistent either because of slow // disk or lots of containers. longOp = longOp + time.Second - klog.V(2).Infof("fs: disk usage and inodes count on following dirs took %v: %v; will not log again for this container unless duration exceeds %v", duration, []string{fh.rootfs, fh.extraDir}, longOp) + klog.V(2).Infof(`fs: disk usage and inodes count on targets took %v: %v; `+ + `will not log again for this container unless duration exceeds %v`, duration, fh.usageProvider.Targets(), longOp) } select { case <-fh.stopChan: @@ -153,3 +137,56 @@ func (fh *realFsHandler) Usage() FsUsage { defer fh.RUnlock() return fh.usage } + +type fsUsageProvider struct { + fsInfo fs.FsInfo + rootFs string + // The directory consumed by the container but outside rootFs, e.g. directory of saving logs + extraDir string +} + +func NewGeneralFsUsageProvider(fsInfo fs.FsInfo, rootFs, extraDir string) FsUsageProvider { + return &fsUsageProvider{ + fsInfo: fsInfo, + rootFs: rootFs, + extraDir: extraDir, + } +} + +func (f *fsUsageProvider) Targets() []string { + return []string{f.rootFs, f.extraDir} +} + +func (f *fsUsageProvider) Usage() (*FsUsage, error) { + var ( + rootUsage, extraUsage fs.UsageInfo + rootErr, extraErr error + ) + + if f.rootFs != "" { + rootUsage, rootErr = f.fsInfo.GetDirUsage(f.rootFs) + } + + if f.extraDir != "" { + extraUsage, extraErr = f.fsInfo.GetDirUsage(f.extraDir) + } + + usage := &FsUsage{} + + if f.rootFs != "" && rootErr == nil { + usage.InodeUsage = rootUsage.Inodes + usage.BaseUsageBytes = rootUsage.Bytes + usage.TotalUsageBytes = rootUsage.Bytes + } + + if f.extraDir != "" && extraErr == nil { + usage.TotalUsageBytes += extraUsage.Bytes + } + + // Combine errors into a single error to return + if rootErr != nil || extraErr != nil { + return nil, fmt.Errorf("failed to obtain filesystem usage; rootDiskErr: %v, extraDiskErr: %v", rootErr, extraErr) + } + + return usage, nil +} diff --git a/container/containerd/client.go b/container/containerd/client.go index 72579d1d5c..3b5beb42be 100644 --- a/container/containerd/client.go +++ b/container/containerd/client.go @@ -22,8 +22,10 @@ import ( "time" containersapi "github.com/containerd/containerd/api/services/containers/v1" + snapshotapi "github.com/containerd/containerd/api/services/snapshots/v1" tasksapi "github.com/containerd/containerd/api/services/tasks/v1" versionapi "github.com/containerd/containerd/api/services/version/v1" + types "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/containers" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/pkg/dialer" @@ -37,6 +39,7 @@ type client struct { containerService containersapi.ContainersClient taskService tasksapi.TasksClient versionService versionapi.VersionClient + snapshotService snapshotapi.SnapshotsClient criService criapi.RuntimeServiceClient } @@ -44,7 +47,9 @@ type ContainerdClient interface { LoadContainer(ctx context.Context, id string) (*containers.Container, error) TaskPid(ctx context.Context, id string) (uint32, error) Version(ctx context.Context) (string, error) + SnapshotMounts(ctx context.Context, snapshotter, key string) ([]*types.Mount, error) ContainerStatus(ctx context.Context, id string) (*criapi.ContainerStatus, error) + ContainerStats(ctx context.Context, id string) (*criapi.ContainerStats, error) } var once sync.Once @@ -95,6 +100,7 @@ func Client(address, namespace string) (ContainerdClient, error) { containerService: containersapi.NewContainersClient(conn), taskService: tasksapi.NewTasksClient(conn), versionService: versionapi.NewVersionClient(conn), + snapshotService: snapshotapi.NewSnapshotsClient(conn), criService: criapi.NewRuntimeServiceClient(conn), } }) @@ -129,6 +135,17 @@ func (c *client) Version(ctx context.Context) (string, error) { return response.Version, nil } +func (c *client) SnapshotMounts(ctx context.Context, snapshotter, key string) ([]*types.Mount, error) { + response, err := c.snapshotService.Mounts(ctx, &snapshotapi.MountsRequest{ + Snapshotter: snapshotter, + Key: key, + }) + if err != nil { + return nil, errdefs.FromGRPC(err) + } + return response.Mounts, nil +} + func (c *client) ContainerStatus(ctx context.Context, id string) (*criapi.ContainerStatus, error) { response, err := c.criService.ContainerStatus(ctx, &criapi.ContainerStatusRequest{ ContainerId: id, @@ -140,6 +157,16 @@ func (c *client) ContainerStatus(ctx context.Context, id string) (*criapi.Contai return response.Status, nil } +func (c *client) ContainerStats(ctx context.Context, id string) (*criapi.ContainerStats, error) { + response, err := c.criService.ContainerStats(ctx, &criapi.ContainerStatsRequest{ + ContainerId: id, + }) + if err != nil { + return nil, err + } + return response.Stats, nil +} + func containerFromProto(containerpb containersapi.Container) *containers.Container { var runtime containers.RuntimeInfo if containerpb.Runtime != nil { diff --git a/container/containerd/client_test.go b/container/containerd/client_test.go index 1f0deb824f..c80c47a1d1 100644 --- a/container/containerd/client_test.go +++ b/container/containerd/client_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" + types "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/containers" criapi "github.com/google/cadvisor/cri-api/pkg/apis/runtime/v1alpha2" ) @@ -25,6 +26,7 @@ import ( type containerdClientMock struct { cntrs map[string]*containers.Container status *criapi.ContainerStatus + stats *criapi.ContainerStats returnErr error } @@ -51,10 +53,19 @@ func (c *containerdClientMock) ContainerStatus(ctx context.Context, id string) ( return c.status, nil } -func mockcontainerdClient(cntrs map[string]*containers.Container, status *criapi.ContainerStatus, returnErr error) ContainerdClient { +func (c *containerdClientMock) ContainerStats(ctx context.Context, id string) (*criapi.ContainerStats, error) { + return c.stats, nil +} + +func (c *containerdClientMock) SnapshotMounts(ctx context.Context, snapshotter, key string) ([]*types.Mount, error) { + return nil, nil +} + +func mockcontainerdClient(cntrs map[string]*containers.Container, status *criapi.ContainerStatus, stats *criapi.ContainerStats, returnErr error) ContainerdClient { return &containerdClientMock{ cntrs: cntrs, status: status, + stats: stats, returnErr: returnErr, } } diff --git a/container/containerd/factory_test.go b/container/containerd/factory_test.go index 4f14b6df70..09acd56f7d 100644 --- a/container/containerd/factory_test.go +++ b/container/containerd/factory_test.go @@ -58,7 +58,7 @@ func TestCanHandleAndAccept(t *testing.T) { testContainers["40af7cdcbe507acad47a5a62025743ad3ddc6ab93b77b21363aa1c1d641047c9"] = testContainer f := &containerdFactory{ - client: mockcontainerdClient(testContainers, nil, nil), + client: mockcontainerdClient(testContainers, nil, nil, nil), cgroupSubsystems: containerlibcontainer.CgroupSubsystems{}, fsInfo: nil, machineInfoFactory: nil, diff --git a/container/containerd/handler.go b/container/containerd/handler.go index 685cd06429..82e836fc55 100644 --- a/container/containerd/handler.go +++ b/container/containerd/handler.go @@ -22,6 +22,7 @@ import ( "time" "github.com/containerd/containerd/errdefs" + criapi "github.com/google/cadvisor/cri-api/pkg/apis/runtime/v1alpha2" "golang.org/x/net/context" "github.com/google/cadvisor/container" @@ -32,12 +33,26 @@ import ( specs "github.com/opencontainers/runtime-spec/specs-go" ) +type fsUsageProvider struct { + ctx context.Context + containerID string + client ContainerdClient + fsInfo fs.FsInfo + logPath string +} + type containerdContainerHandler struct { machineInfoFactory info.MachineInfoFactory // Absolute path to the cgroup hierarchies of this container. // (e.g.: "cpu" -> "/sys/fs/cgroup/cpu/test") - cgroupPaths map[string]string - fsInfo fs.FsInfo + cgroupPaths map[string]string + fsInfo fs.FsInfo + fsHandler common.FsHandler + fsLimit uint64 + fsTotalInodes uint64 + fsType string + device string + // Metadata associated with the container. reference info.ContainerReference envs map[string]string @@ -117,8 +132,9 @@ func newContainerdContainerHandler( // Special container name for sandbox(pause) // It is defined in https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/dockershim/naming.go#L50-L52 containerName := "POD" + var status *criapi.ContainerStatus if cntr.Labels["io.cri-containerd.kind"] != "sandbox" { - status, err := client.ContainerStatus(ctx, id) + status, err = client.ContainerStatus(ctx, id) if err != nil { return nil, err } @@ -157,6 +173,65 @@ func newContainerdContainerHandler( // Add the name and bare ID as aliases of the container. handler.image = cntr.Image + if includedMetrics.Has(container.DiskUsageMetrics) && cntr.Labels["io.cri-containerd.kind"] != "sandbox" { + mounts, err := client.SnapshotMounts(ctx, cntr.Snapshotter, cntr.SnapshotKey) + if err != nil { + return nil, fmt.Errorf("failed to obtain containerd snapshot mounts for disk usage metrics: %v", err) + } + + // Default to top directory + snapshotDir := "/var/lib/containerd" + // TODO: only overlay snapshotters is handled as of now. + // Note: overlay returns single mount. https://github.com/containerd/containerd/blob/main/snapshots/overlay/overlay.go + if len(mounts) > 0 && mounts[0].Type == "overlay" { + for _, option := range mounts[0].Options { + // Example: upperdir=/var/lib/containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/5001/fs + if strings.HasPrefix(option, "upperdir=") { + snapshotDir = option[len("upperdir="):] + break + } + } + } + deviceInfo, err := fsInfo.GetDirFsDevice(snapshotDir) + if err != nil { + return nil, err + } + + mi, err := machineInfoFactory.GetMachineInfo() + if err != nil { + return nil, err + } + + var ( + fsLimit uint64 + fsType string + fsTotalInodes uint64 + ) + // Containerd does not impose any filesystem limits for containers. So use capacity as limit. + for _, fs := range mi.Filesystems { + if fs.Device == deviceInfo.Device { + fsLimit = fs.Capacity + fsType = fs.Type + fsTotalInodes = fs.Inodes + break + } + } + + handler.fsLimit = fsLimit + handler.fsType = fsType + handler.fsTotalInodes = fsTotalInodes + handler.device = deviceInfo.Device + + handler.fsHandler = common.NewFsHandler(common.DefaultPeriod, &fsUsageProvider{ + ctx: ctx, + client: client, + containerID: id, + // Path of logs, e.g. /var/log/pods/XXX + logPath: status.LogPath, + fsInfo: fsInfo, + }) + } + for _, exposedEnv := range metadataEnvAllowList { if exposedEnv == "" { // if no containerdEnvWhitelist provided, len(metadataEnvAllowList) == 1, metadataEnvAllowList[0] == "" @@ -212,6 +287,27 @@ func (h *containerdContainerHandler) getFsStats(stats *info.ContainerStats) erro if h.includedMetrics.Has(container.DiskIOMetrics) { common.AssignDeviceNamesToDiskStats((*common.MachineInfoNamer)(mi), &stats.DiskIo) } + + if !h.includedMetrics.Has(container.DiskUsageMetrics) || h.labels["io.cri-containerd.kind"] == "sandbox" { + return nil + } + + fsStat := info.FsStats{} + usage := h.fsHandler.Usage() + fsStat.BaseUsage = usage.BaseUsageBytes + fsStat.Usage = usage.TotalUsageBytes + // By definition, "Inodes" is supposed to be the total number of inodes of a filesystem. + // Set to the number of used inodes to be back-compatible with docker + fsStat.Inodes = usage.InodeUsage + // This is not accurate because it ignores inodes used by everything else. + fsStat.InodesFree = h.fsTotalInodes - usage.InodeUsage + + fsStat.Device = h.device + fsStat.Limit = h.fsLimit + fsStat.Type = h.fsType + + stats.Filesystem = append(stats.Filesystem, fsStat) + return nil } @@ -262,12 +358,42 @@ func (h *containerdContainerHandler) Type() container.ContainerType { } func (h *containerdContainerHandler) Start() { + if h.fsHandler != nil { + h.fsHandler.Start() + } } func (h *containerdContainerHandler) Cleanup() { + if h.fsHandler != nil { + h.fsHandler.Stop() + } } func (h *containerdContainerHandler) GetContainerIPAddress() string { // containerd doesnt take care of networking.So it doesnt maintain networking states return "" } + +func (f *fsUsageProvider) Usage() (*common.FsUsage, error) { + stats, err := f.client.ContainerStats(f.ctx, f.containerID) + if err != nil { + return nil, err + } + var logUsedBytes uint64 + if f.logPath != "" { + logUsage, err := f.fsInfo.GetDirUsage(f.logPath) + if err != nil { + return nil, err + } + logUsedBytes = logUsage.Bytes + } + return &common.FsUsage{ + BaseUsageBytes: stats.WritableLayer.UsedBytes.Value, + TotalUsageBytes: stats.WritableLayer.UsedBytes.Value + logUsedBytes, + InodeUsage: stats.WritableLayer.InodesUsed.Value, + }, nil +} + +func (f *fsUsageProvider) Targets() []string { + return []string{f.containerID} +} diff --git a/container/containerd/handler_test.go b/container/containerd/handler_test.go index dcf2a23519..f9a92c61b3 100644 --- a/container/containerd/handler_test.go +++ b/container/containerd/handler_test.go @@ -88,7 +88,7 @@ func TestHandler(t *testing.T) { for _, ts := range []testCase{ { - mockcontainerdClient(nil, nil, nil), + mockcontainerdClient(nil, nil, nil, nil), "/kubepods/pod068e8fa0-9213-11e7-a01f-507b9d4141fa/40af7cdcbe507acad47a5a62025743ad3ddc6ab93b77b21363aa1c1d641047c9", nil, nil, @@ -102,7 +102,7 @@ func TestHandler(t *testing.T) { nil, }, { - mockcontainerdClient(testContainers, nil, nil), + mockcontainerdClient(testContainers, nil, nil, nil), "/kubepods/pod068e8fa0-9213-11e7-a01f-507b9d4141fa/40af7cdcbe507acad47a5a62025743ad3ddc6ab93b77b21363aa1c1d641047c9", &mockedMachineInfo{}, nil, @@ -121,7 +121,7 @@ func TestHandler(t *testing.T) { map[string]string{}, }, { - mockcontainerdClient(testContainers, nil, nil), + mockcontainerdClient(testContainers, nil, nil, nil), "/kubepods/pod068e8fa0-9213-11e7-a01f-507b9d4141fa/40af7cdcbe507acad47a5a62025743ad3ddc6ab93b77b21363aa1c1d641047c9", &mockedMachineInfo{}, nil, @@ -140,7 +140,7 @@ func TestHandler(t *testing.T) { map[string]string{"TEST_REGION": "FRA", "TEST_ZONE": "A"}, }, { - mockcontainerdClient(testContainers, status, nil), + mockcontainerdClient(testContainers, status, nil, nil), "/kubepods/pod068e8fa0-9213-11e7-a01f-507b9d4141fa/c6a1aa99f14d3e57417e145b897e34961145f6b6f14216a176a34bfabbf79086", &mockedMachineInfo{}, nil, diff --git a/container/crio/handler.go b/container/crio/handler.go index 3a4ef0c85a..8bea12fe88 100644 --- a/container/crio/handler.go +++ b/container/crio/handler.go @@ -183,7 +183,8 @@ func newCrioContainerHandler( // we optionally collect disk usage metrics if includedMetrics.Has(container.DiskUsageMetrics) { - handler.fsHandler = common.NewFsHandler(common.DefaultPeriod, rootfsStorageDir, storageLogDir, fsInfo) + handler.fsHandler = common.NewFsHandler(common.DefaultPeriod, common.NewGeneralFsUsageProvider( + fsInfo, rootfsStorageDir, storageLogDir)) } // TODO for env vars we wanted to show from container.Config.Env from whitelist //for _, exposedEnv := range metadataEnvAllowList { diff --git a/container/docker/handler.go b/container/docker/handler.go index 8528cbcb07..d0b9f4e2fc 100644 --- a/container/docker/handler.go +++ b/container/docker/handler.go @@ -240,7 +240,8 @@ func newDockerContainerHandler( if includedMetrics.Has(container.DiskUsageMetrics) { handler.fsHandler = &dockerFsHandler{ - fsHandler: common.NewFsHandler(common.DefaultPeriod, rootfsStorageDir, otherStorageDir, fsInfo), + fsHandler: common.NewFsHandler(common.DefaultPeriod, common.NewGeneralFsUsageProvider( + fsInfo, rootfsStorageDir, otherStorageDir)), thinPoolWatcher: thinPoolWatcher, zfsWatcher: zfsWatcher, deviceID: ctnr.GraphDriver.Data["DeviceId"],