Skip to content

Commit

Permalink
fix meta protocol support
Browse files Browse the repository at this point in the history
fix

unit test
  • Loading branch information
wuhua3 committed Oct 14, 2024
1 parent b035daf commit 984027a
Show file tree
Hide file tree
Showing 6 changed files with 477 additions and 114 deletions.
91 changes: 0 additions & 91 deletions endpoint/mockDynamicEndpoint.go

This file was deleted.

111 changes: 105 additions & 6 deletions lb/weightRoundRobinLb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
"fmt"
"github.com/stretchr/testify/assert"
"github.com/weibocom/motan-go/core"
"github.com/weibocom/motan-go/endpoint"
"github.com/weibocom/motan-go/meta"
mpro "github.com/weibocom/motan-go/protocol"
"github.com/weibocom/motan-go/serialize"
"math"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -45,7 +48,7 @@ func TestDynamicStaticWeight(t *testing.T) {
}

// test dynamic wight change
lb.refresher.weightedEpHolders.Load().([]*WeightedEpHolder)[3].ep.(*endpoint.MockDynamicEndpoint).SetWeight(true, 22)
lb.refresher.weightedEpHolders.Load().([]*WeightedEpHolder)[3].ep.(*MockDynamicEndpoint).SetWeight(true, 22)
meta.ClearMetaCache()
time.Sleep(time.Second * 5)
//assert.Equal(t, int64(22), lb.refresher.weightedEpHolders.Load().([]*WeightedEpHolder)[3].dynamicWeight)
Expand All @@ -65,7 +68,7 @@ func TestGetEpWeight(t *testing.T) {
Port: 8080,
Path: "mockService",
}
ep := endpoint.NewMockDynamicEndpoint(url)
ep := NewMockDynamicEndpoint(url)
type test struct {
expectWeight int64
fromDynamic bool
Expand Down Expand Up @@ -273,7 +276,7 @@ func processCheck(t *testing.T, lb *WeightRoundRobinLB, typ string, eps []core.E
if !ep.IsAvailable() {
continue
}
totalWeight += ep.(*endpoint.MockDynamicEndpoint).StaticWeight
totalWeight += ep.(*MockDynamicEndpoint).StaticWeight
}
for i := 0; i < int(totalWeight)*round; i++ {
lb.Select(nil).Call(nil)
Expand All @@ -285,7 +288,7 @@ func processCheck(t *testing.T, lb *WeightRoundRobinLB, typ string, eps []core.E
if !ep.IsAvailable() {
unavailableCount++
} else {
mep := ep.(*endpoint.MockDynamicEndpoint)
mep := ep.(*MockDynamicEndpoint)
ratio := float64(atomic.LoadInt64(&mep.Count)) / float64(mep.StaticWeight)
delta := math.Abs(ratio - float64(round))
if delta > maxDelta {
Expand Down Expand Up @@ -335,7 +338,7 @@ func buildTestEps(size int, sameStaticWeight bool, maxWeight int64, adjust bool,
if url.GetParam(core.DynamicMetaKey, "") == "false" {
curUrl.PutParam(core.DynamicMetaKey, "false")
}
ep := endpoint.NewMockDynamicEndpointWithWeight(curUrl, weight)
ep := NewMockDynamicEndpointWithWeight(curUrl, weight)
if i < unavailableSize {
ep.SetAvailable(false)
}
Expand All @@ -356,3 +359,99 @@ func doAdjust(w int64) int64 {
return w
}
}

type MockDynamicEndpoint struct {
URL *core.URL
available bool
DynamicWeight int64
StaticWeight int64
Count int64
dynamicMeta sync.Map
}

func (m *MockDynamicEndpoint) GetName() string {
return "mockEndpoint"
}

func (m *MockDynamicEndpoint) GetURL() *core.URL {
return m.URL
}

func (m *MockDynamicEndpoint) SetURL(url *core.URL) {
m.URL = url
}

func (m *MockDynamicEndpoint) IsAvailable() bool {
return m.available
}

func (m *MockDynamicEndpoint) SetAvailable(a bool) {
m.available = a
}

func (m *MockDynamicEndpoint) SetProxy(proxy bool) {}

func (m *MockDynamicEndpoint) SetSerialization(s core.Serialization) {}

func (m *MockDynamicEndpoint) Call(request core.Request) core.Response {
if isMetaServiceRequest(request) {
resMap := map[string]string{}
m.dynamicMeta.Range(func(key, value interface{}) bool {
resMap[key.(string)] = value.(string)
return true
})
atomic.AddInt64(&m.Count, 1)
breezeSerialize := &serialize.BreezeSerialization{}
b, err := breezeSerialize.Serialize(resMap)
if err != nil {
return &core.MotanResponse{
Exception: &core.Exception{ErrCode: 500, ErrMsg: "serialize fail"},
}
}

return &core.MotanResponse{
ProcessTime: 1,
Value: &core.DeserializableValue{Body: b, Serialization: breezeSerialize},
RPCContext: &core.RPCContext{
Serialized: true,
SerializeNum: serialize.BreezeNumber,
}}
}
atomic.AddInt64(&m.Count, 1)
return &core.MotanResponse{ProcessTime: 1, Value: "ok"}
}

func (m *MockDynamicEndpoint) Destroy() {}

func (m *MockDynamicEndpoint) GetRuntimeInfo() map[string]interface{} {
return make(map[string]interface{})
}

func (m *MockDynamicEndpoint) SetWeight(isDynamic bool, weight int64) {
if isDynamic {
m.DynamicWeight = weight
m.dynamicMeta.Store(core.DefaultMetaPrefix+core.WeightMetaSuffixKey, strconv.Itoa(int(weight)))
} else {
m.StaticWeight = weight
m.URL.PutParam(core.DefaultMetaPrefix+core.WeightMetaSuffixKey, strconv.Itoa(int(weight)))
}
}

func NewMockDynamicEndpoint(url *core.URL) *MockDynamicEndpoint {
return &MockDynamicEndpoint{
URL: url,
available: true,
}
}

func NewMockDynamicEndpointWithWeight(url *core.URL, staticWeight int64) *MockDynamicEndpoint {
res := NewMockDynamicEndpoint(url)
res.StaticWeight = staticWeight
res.URL.PutParam(core.DefaultMetaPrefix+core.WeightMetaSuffixKey, strconv.Itoa(int(staticWeight)))
return res
}

func isMetaServiceRequest(request core.Request) bool {
return request != nil && "com.weibo.api.motan.runtime.meta.MetaService" == request.GetServiceName() &&
"getDynamicMeta" == request.GetMethod() && "y" == request.GetAttachment(mpro.MFrameworkService)
}
36 changes: 21 additions & 15 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ package meta

import (
"errors"
"github.com/patrickmn/go-cache"
"github.com/weibocom/motan-go/core"
"github.com/weibocom/motan-go/endpoint"
vlog "github.com/weibocom/motan-go/log"
mpro "github.com/weibocom/motan-go/protocol"
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/patrickmn/go-cache"
"github.com/weibocom/motan-go/core"
"github.com/weibocom/motan-go/endpoint"
vlog "github.com/weibocom/motan-go/log"
mpro "github.com/weibocom/motan-go/protocol"
"github.com/weibocom/motan-go/serialize"
)

const (
Expand All @@ -33,7 +35,7 @@ var (
"grpc-pb-json": true,
}
supportProtocols = map[string]bool{
"motan": true,
"motan": false,
"motan2": true,
}
once = sync.Once{}
Expand Down Expand Up @@ -147,15 +149,15 @@ func getRemoteDynamicMeta(cacheKey string, endpoint core.EndPoint) (map[string]s
}
return nil, errors.New(resp.GetException().ErrMsg)
}
reply := make(map[string]string)
err := resp.ProcessDeserializable(&reply)
if err != nil {
return nil, err
}
// multiple serialization might encode empty map into interface{}, not map[string]string
// in this case, return a public empty string map
if res, ok := resp.GetValue().(map[string]string); ok && res != nil {
return res, nil
if d, ok := resp.GetValue().(*core.DeserializableValue); ok {
reply := make(map[string]string)
// only support breeze serializer and motan2 protocol
breezeSerialize := &serialize.BreezeSerialization{}
_, err := breezeSerialize.DeSerialize(d.Body, &reply)
if err != nil {
return nil, err
}
return reply, nil
}
return metaEmptyMap, nil
}
Expand All @@ -167,6 +169,10 @@ func getMetaServiceRequest() core.Request {
Method: MetaMethodName,
Attachment: core.NewStringMap(core.DefaultAttachmentSize),
Arguments: []interface{}{},
RPCContext: &core.RPCContext{
Serialized: true,
SerializeNum: serialize.BreezeNumber,
},
}
req.SetAttachment(mpro.MFrameworkService, "y")
return req
Expand Down
Loading

0 comments on commit 984027a

Please sign in to comment.