From ae3b9c0e830a4449e49d2ea7f03c6da4191a08f7 Mon Sep 17 00:00:00 2001 From: shaowei <9923595+waynepeking348@users.noreply.github.com> Date: Mon, 17 Jul 2023 20:14:53 +0800 Subject: [PATCH] move malachite client from global varibales to malachite fetcher (#152) --- pkg/controller/lifecycle/cnr_test.go | 2 +- .../agent/metric/malachite/cgroup/cgroup.go | 8 ++-- .../metric/malachite/cgroup/cgroup_test.go | 9 +++-- .../agent/metric/malachite/client/client.go | 3 +- .../agent/metric/malachite/system/system.go | 16 ++++---- .../metric/malachite/system/system_test.go | 39 +++++++++++-------- pkg/metaserver/agent/metric/metric.go | 26 +++++++------ .../external/cgroupid/manager_linux_test.go | 2 +- 8 files changed, 56 insertions(+), 49 deletions(-) diff --git a/pkg/controller/lifecycle/cnr_test.go b/pkg/controller/lifecycle/cnr_test.go index 3e872aa712..e2a631c708 100644 --- a/pkg/controller/lifecycle/cnr_test.go +++ b/pkg/controller/lifecycle/cnr_test.go @@ -142,7 +142,7 @@ func TestCNRLifecycle_Run(t *testing.T) { go cl.Run() cache.WaitForCacheSync(cl.ctx.Done(), cl.nodeListerSynced, cl.cnrListerSynced) - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) gotCNR, err := cl.cnrLister.Get(tt.fields.node.Name) assert.NoError(t, err) diff --git a/pkg/metaserver/agent/metric/malachite/cgroup/cgroup.go b/pkg/metaserver/agent/metric/malachite/cgroup/cgroup.go index a229c9556f..e64e741b0c 100644 --- a/pkg/metaserver/agent/metric/malachite/cgroup/cgroup.go +++ b/pkg/metaserver/agent/metric/malachite/cgroup/cgroup.go @@ -25,8 +25,8 @@ import ( "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/malachite/client" ) -func GetCgroupStats(cgroupPath string) (*MalachiteCgroupInfo, error) { - cgroupStatsRaw, err := client.DefaultClient.GetCgroupStats(cgroupPath) +func GetCgroupStats(c client.MalachiteClient, cgroupPath string) (*MalachiteCgroupInfo, error) { + cgroupStatsRaw, err := c.GetCgroupStats(cgroupPath) if err != nil { return nil, err } @@ -81,7 +81,7 @@ func GetCgroupStats(cgroupPath string) (*MalachiteCgroupInfo, error) { return cgroupInfo, nil } -func GetAllPodsContainersStats() (map[string]map[string]*MalachiteCgroupInfo, error) { +func GetAllPodsContainersStats(c client.MalachiteClient) (map[string]map[string]*MalachiteCgroupInfo, error) { podsContainersCgroupsPath, err := GetAllPodsContainersCgroupsPath() if err != nil { return nil, fmt.Errorf("failed to GetAllPodsContainersCgroupsPath, err %v", err) @@ -91,7 +91,7 @@ func GetAllPodsContainersStats() (map[string]map[string]*MalachiteCgroupInfo, er for podUid, containersCgroupsPath := range podsContainersCgroupsPath { containersStats := make(map[string]*MalachiteCgroupInfo) for containerName, cgroupsPath := range containersCgroupsPath { - cgStats, err := GetCgroupStats(cgroupsPath) + cgStats, err := GetCgroupStats(c, cgroupsPath) if err != nil { klog.V(4).ErrorS(fmt.Errorf("failed to GetCgroupStats for cgroup %s in pod %s, err %s", cgroupsPath, podUid, err), "") continue diff --git a/pkg/metaserver/agent/metric/malachite/cgroup/cgroup_test.go b/pkg/metaserver/agent/metric/malachite/cgroup/cgroup_test.go index 2b46ed7b49..446e603b23 100644 --- a/pkg/metaserver/agent/metric/malachite/cgroup/cgroup_test.go +++ b/pkg/metaserver/agent/metric/malachite/cgroup/cgroup_test.go @@ -135,21 +135,22 @@ func TestGetCgroupStats(t *testing.T) { })) defer server.Close() - client.DefaultClient.(*client.Client).SetURL(map[string]string{ + malachiteClient := client.New() + malachiteClient.(*client.Client).SetURL(map[string]string{ client.CgroupResource: server.URL, }) - info, err := GetCgroupStats("v1-path") + info, err := GetCgroupStats(malachiteClient, "v1-path") assert.NoError(t, err) assert.NotNil(t, info.V1) assert.Nil(t, info.V2) - info, err = GetCgroupStats("v2-path") + info, err = GetCgroupStats(malachiteClient, "v2-path") assert.NoError(t, err) assert.NotNil(t, info.V2) assert.Nil(t, info.V1) - _, err = GetCgroupStats("no-exist-path") + _, err = GetCgroupStats(malachiteClient, "no-exist-path") assert.NotNil(t, err) assert.NotNil(t, info.V2) assert.Nil(t, info.V1) diff --git a/pkg/metaserver/agent/metric/malachite/client/client.go b/pkg/metaserver/agent/metric/malachite/client/client.go index 62b5e337ea..5d8634ed36 100644 --- a/pkg/metaserver/agent/metric/malachite/client/client.go +++ b/pkg/metaserver/agent/metric/malachite/client/client.go @@ -44,8 +44,6 @@ const ( Net ) -var DefaultClient = New() - type Client struct { sync.RWMutex urls map[string]string @@ -70,6 +68,7 @@ func New() MalachiteClient { func (c *Client) SetURL(urls map[string]string) { c.Lock() defer c.Unlock() + c.urls = urls } diff --git a/pkg/metaserver/agent/metric/malachite/system/system.go b/pkg/metaserver/agent/metric/malachite/system/system.go index b81a5d91ec..2dea1ea347 100644 --- a/pkg/metaserver/agent/metric/malachite/system/system.go +++ b/pkg/metaserver/agent/metric/malachite/system/system.go @@ -23,8 +23,8 @@ import ( "github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/malachite/client" ) -func GetSystemComputeStats() (*SystemComputeData, error) { - statsData, err := client.DefaultClient.GetSystemStats(client.Compute) +func GetSystemComputeStats(c client.MalachiteClient) (*SystemComputeData, error) { + statsData, err := c.GetSystemStats(client.Compute) if err != nil { return nil, err } @@ -41,8 +41,8 @@ func GetSystemComputeStats() (*SystemComputeData, error) { return &rsp.Data, nil } -func GetSystemMemoryStats() (*SystemMemoryData, error) { - statsData, err := client.DefaultClient.GetSystemStats(client.Memory) +func GetSystemMemoryStats(c client.MalachiteClient) (*SystemMemoryData, error) { + statsData, err := c.GetSystemStats(client.Memory) if err != nil { return nil, err } @@ -59,8 +59,8 @@ func GetSystemMemoryStats() (*SystemMemoryData, error) { return &rsp.Data, nil } -func GetSystemIOStats() (*SystemDiskIoData, error) { - statsData, err := client.DefaultClient.GetSystemStats(client.IO) +func GetSystemIOStats(c client.MalachiteClient) (*SystemDiskIoData, error) { + statsData, err := c.GetSystemStats(client.IO) if err != nil { return nil, err } @@ -77,8 +77,8 @@ func GetSystemIOStats() (*SystemDiskIoData, error) { return &rsp.Data, nil } -func GetSystemNetStats() (*SystemNetworkData, error) { - statsData, err := client.DefaultClient.GetSystemStats(client.Net) +func GetSystemNetStats(c client.MalachiteClient) (*SystemNetworkData, error) { + statsData, err := c.GetSystemStats(client.Net) if err != nil { return nil, err } diff --git a/pkg/metaserver/agent/metric/malachite/system/system_test.go b/pkg/metaserver/agent/metric/malachite/system/system_test.go index a9cbd91bf2..e7e28da341 100644 --- a/pkg/metaserver/agent/metric/malachite/system/system_test.go +++ b/pkg/metaserver/agent/metric/malachite/system/system_test.go @@ -89,16 +89,17 @@ func TestGetSystemComputeStats(t *testing.T) { server := getSystemTestServer(data) defer server.Close() - client.DefaultClient.(*client.Client).SetURL(map[string]string{ + malachiteClient := client.New() + malachiteClient.(*client.Client).SetURL(map[string]string{ client.SystemComputeResource: server.URL, }) - _, err := GetSystemComputeStats() + _, err := GetSystemComputeStats(malachiteClient) assert.NoError(t, err) - client.DefaultClient.(*client.Client).SetURL(map[string]string{ + malachiteClient.(*client.Client).SetURL(map[string]string{ client.SystemComputeResource: "none", }) - _, err = GetSystemComputeStats() + _, err = GetSystemComputeStats(malachiteClient) assert.NotNil(t, err) } @@ -109,16 +110,17 @@ func TestGetSystemMemoryStats(t *testing.T) { server := getSystemTestServer(data) defer server.Close() - client.DefaultClient.(*client.Client).SetURL(map[string]string{ + malachiteClient := client.New() + malachiteClient.(*client.Client).SetURL(map[string]string{ client.SystemMemoryResource: server.URL, }) - _, err := GetSystemMemoryStats() + _, err := GetSystemMemoryStats(malachiteClient) assert.NoError(t, err) - client.DefaultClient.(*client.Client).SetURL(map[string]string{ + malachiteClient.(*client.Client).SetURL(map[string]string{ client.SystemMemoryResource: "none", }) - _, err = GetSystemComputeStats() + _, err = GetSystemComputeStats(malachiteClient) assert.NotNil(t, err) } @@ -129,16 +131,17 @@ func TestGetSystemIOStats(t *testing.T) { server := getSystemTestServer(data) defer server.Close() - client.DefaultClient.(*client.Client).SetURL(map[string]string{ + malachiteClient := client.New() + malachiteClient.(*client.Client).SetURL(map[string]string{ client.SystemIOResource: server.URL, }) - _, err := GetSystemIOStats() + _, err := GetSystemIOStats(malachiteClient) assert.NoError(t, err) - client.DefaultClient.(*client.Client).SetURL(map[string]string{ + malachiteClient.(*client.Client).SetURL(map[string]string{ client.SystemIOResource: "none", }) - _, err = GetSystemComputeStats() + _, err = GetSystemComputeStats(malachiteClient) assert.NotNil(t, err) } @@ -149,16 +152,17 @@ func TestGetSystemNetStats(t *testing.T) { server := getSystemTestServer(data) defer server.Close() - client.DefaultClient.(*client.Client).SetURL(map[string]string{ + malachiteClient := client.New() + malachiteClient.(*client.Client).SetURL(map[string]string{ client.SystemNetResource: server.URL, }) - _, err := GetSystemNetStats() + _, err := GetSystemNetStats(malachiteClient) assert.NoError(t, err) - client.DefaultClient.(*client.Client).SetURL(map[string]string{ + malachiteClient.(*client.Client).SetURL(map[string]string{ client.SystemNetResource: "none", }) - _, err = GetSystemComputeStats() + _, err = GetSystemComputeStats(malachiteClient) assert.NotNil(t, err) } @@ -168,6 +172,7 @@ func TestGetSystemNonExistStats(t *testing.T) { server := getSystemTestServer([]byte{}) defer server.Close() - _, err := client.DefaultClient.GetSystemStats(100) + malachiteClient := client.New() + _, err := malachiteClient.GetSystemStats(100) assert.ErrorContains(t, err, "unknown") } diff --git a/pkg/metaserver/agent/metric/metric.go b/pkg/metaserver/agent/metric/metric.go index fba3dd932a..1b0c4dafc0 100644 --- a/pkg/metaserver/agent/metric/metric.go +++ b/pkg/metaserver/agent/metric/metric.go @@ -128,9 +128,10 @@ type MetricsFetcher interface { // NewMalachiteMetricsFetcher returns the default implementation of MetricsFetcher. func NewMalachiteMetricsFetcher(emitter metrics.MetricEmitter, conf *config.Configuration) MetricsFetcher { return &MalachiteMetricsFetcher{ - metricStore: metric.NewMetricStore(), - emitter: emitter, - conf: conf, + malachiteClient: client.New(), + metricStore: metric.NewMetricStore(), + emitter: emitter, + conf: conf, registeredNotifier: map[MetricsScope]map[string]NotifiedData{ MetricsScopeNode: make(map[string]NotifiedData), MetricsScopeNuma: make(map[string]NotifiedData), @@ -142,13 +143,14 @@ func NewMalachiteMetricsFetcher(emitter metrics.MetricEmitter, conf *config.Conf } type MalachiteMetricsFetcher struct { - metricStore *metric.MetricStore - conf *config.Configuration + metricStore *metric.MetricStore + malachiteClient client.MalachiteClient + conf *config.Configuration + sync.RWMutex registeredMetric []func(store *metric.MetricStore) registeredNotifier map[MetricsScope]map[string]NotifiedData - sync.RWMutex startOnce sync.Once emitter metrics.MetricEmitter } @@ -260,7 +262,7 @@ func (m *MalachiteMetricsFetcher) sample() { // checkMalachiteHealthy is to check whether malachite is healthy func (m *MalachiteMetricsFetcher) checkMalachiteHealthy() bool { - _, err := client.DefaultClient.GetSystemStats(client.Compute) + _, err := m.malachiteClient.GetSystemStats(client.Compute) if err != nil { klog.Errorf("[malachite] malachite is unhealthy: %v", err) _ = m.emitter.StoreInt64(metricsNamMalachiteUnHealthy, 1, metrics.MetricTypeNameRaw) @@ -272,7 +274,7 @@ func (m *MalachiteMetricsFetcher) checkMalachiteHealthy() bool { // Get raw system stats by malachite sdk and set to metricStore func (m *MalachiteMetricsFetcher) updateSystemStats() { - systemComputeData, err := system.GetSystemComputeStats() + systemComputeData, err := system.GetSystemComputeStats(m.malachiteClient) if err != nil { klog.Errorf("[malachite] get system compute stats failed, err %v", err) _ = m.emitter.StoreInt64(metricsNameMalachiteGetSystemStatusFailed, 1, metrics.MetricTypeNameCount, @@ -282,7 +284,7 @@ func (m *MalachiteMetricsFetcher) updateSystemStats() { m.processSystemCPUComputeData(systemComputeData) } - systemMemoryData, err := system.GetSystemMemoryStats() + systemMemoryData, err := system.GetSystemMemoryStats(m.malachiteClient) if err != nil { klog.Errorf("[malachite] get system memory stats failed, err %v", err) _ = m.emitter.StoreInt64(metricsNameMalachiteGetSystemStatusFailed, 1, metrics.MetricTypeNameCount, @@ -292,7 +294,7 @@ func (m *MalachiteMetricsFetcher) updateSystemStats() { m.processSystemNumaData(systemMemoryData) } - systemIOData, err := system.GetSystemIOStats() + systemIOData, err := system.GetSystemIOStats(m.malachiteClient) if err != nil { klog.Errorf("[malachite] get system io stats failed, err %v", err) _ = m.emitter.StoreInt64(metricsNameMalachiteGetSystemStatusFailed, 1, metrics.MetricTypeNameCount, @@ -305,7 +307,7 @@ func (m *MalachiteMetricsFetcher) updateSystemStats() { func (m *MalachiteMetricsFetcher) updateCgroupData() { cgroupPaths := []string{m.conf.ReclaimRelativeRootCgroupPath, common.CgroupFsRootPathBurstable, common.CgroupFsRootPathBestEffort} for _, path := range cgroupPaths { - stats, err := cgroup.GetCgroupStats(path) + stats, err := cgroup.GetCgroupStats(m.malachiteClient, path) if err != nil { general.Errorf("GetCgroupStats %v err %v", path, err) continue @@ -320,7 +322,7 @@ func (m *MalachiteMetricsFetcher) updateCgroupData() { // Get raw cgroup data by malachite sdk and set container metrics to metricStore, GC not existed pod metrics func (m *MalachiteMetricsFetcher) updatePodsCgroupData() { - podsContainersStats, err := cgroup.GetAllPodsContainersStats() + podsContainersStats, err := cgroup.GetAllPodsContainersStats(m.malachiteClient) if err != nil { klog.Errorf("[malachite] GetAllPodsContainersStats failed, error %v", err) _ = m.emitter.StoreInt64(metricsNameMalachiteGetPodStatusFailed, 1, metrics.MetricTypeNameCount) diff --git a/pkg/metaserver/external/cgroupid/manager_linux_test.go b/pkg/metaserver/external/cgroupid/manager_linux_test.go index 6ac17ac1ed..a46d292d76 100644 --- a/pkg/metaserver/external/cgroupid/manager_linux_test.go +++ b/pkg/metaserver/external/cgroupid/manager_linux_test.go @@ -197,7 +197,6 @@ func TestClearResidualPodsInCache(t *testing.T) { for wantPodUID, wantContainerMap := range tt.want { cgroupIDManager.Lock() gotContainerMap, ok := cgroupIDManager.podCgroupIDCache[wantPodUID] - cgroupIDManager.Unlock() assert.True(t, ok) assert.Equal(t, len(wantContainerMap), len(gotContainerMap)) @@ -206,6 +205,7 @@ func TestClearResidualPodsInCache(t *testing.T) { assert.True(t, ok) assert.Equal(t, wantCgID, gotCgID) } + cgroupIDManager.Unlock() } }) }