enhancement(kcmas): lock optimize
zzzzhhb authored and waynepeking348 committed Nov 6, 2023
1 parent 0c0b2ed commit 5a79950
Showing 6 changed files with 239 additions and 80 deletions.
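The core of this change is in pkg/custom-metric/store/data/cache.go: the single cache-wide map of maps (metricMap map[types.MetricMeta]map[types.ObjectMeta]*internalMetricImp), which pushed every read and write through one CachedMetric lock, is split into per-metric objectMetricStore shards that each carry their own RWMutex, so the top-level lock is held only briefly while resolving or creating a shard. Below is a minimal sketch of that two-level locking pattern; the types and names are simplified placeholders, not the actual katalyst-core types.

package shardedcache

import "sync"

// innerStore is an illustrative stand-in for the per-metric objectMetricStore:
// it guards only its own objects with its own lock.
type innerStore struct {
	sync.RWMutex
	values map[string]int64
}

// shardedCache stands in for CachedMetric: the outer lock protects only the
// shard map, not the data inside each shard.
type shardedCache struct {
	sync.RWMutex
	stores map[string]*innerStore
}

func newShardedCache() *shardedCache {
	return &shardedCache{stores: make(map[string]*innerStore)}
}

// ensureStore holds the outer write lock just long enough to look up or create
// the per-metric shard (mirroring addNewMetricMeta/getObjectMetricStore).
func (c *shardedCache) ensureStore(metric string) *innerStore {
	c.Lock()
	defer c.Unlock()
	s, ok := c.stores[metric]
	if !ok {
		s = &innerStore{values: make(map[string]int64)}
		c.stores[metric] = s
	}
	return s
}

// Add contends only on the shard's lock once the store is resolved, so writes
// to different metrics no longer serialize on a single cache-wide lock.
func (c *shardedCache) Add(metric, object string, v int64) {
	s := c.ensureStore(metric)
	s.Lock()
	defer s.Unlock()
	s.values[object] = v
}

The same idea shows up in the rewritten AddSeriesMetric and AddAggregatedMetric below, which route through addNewMetricMeta and getObjectMetricStore instead of holding the cache-wide lock for the whole call.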
6 changes: 4 additions & 2 deletions pkg/config/metric/store.go
@@ -23,8 +23,9 @@ import (
)

type StoreConfiguration struct {
StoreName string
GCPeriod time.Duration
StoreName string
GCPeriod time.Duration
PurgePeriod time.Duration

StoreServerShardCount int
StoreServerReplicaTotal int
@@ -35,6 +36,7 @@ type StoreConfiguration struct {
func NewStoreConfiguration() *StoreConfiguration {
return &StoreConfiguration{
GCPeriod: time.Second * 10,
PurgePeriod: time.Minute * 10,
ServiceDiscoveryConf: generic.NewServiceDiscoveryConf(),
}
}
1 change: 1 addition & 0 deletions pkg/custom-metric/provider/provider_test.go
@@ -186,6 +186,7 @@ func testWithRemoteStoreWithIndex(t *testing.T, index []int) {
},
StoreServerReplicaTotal: len(index),
GCPeriod: time.Second,
PurgePeriod: time.Second,
}

lp1 := generateStorePodMeta("ns-1", "pod-1", "full_metric_with_conflict_time", 11)
204 changes: 144 additions & 60 deletions pkg/custom-metric/store/data/cache.go
@@ -22,6 +22,7 @@ import (

"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/kubewharf/katalyst-core/pkg/custom-metric/store/data/internal"
"github.com/kubewharf/katalyst-core/pkg/custom-metric/store/data/types"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
@@ -43,44 +44,60 @@ const (
type CachedMetric struct {
sync.RWMutex
emitter metrics.MetricEmitter
metricMap map[types.MetricMeta]map[types.ObjectMeta]*internalMetricImp
metricMap map[types.MetricMeta]*objectMetricStore
}

func NewCachedMetric(metricsEmitter metrics.MetricEmitter) *CachedMetric {
return &CachedMetric{
emitter: metricsEmitter,
metricMap: make(map[types.MetricMeta]map[types.ObjectMeta]*internalMetricImp),
metricMap: make(map[types.MetricMeta]*objectMetricStore),
}
}

func (c *CachedMetric) addNewMetricMeta(metricMeta types.MetricMetaImp) {
c.Lock()
defer c.Unlock()

if _, ok := c.metricMap[metricMeta]; !ok {
c.metricMap[metricMeta] = newObjectMetricStore(metricMeta)
}
}

func (c *CachedMetric) getObjectMetricStore(metricMeta types.MetricMetaImp) *objectMetricStore {
c.RLock()
defer c.RUnlock()

return c.metricMap[metricMeta]
}

func (c *CachedMetric) AddSeriesMetric(sList ...types.Metric) {
start := time.Now()

c.Lock()
defer c.Unlock()
defer func() {
_ = c.emitter.StoreInt64(metricsNameKCMASStoreDataSetCost, time.Now().Sub(start).Microseconds(), metrics.MetricTypeNameRaw)
}()

var needReAggregate []*internalMetricImp
var needReAggregate []*internal.MetricImp
for _, s := range sList {
d, ok := s.(*types.SeriesMetric)
if !ok || d == nil || len(d.GetItemList()) == 0 || d.GetName() == "" {
continue
}

if _, ok := c.metricMap[d.MetricMetaImp]; !ok {
c.metricMap[d.MetricMetaImp] = make(map[types.ObjectMeta]*internalMetricImp)
c.addNewMetricMeta(d.MetricMetaImp)
}
if _, ok := c.metricMap[d.MetricMetaImp][d.ObjectMetaImp]; !ok {
c.metricMap[d.MetricMetaImp][d.ObjectMetaImp] = newInternalMetric(d.MetricMetaImp, d.ObjectMetaImp, d.BasicMetric)
objectMetricStore := c.getObjectMetricStore(d.MetricMetaImp)

if !objectMetricStore.objectExists(d.ObjectMetaImp) {
objectMetricStore.add(d.ObjectMetaImp, d.BasicMetric)
}
internalMetric := objectMetricStore.getInternalMetricImp(d.ObjectMetaImp)

added := c.metricMap[d.MetricMetaImp][d.ObjectMetaImp].addSeriesMetric(d)
added := internalMetric.AddSeriesMetric(d)
if len(added) > 0 {
needReAggregate = append(needReAggregate, c.metricMap[d.MetricMetaImp][d.ObjectMetaImp])
index := c.metricMap[d.MetricMetaImp][d.ObjectMetaImp].seriesMetric.Len() - 1
latestTimestamp := c.metricMap[d.MetricMetaImp][d.ObjectMetaImp].seriesMetric.Values[index].Timestamp
needReAggregate = append(needReAggregate, internalMetric)
latestTimestamp := internalMetric.GetLatestTimestamp()
costs := start.Sub(time.UnixMilli(latestTimestamp)).Microseconds()
general.InfofV(6, "set cache,metric name: %v, series length: %v, add length:%v, latest timestamp: %v, costs: %v(microsecond)", d.MetricMetaImp.Name,
s.Len(), len(added), latestTimestamp, costs)
@@ -90,14 +107,11 @@ func (c *CachedMetric) AddSeriesMetric(sList ...types.Metric) {
}

for _, i := range needReAggregate {
i.aggregateMetric()
i.AggregateMetric()
}
}

func (c *CachedMetric) AddAggregatedMetric(aList ...types.Metric) {
c.Lock()
defer c.Unlock()

for _, a := range aList {
d, ok := a.(*types.AggregatedMetric)
if !ok || d == nil || len(d.GetItemList()) != 1 || d.GetName() == "" {
@@ -106,13 +120,15 @@ func (c *CachedMetric) AddAggregatedMetric(aList ...types.Metric) {

baseMetricMetaImp := d.GetBaseMetricMetaImp()
if _, ok := c.metricMap[baseMetricMetaImp]; !ok {
c.metricMap[baseMetricMetaImp] = make(map[types.ObjectMeta]*internalMetricImp)
}
if _, ok := c.metricMap[baseMetricMetaImp][d.ObjectMetaImp]; !ok {
c.metricMap[baseMetricMetaImp][d.ObjectMetaImp] = newInternalMetric(baseMetricMetaImp, d.ObjectMetaImp, d.BasicMetric)
c.addNewMetricMeta(baseMetricMetaImp)
}
objectMetricStore := c.getObjectMetricStore(baseMetricMetaImp)

c.metricMap[baseMetricMetaImp][d.ObjectMetaImp].mergeAggregatedMetric(d)
if !objectMetricStore.objectExists(d.ObjectMetaImp) {
objectMetricStore.add(d.ObjectMetaImp, d.BasicMetric)
}
internalMetric := objectMetricStore.getInternalMetricImp(d.ObjectMetaImp)
internalMetric.MergeAggregatedMetric(d)
}
}

@@ -138,8 +154,8 @@ func (c *CachedMetric) ListAllMetricNames() []string {
defer c.RUnlock()

var res []string
for metricMeta, internalMap := range c.metricMap {
if len(internalMap) == 0 {
for metricMeta, objectMetricStore := range c.metricMap {
if objectMetricStore.len() == 0 {
continue
}
res = append(res, metricMeta.GetName())
@@ -151,8 +167,6 @@ func (c *CachedMetric) GetMetric(namespace, metricName string, objName string, g
start := time.Now()
originMetricName, aggName := types.ParseAggregator(metricName)

c.RLock()
defer c.RUnlock()
defer func() {
_ = c.emitter.StoreInt64(metricsNameKCMASStoreDataGetCost, time.Now().Sub(start).Microseconds(), metrics.MetricTypeNameRaw)
}()
@@ -166,18 +180,19 @@ func (c *CachedMetric) GetMetric(namespace, metricName string, objName string, g
metricMeta.ObjectKind = gr.String()
}

if internalMap, ok := c.metricMap[metricMeta]; ok {
for _, internal := range internalMap {
if internal.GetObjectNamespace() != namespace || (objName != "" && internal.GetObjectName() != objName) {
continue
objectMetricStore := c.getObjectMetricStore(metricMeta)
if objectMetricStore != nil {
objectMetricStore.iterate(func(internalMetric *internal.MetricImp) {
if internalMetric.GetObjectNamespace() != namespace || (objName != "" && internalMetric.GetObjectName() != objName) {
return
}

var metricItem types.Metric
var exist bool
if aggName == "" {
metricItem, exist = internal.getSeriesItems(latest)
metricItem, exist = internalMetric.GetSeriesItems(latest)
} else {
metricItem, exist = internal.getAggregatedItems(aggName)
metricItem, exist = internalMetric.GetAggregatedItems(aggName)
}

if exist && metricItem.Len() > 0 {
@@ -186,7 +201,7 @@ func (c *CachedMetric) GetMetric(namespace, metricName string, objName string, g
//costs := now.Sub(time.UnixMilli(internal.seriesMetric.Values[internal.len()-1].Timestamp)).Microseconds()
//_ = c.emitter.StoreInt64(metricsNameKCMASStoreDataLatencyGet, costs, metrics.MetricTypeNameRaw, internal.generateTags()...)
}
}
})
return res, true
}

@@ -203,18 +218,18 @@ func (c *CachedMetric) GetAllMetricsInNamespace(namespace string) []types.Metric {

var res []types.Metric
for _, internalMap := range c.metricMap {
for _, internal := range internalMap {
if internal.GetObjectNamespace() != namespace {
continue
internalMap.iterate(func(internalMetric *internal.MetricImp) {
if internalMetric.GetObjectNamespace() != namespace {
return
}

metricItem, exist := internal.getSeriesItems(false)
metricItem, exist := internalMetric.GetSeriesItems(false)
if exist && metricItem.Len() > 0 {
res = append(res, metricItem.DeepCopy())
costs := now.Sub(time.UnixMilli(internal.seriesMetric.Values[internal.len()-1].Timestamp)).Microseconds()
_ = c.emitter.StoreInt64(metricsNameKCMASStoreDataLatencyGet, costs, metrics.MetricTypeNameRaw, internal.generateTags()...)
costs := now.Sub(time.UnixMilli(internalMetric.GetLatestTimestamp())).Microseconds()
_ = c.emitter.StoreInt64(metricsNameKCMASStoreDataLatencyGet, costs, metrics.MetricTypeNameRaw, internalMetric.GenerateTags()...)
}
}
})
}

return res
@@ -224,22 +239,29 @@ func (c *CachedMetric) GC(expiredTime time.Time) {
c.gcWithTimestamp(expiredTime.UnixMilli())
}
func (c *CachedMetric) gcWithTimestamp(expiredTimestamp int64) {
c.RLock()
defer c.RUnlock()

for _, objectMetricStore := range c.metricMap {
objectMetricStore.iterate(func(internalMetric *internal.MetricImp) {
internalMetric.GC(expiredTimestamp)
if internalMetric.Len() != 0 {
_ = c.emitter.StoreInt64(metricsNameKCMASStoreDataLength, int64(internalMetric.Len()),
metrics.MetricTypeNameRaw, internalMetric.GenerateTags()...)
_ = c.emitter.StoreInt64(metricsNameKCMASStoreWindowSeconds, (internalMetric.GetLatestTimestamp()-
internalMetric.GetOldestTimestamp())/time.Second.Milliseconds(), metrics.MetricTypeNameRaw, internalMetric.GenerateTags()...)
}
})
}
}

func (c *CachedMetric) Purge() {
c.Lock()
defer c.Unlock()

for metricMeta, internalMap := range c.metricMap {
for objectMeta, internal := range internalMap {
internal.gc(expiredTimestamp)
if len(internal.seriesMetric.Values) == 0 {
delete(internalMap, objectMeta)
} else {
_ = c.emitter.StoreInt64(metricsNameKCMASStoreDataLength, int64(len(internal.seriesMetric.Values)),
metrics.MetricTypeNameRaw, internal.generateTags()...)
_ = c.emitter.StoreInt64(metricsNameKCMASStoreWindowSeconds, (internal.seriesMetric.Values[len(internal.seriesMetric.Values)-1].Timestamp-
internal.seriesMetric.Values[0].Timestamp)/time.Second.Milliseconds(), metrics.MetricTypeNameRaw, internal.generateTags()...)
}
}
if len(internalMap) == 0 {
for metricMeta, store := range c.metricMap {
store.purge()
if store.len() == 0 {
delete(c.metricMap, metricMeta)
}
}
@@ -263,25 +285,87 @@ func MergeInternalMetricList(metricName string, metricLists ...[]types.Metric) [
for _, metricList := range metricLists {
c.AddSeriesMetric(metricList...)
}
for _, internalMap := range c.metricMap {
for _, internal := range internalMap {
if metricItem, exist := internal.getSeriesItems(false); exist && metricItem.Len() > 0 {
for _, objectMetricStore := range c.metricMap {
objectMetricStore.iterate(func(internalMetric *internal.MetricImp) {
if metricItem, exist := internalMetric.GetSeriesItems(false); exist && metricItem.Len() > 0 {
res = append(res, metricItem)
}
}
})
}
} else {
for _, metricList := range metricLists {
c.AddAggregatedMetric(metricList...)
}
for _, internalMap := range c.metricMap {
for _, internal := range internalMap {
if metricItem, exist := internal.getAggregatedItems(aggName); exist && metricItem.Len() > 0 {
for _, objectMetricStore := range c.metricMap {
objectMetricStore.iterate(func(internalMetric *internal.MetricImp) {
if metricItem, exist := internalMetric.GetAggregatedItems(aggName); exist && metricItem.Len() > 0 {
res = append(res, metricItem)
}
}
})
}
}

return res
}

type objectMetricStore struct {
metricMeta types.MetricMetaImp
objectMap map[types.ObjectMeta]*internal.MetricImp
sync.RWMutex
}

func newObjectMetricStore(metricMeta types.MetricMetaImp) *objectMetricStore {
return &objectMetricStore{
metricMeta: metricMeta,
objectMap: make(map[types.ObjectMeta]*internal.MetricImp),
}
}

func (s *objectMetricStore) add(objectMeta types.ObjectMetaImp, basicMeta types.BasicMetric) {
s.Lock()
defer s.Unlock()

if _, ok := s.objectMap[objectMeta]; !ok {
s.objectMap[objectMeta] = internal.NewInternalMetric(s.metricMeta, objectMeta, basicMeta)
}
}

func (s *objectMetricStore) objectExists(objectMeta types.ObjectMeta) (exist bool) {
s.RLock()
defer s.RUnlock()

_, exist = s.objectMap[objectMeta]
return
}

func (s *objectMetricStore) getInternalMetricImp(objectMeta types.ObjectMeta) *internal.MetricImp {
s.RLock()
defer s.RUnlock()

return s.objectMap[objectMeta]
}

// this function is read-only; please do not perform any write operation (such as add/delete) on this object
func (s *objectMetricStore) iterate(f func(internalMetric *internal.MetricImp)) {
s.RLock()
defer s.RUnlock()

for _, InternalMetricImp := range s.objectMap {
f(InternalMetricImp)
}
}

func (s *objectMetricStore) purge() {
s.Lock()
defer s.Unlock()

for _, internalMetric := range s.objectMap {
if internalMetric.Empty() {
delete(s.objectMap, internalMetric.ObjectMetaImp)
}
}
}

func (s *objectMetricStore) len() int {
return len(s.objectMap)
}
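
With the shards in place, cleanup is split in two: gcWithTimestamp walks the stores under read locks and only trims expired data points, while the new Purge takes the write locks to delete per-object stores that have become empty and to drop metric entries whose store is empty. That is why StoreConfiguration gains a separate PurgePeriod (defaulting to 10 minutes) next to the existing GCPeriod (10 seconds). A hypothetical wiring of the two periods is sketched below; runMaintenance, the cache interface, and the retention argument are illustrative and not part of this repository, while the GC and Purge methods come from the diff above.

package maintenance

import "time"

// cache captures just the two maintenance methods shown in this diff;
// *CachedMetric should satisfy it.
type cache interface {
	GC(expiredTime time.Time)
	Purge()
}

// runMaintenance runs the cheap, read-lock GC frequently and the write-lock
// Purge rarely, so empty-store cleanup does not block readers every few seconds.
func runMaintenance(c cache, gcPeriod, purgePeriod, retention time.Duration, stop <-chan struct{}) {
	gcTicker := time.NewTicker(gcPeriod)
	purgeTicker := time.NewTicker(purgePeriod)
	defer gcTicker.Stop()
	defer purgeTicker.Stop()

	for {
		select {
		case <-gcTicker.C:
			// Expire data points older than the retention window.
			c.GC(time.Now().Add(-retention))
		case <-purgeTicker.C:
			// Remove per-object stores (and metric entries) that GC left empty.
			c.Purge()
		case <-stop:
			return
		}
	}
}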
4 changes: 3 additions & 1 deletion pkg/custom-metric/store/data/cache_test.go
@@ -352,6 +352,7 @@ func TestCache(t *testing.T) {

t.Log("#### 8: gcMetric")
c.gcWithTimestamp(3)
c.Purge()
names = c.ListAllMetricNames()
assert.ElementsMatch(t, []string{"m-3"}, names)

@@ -607,7 +608,8 @@ func TestMergeInternalMetricList(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, MergeInternalMetricList(tt.args.metricName, tt.args.metricLists...), "MergeInternalMetricList(%v, %v)", tt.args.metricName, tt.args.metricLists)
got := MergeInternalMetricList(tt.args.metricName, tt.args.metricLists...)
assert.Equalf(t, tt.want, got, "MergeInternalMetricList(%v, %v)", tt.args.metricName, tt.args.metricLists)
})
}
}
