Skip to content

Commit

Permalink
add weighted_lb feature
Browse files Browse the repository at this point in the history
  • Loading branch information
liangwei3 committed Apr 2, 2024
1 parent 4f95c41 commit f1309c3
Show file tree
Hide file tree
Showing 22 changed files with 1,426 additions and 31 deletions.
20 changes: 19 additions & 1 deletion agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"github.com/weibocom/motan-go/endpoint"
vlog "github.com/weibocom/motan-go/log"
"github.com/weibocom/motan-go/meta"
"github.com/weibocom/motan-go/provider"
"gopkg.in/yaml.v2"
"io/ioutil"
"net"
Expand Down Expand Up @@ -193,6 +195,8 @@ func (a *Agent) StartMotanAgentFromConfig(config *cfg.Config) {
return
}
fmt.Println("init agent context success.")
// initialize meta package
meta.Initialize(a.Context)
a.initParam()
a.SetSanpshotConf()
a.initAgentURL()
Expand Down Expand Up @@ -932,11 +936,18 @@ func (a *Agent) doExportService(url *motan.URL) {
}

type serverAgentMessageHandler struct {
providers *motan.CopyOnWriteMap
providers *motan.CopyOnWriteMap
frameworkProviders *motan.CopyOnWriteMap
}

func (sa *serverAgentMessageHandler) Initialize() {
sa.providers = motan.NewCopyOnWriteMap()
sa.frameworkProviders = motan.NewCopyOnWriteMap()
sa.initFrameworkServiceProvider()
}

func (sa *serverAgentMessageHandler) initFrameworkServiceProvider() {
sa.frameworkProviders.Store(meta.MetaServiceName, &provider.MetaProvider{})
}

