Skip to content

Commit

Permalink
move malachite client from global variables to malachite fetcher (kub…
Browse files Browse the repository at this point in the history
  • Loading branch information
waynepeking348 authored and luomingmeng committed Oct 11, 2024
1 parent 91cabf1 commit ae3b9c0
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 49 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/lifecycle/cnr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestCNRLifecycle_Run(t *testing.T) {
go cl.Run()

cache.WaitForCacheSync(cl.ctx.Done(), cl.nodeListerSynced, cl.cnrListerSynced)
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)

gotCNR, err := cl.cnrLister.Get(tt.fields.node.Name)
assert.NoError(t, err)
Expand Down
8 changes: 4 additions & 4 deletions pkg/metaserver/agent/metric/malachite/cgroup/cgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/malachite/client"
)

func GetCgroupStats(cgroupPath string) (*MalachiteCgroupInfo, error) {
cgroupStatsRaw, err := client.DefaultClient.GetCgroupStats(cgroupPath)
func GetCgroupStats(c client.MalachiteClient, cgroupPath string) (*MalachiteCgroupInfo, error) {
cgroupStatsRaw, err := c.GetCgroupStats(cgroupPath)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -81,7 +81,7 @@ func GetCgroupStats(cgroupPath string) (*MalachiteCgroupInfo, error) {
return cgroupInfo, nil
}

func GetAllPodsContainersStats() (map[string]map[string]*MalachiteCgroupInfo, error) {
func GetAllPodsContainersStats(c client.MalachiteClient) (map[string]map[string]*MalachiteCgroupInfo, error) {
podsContainersCgroupsPath, err := GetAllPodsContainersCgroupsPath()
if err != nil {
return nil, fmt.Errorf("failed to GetAllPodsContainersCgroupsPath, err %v", err)
Expand All @@ -91,7 +91,7 @@ func GetAllPodsContainersStats() (map[string]map[string]*MalachiteCgroupInfo, er
for podUid, containersCgroupsPath := range podsContainersCgroupsPath {
containersStats := make(map[string]*MalachiteCgroupInfo)
for containerName, cgroupsPath := range containersCgroupsPath {
cgStats, err := GetCgroupStats(cgroupsPath)
cgStats, err := GetCgroupStats(c, cgroupsPath)
if err != nil {
klog.V(4).ErrorS(fmt.Errorf("failed to GetCgroupStats for cgroup %s in pod %s, err %s", cgroupsPath, podUid, err), "")
continue
Expand Down
9 changes: 5 additions & 4 deletions pkg/metaserver/agent/metric/malachite/cgroup/cgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,22 @@ func TestGetCgroupStats(t *testing.T) {
}))
defer server.Close()

client.DefaultClient.(*client.Client).SetURL(map[string]string{
malachiteClient := client.New()
malachiteClient.(*client.Client).SetURL(map[string]string{
client.CgroupResource: server.URL,
})

info, err := GetCgroupStats("v1-path")
info, err := GetCgroupStats(malachiteClient, "v1-path")
assert.NoError(t, err)
assert.NotNil(t, info.V1)
assert.Nil(t, info.V2)

info, err = GetCgroupStats("v2-path")
info, err = GetCgroupStats(malachiteClient, "v2-path")
assert.NoError(t, err)
assert.NotNil(t, info.V2)
assert.Nil(t, info.V1)

_, err = GetCgroupStats("no-exist-path")
_, err = GetCgroupStats(malachiteClient, "no-exist-path")
assert.NotNil(t, err)
assert.NotNil(t, info.V2)
assert.Nil(t, info.V1)
Expand Down
3 changes: 1 addition & 2 deletions pkg/metaserver/agent/metric/malachite/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ const (
Net
)

var DefaultClient = New()

type Client struct {
sync.RWMutex
urls map[string]string
Expand All @@ -70,6 +68,7 @@ func New() MalachiteClient {
// SetURL replaces the client's URL table with a copy of urls.
//
// The map is copied before it is stored so that later mutations of the
// caller's map cannot change the client's table outside the lock that guards
// it. A nil urls resets the table to nil, as before.
func (c *Client) SetURL(urls map[string]string) {
	// Build the private copy before taking the lock to keep the critical
	// section as short as a single assignment.
	var copied map[string]string
	if urls != nil {
		copied = make(map[string]string, len(urls))
		for key, url := range urls {
			copied[key] = url
		}
	}

	c.Lock()
	defer c.Unlock()

	c.urls = copied
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/metaserver/agent/metric/malachite/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/malachite/client"
)

func GetSystemComputeStats() (*SystemComputeData, error) {
statsData, err := client.DefaultClient.GetSystemStats(client.Compute)
func GetSystemComputeStats(c client.MalachiteClient) (*SystemComputeData, error) {
statsData, err := c.GetSystemStats(client.Compute)
if err != nil {
return nil, err
}
Expand All @@ -41,8 +41,8 @@ func GetSystemComputeStats() (*SystemComputeData, error) {
return &rsp.Data, nil
}

func GetSystemMemoryStats() (*SystemMemoryData, error) {
statsData, err := client.DefaultClient.GetSystemStats(client.Memory)
func GetSystemMemoryStats(c client.MalachiteClient) (*SystemMemoryData, error) {
statsData, err := c.GetSystemStats(client.Memory)
if err != nil {
return nil, err
}
Expand All @@ -59,8 +59,8 @@ func GetSystemMemoryStats() (*SystemMemoryData, error) {
return &rsp.Data, nil
}

func GetSystemIOStats() (*SystemDiskIoData, error) {
statsData, err := client.DefaultClient.GetSystemStats(client.IO)
func GetSystemIOStats(c client.MalachiteClient) (*SystemDiskIoData, error) {
statsData, err := c.GetSystemStats(client.IO)
if err != nil {
return nil, err
}
Expand All @@ -77,8 +77,8 @@ func GetSystemIOStats() (*SystemDiskIoData, error) {
return &rsp.Data, nil
}

func GetSystemNetStats() (*SystemNetworkData, error) {
statsData, err := client.DefaultClient.GetSystemStats(client.Net)
func GetSystemNetStats(c client.MalachiteClient) (*SystemNetworkData, error) {
statsData, err := c.GetSystemStats(client.Net)
if err != nil {
return nil, err
}
Expand Down
39 changes: 22 additions & 17 deletions pkg/metaserver/agent/metric/malachite/system/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,17 @@ func TestGetSystemComputeStats(t *testing.T) {
server := getSystemTestServer(data)
defer server.Close()

client.DefaultClient.(*client.Client).SetURL(map[string]string{
malachiteClient := client.New()
malachiteClient.(*client.Client).SetURL(map[string]string{
client.SystemComputeResource: server.URL,
})
_, err := GetSystemComputeStats()
_, err := GetSystemComputeStats(malachiteClient)
assert.NoError(t, err)

client.DefaultClient.(*client.Client).SetURL(map[string]string{
malachiteClient.(*client.Client).SetURL(map[string]string{
client.SystemComputeResource: "none",
})
_, err = GetSystemComputeStats()
_, err = GetSystemComputeStats(malachiteClient)
assert.NotNil(t, err)
}

Expand All @@ -109,16 +110,17 @@ func TestGetSystemMemoryStats(t *testing.T) {
server := getSystemTestServer(data)
defer server.Close()

client.DefaultClient.(*client.Client).SetURL(map[string]string{
malachiteClient := client.New()
malachiteClient.(*client.Client).SetURL(map[string]string{
client.SystemMemoryResource: server.URL,
})
_, err := GetSystemMemoryStats()
_, err := GetSystemMemoryStats(malachiteClient)
assert.NoError(t, err)

client.DefaultClient.(*client.Client).SetURL(map[string]string{
malachiteClient.(*client.Client).SetURL(map[string]string{
client.SystemMemoryResource: "none",
})
_, err = GetSystemComputeStats()
_, err = GetSystemComputeStats(malachiteClient)
assert.NotNil(t, err)
}

Expand All @@ -129,16 +131,17 @@ func TestGetSystemIOStats(t *testing.T) {
server := getSystemTestServer(data)
defer server.Close()

client.DefaultClient.(*client.Client).SetURL(map[string]string{
malachiteClient := client.New()
malachiteClient.(*client.Client).SetURL(map[string]string{
client.SystemIOResource: server.URL,
})
_, err := GetSystemIOStats()
_, err := GetSystemIOStats(malachiteClient)
assert.NoError(t, err)

client.DefaultClient.(*client.Client).SetURL(map[string]string{
malachiteClient.(*client.Client).SetURL(map[string]string{
client.SystemIOResource: "none",
})
_, err = GetSystemComputeStats()
_, err = GetSystemComputeStats(malachiteClient)
assert.NotNil(t, err)
}

Expand All @@ -149,16 +152,17 @@ func TestGetSystemNetStats(t *testing.T) {
server := getSystemTestServer(data)
defer server.Close()

client.DefaultClient.(*client.Client).SetURL(map[string]string{
malachiteClient := client.New()
malachiteClient.(*client.Client).SetURL(map[string]string{
client.SystemNetResource: server.URL,
})
_, err := GetSystemNetStats()
_, err := GetSystemNetStats(malachiteClient)
assert.NoError(t, err)

client.DefaultClient.(*client.Client).SetURL(map[string]string{
malachiteClient.(*client.Client).SetURL(map[string]string{
client.SystemNetResource: "none",
})
_, err = GetSystemComputeStats()
_, err = GetSystemComputeStats(malachiteClient)
assert.NotNil(t, err)
}

Expand All @@ -168,6 +172,7 @@ func TestGetSystemNonExistStats(t *testing.T) {
server := getSystemTestServer([]byte{})
defer server.Close()

_, err := client.DefaultClient.GetSystemStats(100)
malachiteClient := client.New()
_, err := malachiteClient.GetSystemStats(100)
assert.ErrorContains(t, err, "unknown")
}
26 changes: 14 additions & 12 deletions pkg/metaserver/agent/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,10 @@ type MetricsFetcher interface {
// NewMalachiteMetricsFetcher returns the default implementation of MetricsFetcher.
func NewMalachiteMetricsFetcher(emitter metrics.MetricEmitter, conf *config.Configuration) MetricsFetcher {
return &MalachiteMetricsFetcher{
metricStore: metric.NewMetricStore(),
emitter: emitter,
conf: conf,
malachiteClient: client.New(),
metricStore: metric.NewMetricStore(),
emitter: emitter,
conf: conf,
registeredNotifier: map[MetricsScope]map[string]NotifiedData{
MetricsScopeNode: make(map[string]NotifiedData),
MetricsScopeNuma: make(map[string]NotifiedData),
Expand All @@ -142,13 +143,14 @@ func NewMalachiteMetricsFetcher(emitter metrics.MetricEmitter, conf *config.Conf
}

type MalachiteMetricsFetcher struct {
metricStore *metric.MetricStore
conf *config.Configuration
metricStore *metric.MetricStore
malachiteClient client.MalachiteClient
conf *config.Configuration

sync.RWMutex
registeredMetric []func(store *metric.MetricStore)
registeredNotifier map[MetricsScope]map[string]NotifiedData

sync.RWMutex
startOnce sync.Once
emitter metrics.MetricEmitter
}
Expand Down Expand Up @@ -260,7 +262,7 @@ func (m *MalachiteMetricsFetcher) sample() {

// checkMalachiteHealthy is to check whether malachite is healthy
func (m *MalachiteMetricsFetcher) checkMalachiteHealthy() bool {
_, err := client.DefaultClient.GetSystemStats(client.Compute)
_, err := m.malachiteClient.GetSystemStats(client.Compute)
if err != nil {
klog.Errorf("[malachite] malachite is unhealthy: %v", err)
_ = m.emitter.StoreInt64(metricsNamMalachiteUnHealthy, 1, metrics.MetricTypeNameRaw)
Expand All @@ -272,7 +274,7 @@ func (m *MalachiteMetricsFetcher) checkMalachiteHealthy() bool {

// Get raw system stats by malachite sdk and set to metricStore
func (m *MalachiteMetricsFetcher) updateSystemStats() {
systemComputeData, err := system.GetSystemComputeStats()
systemComputeData, err := system.GetSystemComputeStats(m.malachiteClient)
if err != nil {
klog.Errorf("[malachite] get system compute stats failed, err %v", err)
_ = m.emitter.StoreInt64(metricsNameMalachiteGetSystemStatusFailed, 1, metrics.MetricTypeNameCount,
Expand All @@ -282,7 +284,7 @@ func (m *MalachiteMetricsFetcher) updateSystemStats() {
m.processSystemCPUComputeData(systemComputeData)
}

systemMemoryData, err := system.GetSystemMemoryStats()
systemMemoryData, err := system.GetSystemMemoryStats(m.malachiteClient)
if err != nil {
klog.Errorf("[malachite] get system memory stats failed, err %v", err)
_ = m.emitter.StoreInt64(metricsNameMalachiteGetSystemStatusFailed, 1, metrics.MetricTypeNameCount,
Expand All @@ -292,7 +294,7 @@ func (m *MalachiteMetricsFetcher) updateSystemStats() {
m.processSystemNumaData(systemMemoryData)
}

systemIOData, err := system.GetSystemIOStats()
systemIOData, err := system.GetSystemIOStats(m.malachiteClient)
if err != nil {
klog.Errorf("[malachite] get system io stats failed, err %v", err)
_ = m.emitter.StoreInt64(metricsNameMalachiteGetSystemStatusFailed, 1, metrics.MetricTypeNameCount,
Expand All @@ -305,7 +307,7 @@ func (m *MalachiteMetricsFetcher) updateSystemStats() {
func (m *MalachiteMetricsFetcher) updateCgroupData() {
cgroupPaths := []string{m.conf.ReclaimRelativeRootCgroupPath, common.CgroupFsRootPathBurstable, common.CgroupFsRootPathBestEffort}
for _, path := range cgroupPaths {
stats, err := cgroup.GetCgroupStats(path)
stats, err := cgroup.GetCgroupStats(m.malachiteClient, path)
if err != nil {
general.Errorf("GetCgroupStats %v err %v", path, err)
continue
Expand All @@ -320,7 +322,7 @@ func (m *MalachiteMetricsFetcher) updateCgroupData() {

// Get raw cgroup data by malachite sdk and set container metrics to metricStore, GC not existed pod metrics
func (m *MalachiteMetricsFetcher) updatePodsCgroupData() {
podsContainersStats, err := cgroup.GetAllPodsContainersStats()
podsContainersStats, err := cgroup.GetAllPodsContainersStats(m.malachiteClient)
if err != nil {
klog.Errorf("[malachite] GetAllPodsContainersStats failed, error %v", err)
_ = m.emitter.StoreInt64(metricsNameMalachiteGetPodStatusFailed, 1, metrics.MetricTypeNameCount)
Expand Down
2 changes: 1 addition & 1 deletion pkg/metaserver/external/cgroupid/manager_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ func TestClearResidualPodsInCache(t *testing.T) {
for wantPodUID, wantContainerMap := range tt.want {
cgroupIDManager.Lock()
gotContainerMap, ok := cgroupIDManager.podCgroupIDCache[wantPodUID]
cgroupIDManager.Unlock()

assert.True(t, ok)
assert.Equal(t, len(wantContainerMap), len(gotContainerMap))
Expand All @@ -206,6 +205,7 @@ func TestClearResidualPodsInCache(t *testing.T) {
assert.True(t, ok)
assert.Equal(t, wantCgID, gotCgID)
}
cgroupIDManager.Unlock()
}
})
}
Expand Down

0 comments on commit ae3b9c0

Please sign in to comment.