Skip to content

Commit

Permalink
lazyload: optimiz some svf reseted when master-slave switch
Browse files Browse the repository at this point in the history
  • Loading branch information
MouceL authored and YonkaFang committed Apr 21, 2023
1 parent fbb32c3 commit 0e4a68a
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 25 deletions.
15 changes: 4 additions & 11 deletions framework/model/metric/accesslog_convertor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,14 @@ type AccessLogConvertor struct {
}

func NewAccessLogConvertor(config AccessLogConvertorConfig) *AccessLogConvertor {

newCacheResultCopy := make(map[string]map[string]string)
for meta, value := range config.InitCache {
tmpValue := make(map[string]string)
for k, v := range value {
tmpValue[k] = v
}
newCacheResultCopy[meta] = tmpValue
}
result := make(map[string]map[string]string)
resultCopy := make(map[string]map[string]string)

return &AccessLogConvertor{
name: config.Name,
handler: config.Handler,
cacheResult: config.InitCache,
cacheResultCopy: newCacheResultCopy,
cacheResult: result,
cacheResultCopy: resultCopy,
}
}

Expand Down
18 changes: 18 additions & 0 deletions framework/model/metric/accesslog_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,21 @@ func (s *AccessLogSource) Reset(info string) error {

return nil
}

func (s *AccessLogSource) Fullfill(cache map[string]map[string]string) error {

for _, convertor := range s.convertors {
convertor.convertorLock.Lock()
for meta, value := range cache {
convertor.cacheResult[meta] = value
tmpValue := make(map[string]string)
for k, v := range value {
tmpValue[k] = v
}

convertor.cacheResultCopy[meta] = tmpValue
}
convertor.convertorLock.Unlock()
}
return nil
}
5 changes: 2 additions & 3 deletions framework/model/metric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type AccessLogSourceConfig struct {
}

type AccessLogConvertorConfig struct {
Name string // handler name
Handler func(logEntry []*data_accesslog.HTTPAccessLogEntry) (map[string]map[string]string, error)
InitCache map[string]map[string]string // for cacheResult init
Name string // handler name
Handler func(logEntry []*data_accesslog.HTTPAccessLogEntry) (map[string]map[string]string, error)
}
4 changes: 4 additions & 0 deletions framework/model/metric/mock_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ func (s *MockSource) QueryMetric(queryMap QueryMap) (Metric, error) {
func (s *MockSource) Reset(info string) error {
return nil
}

func (s *MockSource) Fullfill(cache map[string]map[string]string) error {
return nil
}
1 change: 1 addition & 0 deletions framework/model/metric/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ type Source interface {
QueryMetric(queryMap QueryMap) (Metric, error)
Start() error
Reset(info string) error
Fullfill(map[string]map[string]string) error
}
4 changes: 4 additions & 0 deletions framework/model/metric/prometheus_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (ps *PrometheusSource) Reset(info string) error {
return nil
}

func (ps *PrometheusSource) Fullfill(cache map[string]map[string]string) error {
return nil
}

func defaultConvertor(qv prometheusModel.Value) map[string]string {
result := make(map[string]string)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,21 +127,13 @@ func NewProducerConfig(env bootstrap.Environment) (*metric.ProducerConfig, error
// init log source port
port := env.Config.Global.Misc["logSourcePort"]

// init initCache
initCache, err := newInitCache(env)
if err != nil {
return nil, err
}
log.Debugf("initCache is %+v", initCache)

// init accessLog source config
accessLogSourceConfig = metric.AccessLogSourceConfig{
ServePort: port,
AccessLogConvertorConfigs: []metric.AccessLogConvertorConfig{
{
Name: AccessLogConvertorName,
Handler: nil,
InitCache: initCache,
Name: AccessLogConvertorName,
Handler: nil,
},
},
}
Expand Down Expand Up @@ -209,7 +201,7 @@ func newPrometheusSourceConfig(env bootstrap.Environment) (metric.PrometheusSour
}, nil
}

func newInitCache(env bootstrap.Environment) (map[string]map[string]string, error) {
func NewCache(env bootstrap.Environment) (map[string]map[string]string, error) {
result := make(map[string]map[string]string)

svfGvr := schema.GroupVersionResource{
Expand Down
1 change: 1 addition & 0 deletions staging/src/slime.io/slime/modules/lazyload/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
k8s.io/api v0.22.2
k8s.io/apimachinery v0.22.2
k8s.io/client-go v0.22.2
k8s.io/kube-openapi v0.0.0-20210421082810-95288971da7e
sigs.k8s.io/controller-runtime v0.10.3
sigs.k8s.io/yaml v1.2.0
slime.io/slime/framework v0.0.0-00010101000000-000000000000
Expand Down
21 changes: 21 additions & 0 deletions staging/src/slime.io/slime/modules/lazyload/module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,17 @@ func (m *Module) Setup(opts module.ModuleOptions) error {
sfReconciler.StartIpToSvcCache(ctx)
})

// build metric source
source := metric.NewSource(pc)

cache, err := controllers.NewCache(env)
if err != nil {
return fmt.Errorf("GetCacheFromServicefence occured err: %s", err)
}
source.Fullfill(cache)
log.Debugf("GetCacheFromServicefence %+v", cache)

// register svf reset
handler := &server.Handler{
HttpPathHandler: env.HttpPathHandler,
Source: source,
Expand Down Expand Up @@ -122,6 +132,17 @@ func (m *Module) Setup(opts module.ModuleOptions) error {
return fmt.Errorf("unable to create controller,%+v", err)
}

le.AddOnStartedLeading(func(ctx context.Context) {
log.Infof("retrieve metric from svf status.metric")
cache, err := controllers.NewCache(env)
if err != nil {
log.Warnf("GetCacheFromServicefence occured err in StartedLeading: %s", err)
return
}
source.Fullfill(cache)
log.Debugf("GetCacheFromServicefence is %+v", cache)
})

le.AddOnStartedLeading(func(ctx context.Context) {
log.Infof("producers starts")
metric.NewProducer(pc, source)
Expand Down

0 comments on commit 0e4a68a

Please sign in to comment.