func getServiceKey(group, path string) string {
Expand All @@ -954,6 +965,13 @@ func (sa *serverAgentMessageHandler) Call(request motan.Request) (res motan.Resp
group = request.GetAttachment(motan.GroupKey)
}
serviceKey := getServiceKey(group, request.GetServiceName())
if mfs := request.GetAttachment(mpro.MFrameworkService); mfs != "" {
if fp, ok := sa.frameworkProviders.Load(request.GetServiceName()); ok {
return fp.(motan.Provider).Call(request)
}
//throw specific exception to avoid triggering forced fusing on the client side。
return motan.BuildExceptionResponse(request.GetRequestID(), &motan.Exception{ErrCode: 501, ErrMsg: motan.ServiceNotSupport, ErrType: motan.ServiceException})
}
if p := sa.providers.LoadOrNil(serviceKey); p != nil {
p := p.(motan.Provider)
res = p.Call(request)
Expand Down
4 changes: 4 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,7 @@ func (m *MCContext) GetRefer(service string) interface{} {
// TODO 对client的封装,可以根据idl自动生成代码时支持
return nil
}

func (m *MCContext) GetContext() *motan.Context {
return m.context
}
3 changes: 3 additions & 0 deletions cluster/motanCluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ func (m *MotanCluster) Destroy() {
vlog.Infof("destroy endpoint %s .", e.GetURL().GetIdentity())
e.Destroy()
}
if d, ok := m.LoadBalance.(motan.Destroyable); ok {
d.Destroy()
}
m.closed = true
}
}
Expand Down
9 changes: 7 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,17 @@ func (c *Config) Int64(key string) (int64, error) {

// String returns the string value for a given key.
func (c *Config) String(key string) string {
return c.GetStringWithDefault(key, "")
}

// String returns the string value for a given key.
func (c *Config) GetStringWithDefault(key string, def string) string {
if value, err := c.getData(key); err != nil {
return ""
return def
} else if vv, ok := value.(string); ok {
return vv
}
return ""
return def
}

// GetSection returns map for the given key
Expand Down
14 changes: 14 additions & 0 deletions core/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,17 @@ const (
const (
DefaultReferVersion = "1.0"
)

// meta info
const (
DefaultMetaPrefix = "META_"
EnvMetaPrefixKey = "envMetaPrefix"
URLRegisterMeta = "registerMeta"
DefaultRegisterMeta = true
MetaCacheExpireSecondKey = "metaCacheExpireSecond"
DynamicMetaKey = "dynamicMeta"
DefaultDynamicMeta = true
WeightRefreshPeriodSecondKey = "weightRefreshPeriodSecond"
WeightMetaSuffixKey = "WEIGHT"
ServiceNotSupport = "service not support"
)
1 change: 0 additions & 1 deletion core/globalContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,6 @@ func (c *Context) basicConfToURLs(section string) map[string]*URL {
if len(finalFilters) > 0 {
newURL.PutParam(FilterKey, c.FilterSetToStr(finalFilters))
}

newURLs[key] = newURL
}
return newURLs
Expand Down
26 changes: 17 additions & 9 deletions core/motan.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ type LoadBalance interface {
SetWeight(weight string)
}

// WeightLoadBalance : weight loadBalance for cluster
type WeightLoadBalance interface {
LoadBalance
NotifyWeightChange()
}

// DiscoverService : discover service for cluster
type DiscoverService interface {
Subscribe(url *URL, listener NotifyListener)
Expand Down Expand Up @@ -724,6 +730,7 @@ func BuildExceptionResponse(requestid uint64, e *Exception) *MotanResponse {
type DefaultFilterFunc func() Filter
type NewHaFunc func(url *URL) HaStrategy
type NewLbFunc func(url *URL) LoadBalance
type NewLbWithDestroyFunc func(url *URL) (LoadBalance, func())
type NewEndpointFunc func(url *URL) EndPoint
type NewProviderFunc func(url *URL) Provider
type NewRegistryFunc func(url *URL) Registry
Expand All @@ -733,15 +740,16 @@ type NewSerializationFunc func() Serialization

type DefaultExtensionFactory struct {
// factories
filterFactories map[string]DefaultFilterFunc
haFactories map[string]NewHaFunc
lbFactories map[string]NewLbFunc
endpointFactories map[string]NewEndpointFunc
providerFactories map[string]NewProviderFunc
registryFactories map[string]NewRegistryFunc
servers map[string]NewServerFunc
messageHandlers map[string]NewMessageHandlerFunc
serializations map[string]NewSerializationFunc
filterFactories map[string]DefaultFilterFunc
haFactories map[string]NewHaFunc
lbFactories map[string]NewLbFunc
endpointFactories map[string]NewEndpointFunc
providerFactories map[string]NewProviderFunc
frameworkFactories map[string]NewProviderFunc
registryFactories map[string]NewRegistryFunc
servers map[string]NewServerFunc
messageHandlers map[string]NewMessageHandlerFunc
serializations map[string]NewSerializationFunc

// singleton instance
registries map[string]Registry
Expand Down
8 changes: 5 additions & 3 deletions core/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,9 +336,11 @@ func (u *URL) CanServe(other *URL) bool {
vlog.Errorf("can not serve path, err : p1:%s, p2:%s", u.Path, other.Path)
return false
}
if !IsSame(u.Parameters, other.Parameters, SerializationKey, "") {
vlog.Errorf("can not serve serialization, err : s1:%s, s2:%s", u.Parameters[SerializationKey], other.Parameters[SerializationKey])
return false
if u.Protocol != "motan2" {
if !IsSame(u.Parameters, other.Parameters, SerializationKey, "") {
vlog.Errorf("can not serve serialization, err : s1:%s, s2:%s", u.Parameters[SerializationKey], other.Parameters[SerializationKey])
return false
}
}
// compatible with old version: 0.1
if !(IsSame(u.Parameters, other.Parameters, VersionKey, "0.1") || IsSame(u.Parameters, other.Parameters, VersionKey, DefaultReferVersion)) {
Expand Down
25 changes: 25 additions & 0 deletions core/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,24 @@ func SliceShuffle(slice []string) []string {
return slice
}

func EndpointShuffle(slice []EndPoint) []EndPoint {
for i := 0; i < len(slice); i++ {
a := rand.Intn(len(slice))
b := rand.Intn(len(slice))
slice[a], slice[b] = slice[b], slice[a]
}
return slice
}

func ByteSliceShuffle(slice []byte) []byte {
for i := 0; i < len(slice); i++ {
a := rand.Intn(len(slice))
b := rand.Intn(len(slice))
slice[a], slice[b] = slice[b], slice[a]
}
return slice
}

func FirstUpper(s string) string {
r := []rune(s)

Expand Down Expand Up @@ -286,3 +304,10 @@ func ClearDirectEnvRegistry() {
directRpc = nil
initDirectEnv = sync.Once{}
}

func GetNonNegative(originValue int64) int64 {
if originValue > 0 {
return originValue
}
return 0x7fffffff & originValue
}
7 changes: 7 additions & 0 deletions default.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,15 @@ func GetDefaultManageHandlers() map[string]http.Handler {
defaultManageHandlers["/registry/list"] = dynamicConfigurer
defaultManageHandlers["/registry/info"] = dynamicConfigurer

metaInfo := &MetaInfo{}
defaultManageHandlers["/meta/update"] = metaInfo
defaultManageHandlers["/meta/delete"] = metaInfo
defaultManageHandlers["/meta/get"] = metaInfo
defaultManageHandlers["/meta/getAll"] = metaInfo

hotReload := &HotReload{}
defaultManageHandlers["/reload/clusters"] = hotReload

})
return defaultManageHandlers
}
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/weibocom/motan-go

go 1.11
go 1.16

require (
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
Expand All @@ -16,12 +16,13 @@ require (
github.com/mitchellh/mapstructure v1.1.2
github.com/opentracing/opentracing-go v1.0.2
github.com/panjf2000/ants/v2 v2.9.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
github.com/shirou/gopsutil/v3 v3.21.9
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/stretchr/testify v1.8.2
github.com/stretchr/testify v1.8.4
github.com/valyala/fasthttp v1.2.0
github.com/weibreeze/breeze-go v0.1.1
go.uber.org/atomic v1.4.0 // indirect
Expand Down
27 changes: 27 additions & 0 deletions lb/lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
Random = "random"
Roundrobin = "roundrobin"
ConsistentHashKey = "consistentHashKey"
WeightRoundRobin = "wrr"
)

const (
Expand All @@ -39,6 +40,9 @@ func RegistDefaultLb(extFactory motan.ExtensionFactory) {
extFactory.RegistExtLb(ConsistentHashKey, NewWeightLbFunc(func(url *motan.URL) motan.LoadBalance {
return &ConsistentHashLB{url: url}
}))
extFactory.RegistExtLb(WeightRoundRobin, NewWeightLbFunc(func(url *motan.URL) motan.LoadBalance {
return NewWeightRondRobinLb(url)
}))
}

// WeightedLbWrapper support multi group weighted LB
Expand All @@ -55,6 +59,25 @@ func NewWeightLbFunc(newLb motan.NewLbFunc) motan.NewLbFunc {
}
}

func (w *WeightedLbWrapper) Destroy() {
destroyInnerRefers(w.refers)
}

func destroyInnerRefers(refers innerRefers) {
if v, ok := refers.(*singleGroupRefers); ok {
if vlb, ok := v.lb.(motan.Destroyable); ok {
vlb.Destroy()
}
}
if v, ok := refers.(*weightedRefers); ok {
for _, lb := range v.groupLb {
if vlb, ok := lb.(motan.Destroyable); ok {
vlb.Destroy()
}
}
}
}

func (w *WeightedLbWrapper) OnRefresh(endpoints []motan.EndPoint) {
if w.weightString == "" { //not weighted lb
vlog.Infof("WeightedLbWrapper: %s - OnRefresh:not have weight", w.url.GetIdentity())
Expand Down Expand Up @@ -130,7 +153,9 @@ func (w *WeightedLbWrapper) OnRefresh(endpoints []motan.EndPoint) {
}
wr.weightRing = motan.SliceShuffle(ring)
wr.ringSize = len(wr.weightRing)
oldRefers := w.refers
w.refers = wr
destroyInnerRefers(oldRefers)
vlog.Infof("WeightedLbWrapper: %s - OnRefresh: weight:%s", w.url.GetIdentity(), w.weightString)
}

Expand All @@ -140,7 +165,9 @@ func (w *WeightedLbWrapper) onRefreshSingleGroup(endpoints []motan.EndPoint) {
} else {
lb := w.newLb(w.url)
lb.OnRefresh(endpoints)
oldRefers := w.refers
w.refers = &singleGroupRefers{lb: lb}
destroyInnerRefers(oldRefers)
}
}

Expand Down
Loading

0 comments on commit f1309c3

Please sign in to comment.