Skip to content

Commit

Permalink
refactor: invocation interface (#1702)
Browse files Browse the repository at this point in the history
* refactor: invocation interface

* fix: set attachements use interface{}

* fix: reset result set attachments
  • Loading branch information
justxuewei authored Jan 15, 2022
1 parent 0f93d1c commit f271fc1
Show file tree
Hide file tree
Showing 24 changed files with 148 additions and 119 deletions.
2 changes: 2 additions & 0 deletions cluster/cluster/adaptivesvc/cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)

var _ protocol.Invoker = (*adaptiveServiceClusterInvoker)(nil)

type adaptiveServiceClusterInvoker struct {
base.BaseClusterInvoker
}
Expand Down
4 changes: 2 additions & 2 deletions cluster/cluster/zoneaware/cluster_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ func (z *interceptor) Invoke(ctx context.Context, invoker protocol.Invoker, invo
switch value := force.(type) {
case bool:
if value {
invocation.SetAttachments(key, "true")
invocation.SetAttachment(key, "true")
}
case string:
if "true" == value {
invocation.SetAttachments(key, "true")
invocation.SetAttachment(key, "true")
}
default:
// ignore
Expand Down
4 changes: 2 additions & 2 deletions cluster/cluster/zoneaware/cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ func (invoker *zoneawareClusterInvoker) Invoke(ctx context.Context, invocation p

// providers in the registry with the same zone
key := constant.RegistryKey + "." + constant.RegistryZoneKey
zone := invocation.AttachmentsByKey(key, "")
zone := invocation.GetAttachmentWithDefaultValue(key, "")
if "" != zone {
for _, invoker := range invokers {
if invoker.IsAvailable() && matchParam(zone, key, "", invoker) {
return invoker.Invoke(ctx, invocation)
}
}

force := invocation.AttachmentsByKey(constant.RegistryKey+"."+constant.RegistryZoneForceKey, "")
force := invocation.GetAttachmentWithDefaultValue(constant.RegistryKey+"."+constant.RegistryZoneForceKey, "")
if "true" == force {
return &protocol.RPCResult{
Err: fmt.Errorf("no registry instance in zone or "+
Expand Down
6 changes: 3 additions & 3 deletions cluster/cluster/zoneaware/cluster_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func TestZoneWareInvokerWithZoneSuccess(t *testing.T) {
inv := &invocation.RPCInvocation{}
// zone hangzhou
hz := zoneArray[0]
inv.SetAttachments(constant.RegistryKey+"."+constant.RegistryZoneKey, hz)
inv.SetAttachment(constant.RegistryKey+"."+constant.RegistryZoneKey, hz)

result := clusterInvoker.Invoke(context.Background(), inv)

Expand Down Expand Up @@ -206,9 +206,9 @@ func TestZoneWareInvokerWithZoneForceFail(t *testing.T) {

inv := &invocation.RPCInvocation{}
// zone hangzhou
inv.SetAttachments(constant.RegistryKey+"."+constant.RegistryZoneKey, "hangzhou")
inv.SetAttachment(constant.RegistryKey+"."+constant.RegistryZoneKey, "hangzhou")
// zone force
inv.SetAttachments(constant.RegistryKey+"."+constant.RegistryZoneForceKey, "true")
inv.SetAttachment(constant.RegistryKey+"."+constant.RegistryZoneForceKey, "true")

result := clusterInvoker.Invoke(context.Background(), inv)

Expand Down
6 changes: 3 additions & 3 deletions common/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,19 +181,19 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) {
}

for k, value := range p.attachments {
inv.SetAttachments(k, value)
inv.SetAttachment(k, value)
}

// add user setAttachment. It is compatibility with previous versions.
atm := invCtx.Value(constant.AttachmentKey)
if m, ok := atm.(map[string]string); ok {
for k, value := range m {
inv.SetAttachments(k, value)
inv.SetAttachment(k, value)
}
} else if m2, ok2 := atm.(map[string]interface{}); ok2 {
// it is support to transfer map[string]interface{}. It refers to dubbo-java 2.7.
for k, value := range m2 {
inv.SetAttachments(k, value)
inv.SetAttachment(k, value)
}
}

Expand Down
2 changes: 1 addition & 1 deletion common/proxy/proxy_factory/pass_through.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (pi *PassThroughProxyInvoker) Invoke(ctx context.Context, invocation protoc
in := make([]reflect.Value, 5)
in = append(in, srv.Rcvr())
in = append(in, reflect.ValueOf(invocation.MethodName()))
in = append(in, reflect.ValueOf(invocation.Attachment(constant.ParamsTypeKey)))
in = append(in, reflect.ValueOf(invocation.GetAttachmentInterface(constant.ParamsTypeKey)))
in = append(in, reflect.ValueOf(args))
in = append(in, reflect.ValueOf(invocation.Attachments()))

Expand Down
4 changes: 2 additions & 2 deletions filter/active/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func newActiveFilter() filter.Filter {

// Invoke starts to record the requests status
func (f *activeFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
invocation.(*invocation2.RPCInvocation).SetAttachments(dubboInvokeStartTime, strconv.FormatInt(protocol.CurrentTimeMillis(), 10))
invocation.(*invocation2.RPCInvocation).SetAttachment(dubboInvokeStartTime, strconv.FormatInt(protocol.CurrentTimeMillis(), 10))
protocol.BeginCount(invoker.GetURL(), invocation.MethodName())
return invoker.Invoke(ctx, invocation)
}

// OnResponse update the active count base on the request result.
func (f *activeFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
startTime, err := strconv.ParseInt(invocation.(*invocation2.RPCInvocation).AttachmentsByKey(dubboInvokeStartTime, "0"), 10, 64)
startTime, err := strconv.ParseInt(invocation.(*invocation2.RPCInvocation).GetAttachmentWithDefaultValue(dubboInvokeStartTime, "0"), 10, 64)
if err != nil {
result.SetError(err)
logger.Errorf("parse dubbo_invoke_start_time to int64 failed")
Expand Down
2 changes: 1 addition & 1 deletion filter/active/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestFilterInvoke(t *testing.T) {
invoker.EXPECT().Invoke(gomock.Any()).Return(nil)
invoker.EXPECT().GetUrl().Return(url).Times(1)
filter.Invoke(context.Background(), invoker, invoc)
assert.True(t, invoc.AttachmentsByKey(dubboInvokeStartTime, "") != "")
assert.True(t, invoc.GetAttachmentWithDefaultValue(dubboInvokeStartTime, "") != "")
}

func TestFilterOnResponse(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion filter/adaptivesvc/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (f *adaptiveServiceProviderFilter) Invoke(ctx context.Context, invoker prot
func (f *adaptiveServiceProviderFilter) OnResponse(_ context.Context, result protocol.Result, invoker protocol.Invoker,
invocation protocol.Invocation) protocol.Result {
// get updater from the attributes
updaterIface := invocation.AttributeByKey(constant.AdaptiveServiceUpdaterKey, nil)
updaterIface, _ := invocation.GetAttribute(constant.AdaptiveServiceUpdaterKey)
if updaterIface == nil {
return &protocol.RPCResult{Err: ErrUpdaterNotFound}
}
Expand Down
16 changes: 8 additions & 8 deletions filter/auth/default_authenticator.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ func (authenticator *defaultAuthenticator) Sign(invocation protocol.Invocation,
if err != nil {
return err
}
inv.SetAttachments(constant.RequestSignatureKey, signature)
inv.SetAttachments(constant.RequestTimestampKey, currentTimeMillis)
inv.SetAttachments(constant.AKKey, accessKeyPair.AccessKey)
inv.SetAttachments(constant.Consumer, consumer)
inv.SetAttachment(constant.RequestSignatureKey, signature)
inv.SetAttachment(constant.RequestTimestampKey, currentTimeMillis)
inv.SetAttachment(constant.AKKey, accessKeyPair.AccessKey)
inv.SetAttachment(constant.Consumer, consumer)
return nil
}

Expand All @@ -97,11 +97,11 @@ func getSignature(url *common.URL, invocation protocol.Invocation, secrectKey st

// Authenticate verifies whether the signature sent by the requester is correct
func (authenticator *defaultAuthenticator) Authenticate(invocation protocol.Invocation, url *common.URL) error {
accessKeyId := invocation.AttachmentsByKey(constant.AKKey, "")
accessKeyId := invocation.GetAttachmentWithDefaultValue(constant.AKKey, "")

requestTimestamp := invocation.AttachmentsByKey(constant.RequestTimestampKey, "")
originSignature := invocation.AttachmentsByKey(constant.RequestSignatureKey, "")
consumer := invocation.AttachmentsByKey(constant.Consumer, "")
requestTimestamp := invocation.GetAttachmentWithDefaultValue(constant.RequestTimestampKey, "")
originSignature := invocation.GetAttachmentWithDefaultValue(constant.RequestSignatureKey, "")
consumer := invocation.GetAttachmentWithDefaultValue(constant.Consumer, "")
if IsEmpty(accessKeyId, false) || IsEmpty(consumer, false) ||
IsEmpty(requestTimestamp, false) || IsEmpty(originSignature, false) {
return errors.New("failed to authenticate your ak/sk, maybe the consumer has not enabled the auth")
Expand Down
8 changes: 4 additions & 4 deletions filter/auth/default_authenticator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ func TestDefaultAuthenticator_Sign(t *testing.T) {
testurl.SetParam(constant.ParameterSignatureEnableKey, "false")
inv := invocation.NewRPCInvocation("test", []interface{}{"OK"}, nil)
_ = authenticator.Sign(inv, testurl)
assert.NotEqual(t, inv.AttachmentsByKey(constant.RequestSignatureKey, ""), "")
assert.NotEqual(t, inv.AttachmentsByKey(constant.Consumer, ""), "")
assert.NotEqual(t, inv.AttachmentsByKey(constant.RequestTimestampKey, ""), "")
assert.Equal(t, inv.AttachmentsByKey(constant.AKKey, ""), "akey")
assert.NotEqual(t, inv.GetAttachmentWithDefaultValue(constant.RequestSignatureKey, ""), "")
assert.NotEqual(t, inv.GetAttachmentWithDefaultValue(constant.Consumer, ""), "")
assert.NotEqual(t, inv.GetAttachmentWithDefaultValue(constant.RequestTimestampKey, ""), "")
assert.Equal(t, inv.GetAttachmentWithDefaultValue(constant.AKKey, ""), "akey")
}

func Test_getAccessKeyPairSuccess(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion filter/generic/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (f *genericFilter) Invoke(ctx context.Context, invoker protocol.Invoker, in
args := make([]hessian.Object, 0, len(oldargs))

// get generic info from attachments of invocation, the default value is "true"
generic := invocation.AttachmentsByKey(constant.GenericKey, constant.GenericSerializationDefault)
generic := invocation.GetAttachmentWithDefaultValue(constant.GenericKey, constant.GenericSerializationDefault)
// get generalizer according to value in the `generic`
g := getGeneralizer(generic)

Expand Down
4 changes: 2 additions & 2 deletions filter/generic/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestFilter_Invoke(t *testing.T) {
assert.Equal(t, "Hello", args[0])
assert.Equal(t, "java.lang.String", args[1].([]string)[0])
assert.Equal(t, "arg1", args[2].([]hessian.Object)[0].(string))
assert.Equal(t, constant.GenericSerializationDefault, invocation.AttachmentsByKey(constant.GenericKey, ""))
assert.Equal(t, constant.GenericSerializationDefault, invocation.GetAttachmentWithDefaultValue(constant.GenericKey, ""))
return &protocol.RPCResult{}
})

Expand Down Expand Up @@ -93,7 +93,7 @@ func TestFilter_InvokeWithGenericCall(t *testing.T) {
assert.Equal(t, "hello", args[0])
assert.Equal(t, "java.lang.String", args[1].([]string)[0])
assert.Equal(t, "arg1", args[2].([]string)[0])
assert.Equal(t, constant.GenericSerializationDefault, invocation.AttachmentsByKey(constant.GenericKey, ""))
assert.Equal(t, constant.GenericSerializationDefault, invocation.GetAttachmentWithDefaultValue(constant.GenericKey, ""))
return &protocol.RPCResult{}
})

Expand Down
4 changes: 2 additions & 2 deletions filter/generic/service_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (f *genericServiceFilter) Invoke(ctx context.Context, invoker protocol.Invo
argsType := method.ArgsType()

// get generic info from attachments of invocation, the default value is "true"
generic := invocation.AttachmentsByKey(constant.GenericKey, constant.GenericSerializationDefault)
generic := invocation.GetAttachmentWithDefaultValue(constant.GenericKey, constant.GenericSerializationDefault)
// get generalizer according to value in the `generic`
g := getGeneralizer(generic)

Expand Down Expand Up @@ -126,7 +126,7 @@ func (f *genericServiceFilter) Invoke(ctx context.Context, invoker protocol.Invo
func (f *genericServiceFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, invocation protocol.Invocation) protocol.Result {
if invocation.IsGenericInvocation() && result.Result() != nil {
// get generic info from attachments of invocation, the default value is "true"
generic := invocation.AttachmentsByKey(constant.GenericKey, constant.GenericSerializationDefault)
generic := invocation.GetAttachmentWithDefaultValue(constant.GenericKey, constant.GenericSerializationDefault)
// get generalizer according to value in the `generic`
g := getGeneralizer(generic)

Expand Down
2 changes: 1 addition & 1 deletion filter/seata/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func newSeataFilter() filter.Filter {

// Invoke Get Xid by attachment key `SEATA_XID`. When use Seata, transfer xid by attachments
func (f *seataFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
xid := invocation.AttachmentsByKey(string(SEATA_XID), "")
xid := invocation.GetAttachmentWithDefaultValue(string(SEATA_XID), "")
if len(strings.TrimSpace(xid)) > 0 {
logger.Debugf("Method: %v,Xid: %v", invocation.MethodName(), xid)
return invoker.Invoke(context.WithValue(ctx, SEATA_XID, xid), invocation)
Expand Down
18 changes: 9 additions & 9 deletions protocol/dubbo/dubbo_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,20 @@ func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, er
invocation := *invoc

svc := impl.Service{}
svc.Path = invocation.AttachmentsByKey(constant.PathKey, "")
svc.Interface = invocation.AttachmentsByKey(constant.InterfaceKey, "")
svc.Version = invocation.AttachmentsByKey(constant.VersionKey, "")
svc.Group = invocation.AttachmentsByKey(constant.GroupKey, "")
svc.Path = invocation.GetAttachmentWithDefaultValue(constant.PathKey, "")
svc.Interface = invocation.GetAttachmentWithDefaultValue(constant.InterfaceKey, "")
svc.Version = invocation.GetAttachmentWithDefaultValue(constant.VersionKey, "")
svc.Group = invocation.GetAttachmentWithDefaultValue(constant.GroupKey, "")
svc.Method = invocation.MethodName()
timeout, err := strconv.Atoi(invocation.AttachmentsByKey(constant.TimeoutKey, strconv.Itoa(constant.DefaultRemotingTimeout)))
timeout, err := strconv.Atoi(invocation.GetAttachmentWithDefaultValue(constant.TimeoutKey, strconv.Itoa(constant.DefaultRemotingTimeout)))
if err != nil {
// it will be wrapped in readwrite.Write .
return nil, perrors.WithStack(err)
}
svc.Timeout = time.Duration(timeout)

header := impl.DubboHeader{}
serialization := invocation.AttachmentsByKey(constant.SerializationKey, constant.Hessian2Serialization)
serialization := invocation.GetAttachmentWithDefaultValue(constant.SerializationKey, constant.Hessian2Serialization)
if serialization == constant.ProtobufSerialization {
header.SerialID = constant.SProto
} else {
Expand Down Expand Up @@ -254,20 +254,20 @@ func (c *DubboCodec) decodeResponse(data []byte) (*remoting.Response, int, error
Status: pkg.Header.ResponseStatus,
Event: (pkg.Header.Type & impl.PackageHeartbeat) != 0,
}
var error error
var pkgerr error
if pkg.Header.Type&impl.PackageHeartbeat != 0x00 {
if pkg.Header.Type&impl.PackageResponse != 0x00 {
logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", pkg.Header, pkg.Body)
if pkg.Err != nil {
logger.Errorf("rpc heartbeat response{error: %#v}", pkg.Err)
error = pkg.Err
pkgerr = pkg.Err
}
} else {
logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", pkg.Header, pkg.Service, pkg.Body)
response.Status = hessian.Response_OK
// reply(session, p, hessian.PackageHeartbeat)
}
return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, error
return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, pkgerr
}
logger.Debugf("get rpc response{header: %#v, body: %#v}", pkg.Header, pkg.Body)
rpcResult := &protocol.RPCResult{}
Expand Down
10 changes: 5 additions & 5 deletions protocol/dubbo/dubbo_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati

inv := invocation.(*invocation_impl.RPCInvocation)
// init param
inv.SetAttachments(constant.PathKey, di.GetURL().GetParam(constant.InterfaceKey, ""))
inv.SetAttachment(constant.PathKey, di.GetURL().GetParam(constant.InterfaceKey, ""))
for _, k := range attachmentKey {
if v := di.GetURL().GetParam(k, ""); len(v) > 0 {
inv.SetAttachments(k, v)
inv.SetAttachment(k, v)
}
}

Expand All @@ -133,7 +133,7 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
url.SetParam(constant.SerializationKey, constant.Hessian2Serialization)
}
// async
async, err := strconv.ParseBool(inv.AttachmentsByKey(constant.AsyncKey, "false"))
async, err := strconv.ParseBool(inv.GetAttachmentWithDefaultValue(constant.AsyncKey, "false"))
if err != nil {
logger.Errorf("ParseBool - error: %v", err)
async = false
Expand Down Expand Up @@ -172,12 +172,12 @@ func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) ti
if len(timeout) != 0 {
if t, err := time.ParseDuration(timeout); err == nil {
// config timeout into attachment
invocation.SetAttachments(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds())))
invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds())))
return t
}
}
// set timeout into invocation at method level
invocation.SetAttachments(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds())))
invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds())))
return di.timeout
}

Expand Down
4 changes: 2 additions & 2 deletions protocol/dubbo/dubbo_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ package dubbo
// assert.Equal(t, User{ID: "1", Name: "username"}, *res.Result().(*User))
//
// // CallOneway
// inv.SetAttachments(constant.ASYNC_KEY, "true")
// inv.SetAttachment(constant.ASYNC_KEY, "true")
// res = invoker.Invoke(context.Background(), inv)
// assert.NoError(t, res.Error())
//
Expand All @@ -81,7 +81,7 @@ package dubbo
// assert.NoError(t, res.Error())
//
// // Err_No_Reply
// inv.SetAttachments(constant.ASYNC_KEY, "false")
// inv.SetAttachment(constant.ASYNC_KEY, "false")
// inv.SetReply(nil)
// res = invoker.Invoke(context.Background(), inv)
// assert.EqualError(t, res.Error(), "request need @response")
Expand Down
4 changes: 2 additions & 2 deletions protocol/dubbo3/dubbo3_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,12 @@ func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) ti
if len(timeout) != 0 {
if t, err := time.ParseDuration(timeout); err == nil {
// config timeout into attachment
invocation.SetAttachments(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds())))
invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds())))
return t
}
}
// set timeout into invocation at method level
invocation.SetAttachments(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds())))
invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds())))
return di.timeout
}

Expand Down
25 changes: 15 additions & 10 deletions protocol/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,23 @@ type Invocation interface {
// Reply gets response of request
Reply() interface{}
// Attachments gets all attachments
Attachments() map[string]interface{}
// AttachmentsByKey gets attachment by key , if nil then return default value. (It will be deprecated in the future)
AttachmentsByKey(string, string) string
Attachment(string) interface{}
// Attributes refers to dubbo 2.7.6. It is different from attachment. It is used in internal process.
Attributes() map[string]interface{}
// AttributeByKey gets attribute by key , if nil then return default value
AttributeByKey(string, interface{}) interface{}
// SetAttachments sets attribute by @key and @value.
SetAttachments(key string, value interface{})

// Invoker gets the invoker in current context.
Invoker() Invoker
// IsGenericInvocation gets if this is a generic invocation
IsGenericInvocation() bool

Attachments() map[string]interface{}
SetAttachment(key string, value interface{})
GetAttachment(key string) (string, bool)
GetAttachmentInterface(string) interface{}
GetAttachmentWithDefaultValue(key string, defaultValue string) string

// Attributes firstly introduced on dubbo-java 2.7.6. It is
// used in internal invocation, that is, it's not passed between
// server and client.
Attributes() map[string]interface{}
SetAttribute(key string, value interface{})
GetAttribute(key string) (interface{}, bool)
GetAttributeWithDefaultValue(key string, defaultValue interface{}) interface{}
}
Loading

0 comments on commit f271fc1

Please sign in to comment.