Skip to content

Commit

Permalink
Merge pull request #406 from cocowh/fix_meta
Browse files Browse the repository at this point in the history
fix meta protocol support
  • Loading branch information
rayzhang0603 authored Oct 15, 2024
2 parents b035daf + 71649f7 commit 7179c8d
Show file tree
Hide file tree
Showing 6 changed files with 349 additions and 15 deletions.
12 changes: 12 additions & 0 deletions endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func GenerateRequestID() uint64 {
type MockEndpoint struct {
URL *motan.URL
MockResponse motan.Response
available atomic.Value
}

func (m *MockEndpoint) GetRuntimeInfo() map[string]interface{} {
Expand All @@ -85,9 +86,20 @@ func (m *MockEndpoint) SetURL(url *motan.URL) {
}

func (m *MockEndpoint) IsAvailable() bool {
if m.available.Load() == nil {
return true
}
b, ok := m.available.Load().(bool)
if ok {
return b
}
return true
}

func (m *MockEndpoint) SetAvailable(available bool) {
m.available.Store(available)
}

func (m *MockEndpoint) SetProxy(proxy bool) {}

func (m *MockEndpoint) SetSerialization(s motan.Serialization) {}
Expand Down
4 changes: 2 additions & 2 deletions lb/weightRoundRobinLb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ func processCheck(t *testing.T, lb *WeightRoundRobinLB, typ string, eps []core.E
for i := 0; i < int(totalWeight)*round; i++ {
lb.Select(nil).Call(nil)
}
var maxDelta float64 = 0.0
var totalDelta float64 = 0.0
var maxDelta = 0.0
var totalDelta = 0.0
unavailableCount := 0
for _, ep := range eps {
if !ep.IsAvailable() {
Expand Down
33 changes: 22 additions & 11 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ package meta

import (
"errors"
"github.com/patrickmn/go-cache"
"github.com/weibocom/motan-go/core"
"github.com/weibocom/motan-go/endpoint"
vlog "github.com/weibocom/motan-go/log"
mpro "github.com/weibocom/motan-go/protocol"
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/patrickmn/go-cache"
"github.com/weibocom/motan-go/core"
"github.com/weibocom/motan-go/endpoint"
vlog "github.com/weibocom/motan-go/log"
mpro "github.com/weibocom/motan-go/protocol"
"github.com/weibocom/motan-go/serialize"
)

const (
Expand All @@ -33,7 +35,7 @@ var (
"grpc-pb-json": true,
}
supportProtocols = map[string]bool{
"motan": true,
"motan": false,
"motan2": true,
}
once = sync.Once{}
Expand Down Expand Up @@ -147,12 +149,17 @@ func getRemoteDynamicMeta(cacheKey string, endpoint core.EndPoint) (map[string]s
}
return nil, errors.New(resp.GetException().ErrMsg)
}
reply := make(map[string]string)
err := resp.ProcessDeserializable(&reply)
if err != nil {
return nil, err
if d, ok := resp.GetValue().(*core.DeserializableValue); ok {
reply := make(map[string]string)
// only support breeze serializer and motan2 protocol
breezeSerialize := &serialize.BreezeSerialization{}
_, err := breezeSerialize.DeSerialize(d.Body, &reply)
if err != nil {
return nil, err
}
return reply, nil
}
// multiple serialization might encode empty map into interface{}, not map[string]string
// multiple serialization might encode an empty map into interface{}, not map[string]string
// in this case, return a public empty string map
if res, ok := resp.GetValue().(map[string]string); ok && res != nil {
return res, nil
Expand All @@ -167,6 +174,10 @@ func getMetaServiceRequest() core.Request {
Method: MetaMethodName,
Attachment: core.NewStringMap(core.DefaultAttachmentSize),
Arguments: []interface{}{},
RPCContext: &core.RPCContext{
Serialized: true,
SerializeNum: serialize.BreezeNumber,
},
}
req.SetAttachment(mpro.MFrameworkService, "y")
return req
Expand Down
236 changes: 236 additions & 0 deletions meta/meta_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package meta

import (
"errors"
"github.com/weibocom/motan-go/endpoint"
"github.com/weibocom/motan-go/serialize"
"testing"

"github.com/weibocom/motan-go/core"
)

func TestIsSupport(t *testing.T) {
tests := []struct {
name string
cacheKey string
url *core.URL
want bool
}{
{
name: "Supported Protocol and Serializer",
cacheKey: "test1",
url: &core.URL{
Protocol: "motan2",
Parameters: map[string]string{
core.DynamicMetaKey: "true",
core.SerializationKey: "breeze",
},
},
want: true,
},
{
name: "Unsupported Protocol",
cacheKey: "test2",
url: &core.URL{
Protocol: "motan",
Parameters: map[string]string{
core.DynamicMetaKey: "true",
core.SerializationKey: "breeze",
},
},
want: false,
},
{
name: "Unsupported Serializer",
cacheKey: "test3",
url: &core.URL{
Protocol: "motan2",
Parameters: map[string]string{
core.DynamicMetaKey: "true",
core.SerializationKey: "protobuf",
},
},
want: false,
},
{
name: "Dynamic Meta Disabled",
cacheKey: "test4",
url: &core.URL{
Protocol: "motan2",
Parameters: map[string]string{
core.DynamicMetaKey: "false",
core.SerializationKey: "breeze",
},
},
want: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := isSupport(tt.cacheKey, tt.url); got != tt.want {
t.Errorf("isSupport() = %v, want %v", got, tt.want)
}
notSupportCache.Flush()
})
}
}

func TestGetEpStaticMeta(t *testing.T) {
tests := []struct {
name string
endpoint core.EndPoint
want map[string]string
}{
{
name: "With DefaultMetaPrefix",
endpoint: &endpoint.MockEndpoint{
URL: &core.URL{
Parameters: map[string]string{
core.DefaultMetaPrefix + "key1": "value1",
"otherKey": "value2",
},
},
},
want: map[string]string{
core.DefaultMetaPrefix + "key1": "value1",
},
},
{
name: "With EnvPrefix",
endpoint: &endpoint.MockEndpoint{
URL: &core.URL{
Parameters: map[string]string{
envPrefix + "key2": "value3",
"otherKey": "value4",
},
},
},
want: map[string]string{
envPrefix + "key2": "value3",
},
},
{
name: "No Matching Prefix",
endpoint: &endpoint.MockEndpoint{
URL: &core.URL{
Parameters: map[string]string{
"key3": "value5",
},
},
},
want: map[string]string{},
},
{
name: "Nil URL",
endpoint: &endpoint.MockEndpoint{URL: nil},
want: map[string]string{},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := GetEpStaticMeta(tt.endpoint); !equalMaps(got, tt.want) {
t.Errorf("GetEpStaticMeta() = %v, want %v", got, tt.want)
}
})
}
}

func equalMaps(a, b map[string]string) bool {
if len(a) != len(b) {
return false
}
for k, v := range a {
if b[k] != v {
return false
}
}
return true
}

func TestGetRemoteDynamicMeta(t *testing.T) {
breezeSerialization := &serialize.BreezeSerialization{}
resultWant := map[string]string{"key": "value"}
resultBody, _ := breezeSerialization.Serialize(resultWant)
tests := []struct {
name string
cacheKey string
available bool
endpoint *endpoint.MockEndpoint
want map[string]string
wantErr error
}{
{
name: "Service Not Supported",
cacheKey: "test1",
available: true,
endpoint: &endpoint.MockEndpoint{
URL: &core.URL{
Protocol: "motan2",
Parameters: map[string]string{
core.DynamicMetaKey: "true",
core.SerializationKey: "breeze",
},
},
MockResponse: &core.MotanResponse{
Exception: &core.Exception{ErrMsg: core.ServiceNotSupport},
},
},
want: nil,
wantErr: ServiceNotSupportError,
},
{
name: "Endpoint Unavailable",
cacheKey: "test2",
available: false,
endpoint: &endpoint.MockEndpoint{
URL: &core.URL{
Protocol: "motan2",
Parameters: map[string]string{
core.DynamicMetaKey: "true",
core.SerializationKey: "breeze",
},
},
},
want: nil,
wantErr: errors.New("endpoint unavailable"),
},
{
name: "Successful Call",
cacheKey: "test3",
available: true,
endpoint: &endpoint.MockEndpoint{
URL: &core.URL{
Protocol: "motan2",
Parameters: map[string]string{
core.DynamicMetaKey: "true",
core.SerializationKey: "breeze",
},
},
MockResponse: &core.MotanResponse{
Value: &core.DeserializableValue{
Body: resultBody,
},
},
},
want: resultWant, // expected a deserialized map
wantErr: nil,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.endpoint.SetAvailable(tt.available)
got, err := getRemoteDynamicMeta(tt.cacheKey, tt.endpoint)
if err != nil && err.Error() != tt.wantErr.Error() {
t.Errorf("getRemoteDynamicMeta() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !equalMaps(got, tt.want) {
t.Errorf("getRemoteDynamicMeta() = %v, want %v", got, tt.want)
}
notSupportCache.Flush()
})
}
}
25 changes: 23 additions & 2 deletions provider/metaProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,34 @@ package provider
import (
motan "github.com/weibocom/motan-go/core"
"github.com/weibocom/motan-go/meta"
"github.com/weibocom/motan-go/serialize"
)

type MetaProvider struct{}

func (m *MetaProvider) Initialize() {}

func (m *MetaProvider) Call(request motan.Request) motan.Response {
// only support motan2 protocol and breeze serialization
if request.GetRPCContext(true).IsMotanV1 {
return &motan.MotanResponse{
Exception: &motan.Exception{ErrCode: 500, ErrMsg: "meta provider not support motan1 protocol"},
}
}
serialization := &serialize.BreezeSerialization{}
b, err := serialization.Serialize(meta.GetDynamicMeta())
if err != nil {
return &motan.MotanResponse{
Exception: &motan.Exception{ErrCode: 500, ErrMsg: "meta provider serialize fail"},
}
}
resp := &motan.MotanResponse{
RequestID: request.GetRequestID(),
Value: meta.GetDynamicMeta(),
Value: b,
RPCContext: &motan.RPCContext{
Serialized: true,
SerializeNum: serialize.BreezeNumber,
},
}
return resp
}
Expand Down Expand Up @@ -46,5 +64,8 @@ func (m *MetaProvider) IsAvailable() bool {
}

func (m *MetaProvider) GetRuntimeInfo() map[string]interface{} {
return make(map[string]interface{})
info := map[string]interface{}{
motan.RuntimeNameKey: m.GetName(),
}
return info
}
Loading

0 comments on commit 7179c8d

Please sign in to comment.