diff --git a/cluster/cluster/adaptivesvc/cluster_invoker.go b/cluster/cluster/adaptivesvc/cluster_invoker.go index 30f07725a6..d21c8d7b56 100644 --- a/cluster/cluster/adaptivesvc/cluster_invoker.go +++ b/cluster/cluster/adaptivesvc/cluster_invoker.go @@ -36,6 +36,8 @@ import ( "dubbo.apache.org/dubbo-go/v3/protocol" ) +var _ protocol.Invoker = (*adaptiveServiceClusterInvoker)(nil) + type adaptiveServiceClusterInvoker struct { base.BaseClusterInvoker } diff --git a/cluster/cluster/zoneaware/cluster_interceptor.go b/cluster/cluster/zoneaware/cluster_interceptor.go index 67844d2b60..c92edc7e69 100644 --- a/cluster/cluster/zoneaware/cluster_interceptor.go +++ b/cluster/cluster/zoneaware/cluster_interceptor.go @@ -38,11 +38,11 @@ func (z *interceptor) Invoke(ctx context.Context, invoker protocol.Invoker, invo switch value := force.(type) { case bool: if value { - invocation.SetAttachments(key, "true") + invocation.SetAttachment(key, "true") } case string: if "true" == value { - invocation.SetAttachments(key, "true") + invocation.SetAttachment(key, "true") } default: // ignore diff --git a/cluster/cluster/zoneaware/cluster_invoker.go b/cluster/cluster/zoneaware/cluster_invoker.go index e8cd3fd88c..f6cbacf1f5 100644 --- a/cluster/cluster/zoneaware/cluster_invoker.go +++ b/cluster/cluster/zoneaware/cluster_invoker.go @@ -65,7 +65,7 @@ func (invoker *zoneawareClusterInvoker) Invoke(ctx context.Context, invocation p // providers in the registry with the same zone key := constant.RegistryKey + "." + constant.RegistryZoneKey - zone := invocation.AttachmentsByKey(key, "") + zone := invocation.GetAttachmentWithDefaultValue(key, "") if "" != zone { for _, invoker := range invokers { if invoker.IsAvailable() && matchParam(zone, key, "", invoker) { @@ -73,7 +73,7 @@ func (invoker *zoneawareClusterInvoker) Invoke(ctx context.Context, invocation p } } - force := invocation.AttachmentsByKey(constant.RegistryKey+"."+constant.RegistryZoneForceKey, "") + force := invocation.GetAttachmentWithDefaultValue(constant.RegistryKey+"."+constant.RegistryZoneForceKey, "") if "true" == force { return &protocol.RPCResult{ Err: fmt.Errorf("no registry instance in zone or "+ diff --git a/cluster/cluster/zoneaware/cluster_invoker_test.go b/cluster/cluster/zoneaware/cluster_invoker_test.go index d32a3e7658..705218eb93 100644 --- a/cluster/cluster/zoneaware/cluster_invoker_test.go +++ b/cluster/cluster/zoneaware/cluster_invoker_test.go @@ -177,7 +177,7 @@ func TestZoneWareInvokerWithZoneSuccess(t *testing.T) { inv := &invocation.RPCInvocation{} // zone hangzhou hz := zoneArray[0] - inv.SetAttachments(constant.RegistryKey+"."+constant.RegistryZoneKey, hz) + inv.SetAttachment(constant.RegistryKey+"."+constant.RegistryZoneKey, hz) result := clusterInvoker.Invoke(context.Background(), inv) @@ -206,9 +206,9 @@ func TestZoneWareInvokerWithZoneForceFail(t *testing.T) { inv := &invocation.RPCInvocation{} // zone hangzhou - inv.SetAttachments(constant.RegistryKey+"."+constant.RegistryZoneKey, "hangzhou") + inv.SetAttachment(constant.RegistryKey+"."+constant.RegistryZoneKey, "hangzhou") // zone force - inv.SetAttachments(constant.RegistryKey+"."+constant.RegistryZoneForceKey, "true") + inv.SetAttachment(constant.RegistryKey+"."+constant.RegistryZoneForceKey, "true") result := clusterInvoker.Invoke(context.Background(), inv) diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go index 1676e75fb4..3b39a4c00b 100644 --- a/common/proxy/proxy.go +++ b/common/proxy/proxy.go @@ -181,19 +181,19 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) { } for k, value := range p.attachments { - inv.SetAttachments(k, value) + inv.SetAttachment(k, value) } // add user setAttachment. It is compatibility with previous versions. atm := invCtx.Value(constant.AttachmentKey) if m, ok := atm.(map[string]string); ok { for k, value := range m { - inv.SetAttachments(k, value) + inv.SetAttachment(k, value) } } else if m2, ok2 := atm.(map[string]interface{}); ok2 { // it is support to transfer map[string]interface{}. It refers to dubbo-java 2.7. for k, value := range m2 { - inv.SetAttachments(k, value) + inv.SetAttachment(k, value) } } diff --git a/common/proxy/proxy_factory/pass_through.go b/common/proxy/proxy_factory/pass_through.go index 6ad28dccbc..41a2b57316 100644 --- a/common/proxy/proxy_factory/pass_through.go +++ b/common/proxy/proxy_factory/pass_through.go @@ -102,7 +102,7 @@ func (pi *PassThroughProxyInvoker) Invoke(ctx context.Context, invocation protoc in := make([]reflect.Value, 5) in = append(in, srv.Rcvr()) in = append(in, reflect.ValueOf(invocation.MethodName())) - in = append(in, reflect.ValueOf(invocation.Attachment(constant.ParamsTypeKey))) + in = append(in, reflect.ValueOf(invocation.GetAttachmentInterface(constant.ParamsTypeKey))) in = append(in, reflect.ValueOf(args)) in = append(in, reflect.ValueOf(invocation.Attachments())) diff --git a/filter/active/filter.go b/filter/active/filter.go index 4228c194c0..8ee781b822 100644 --- a/filter/active/filter.go +++ b/filter/active/filter.go @@ -59,14 +59,14 @@ func newActiveFilter() filter.Filter { // Invoke starts to record the requests status func (f *activeFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { - invocation.(*invocation2.RPCInvocation).SetAttachments(dubboInvokeStartTime, strconv.FormatInt(protocol.CurrentTimeMillis(), 10)) + invocation.(*invocation2.RPCInvocation).SetAttachment(dubboInvokeStartTime, strconv.FormatInt(protocol.CurrentTimeMillis(), 10)) protocol.BeginCount(invoker.GetURL(), invocation.MethodName()) return invoker.Invoke(ctx, invocation) } // OnResponse update the active count base on the request result. func (f *activeFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { - startTime, err := strconv.ParseInt(invocation.(*invocation2.RPCInvocation).AttachmentsByKey(dubboInvokeStartTime, "0"), 10, 64) + startTime, err := strconv.ParseInt(invocation.(*invocation2.RPCInvocation).GetAttachmentWithDefaultValue(dubboInvokeStartTime, "0"), 10, 64) if err != nil { result.SetError(err) logger.Errorf("parse dubbo_invoke_start_time to int64 failed") diff --git a/filter/active/filter_test.go b/filter/active/filter_test.go index 1d751aa8e6..a4b0f5cdb2 100644 --- a/filter/active/filter_test.go +++ b/filter/active/filter_test.go @@ -47,7 +47,7 @@ func TestFilterInvoke(t *testing.T) { invoker.EXPECT().Invoke(gomock.Any()).Return(nil) invoker.EXPECT().GetUrl().Return(url).Times(1) filter.Invoke(context.Background(), invoker, invoc) - assert.True(t, invoc.AttachmentsByKey(dubboInvokeStartTime, "") != "") + assert.True(t, invoc.GetAttachmentWithDefaultValue(dubboInvokeStartTime, "") != "") } func TestFilterOnResponse(t *testing.T) { diff --git a/filter/adaptivesvc/filter.go b/filter/adaptivesvc/filter.go index 04dde942ba..17472ba486 100644 --- a/filter/adaptivesvc/filter.go +++ b/filter/adaptivesvc/filter.go @@ -91,7 +91,7 @@ func (f *adaptiveServiceProviderFilter) Invoke(ctx context.Context, invoker prot func (f *adaptiveServiceProviderFilter) OnResponse(_ context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { // get updater from the attributes - updaterIface := invocation.AttributeByKey(constant.AdaptiveServiceUpdaterKey, nil) + updaterIface, _ := invocation.GetAttribute(constant.AdaptiveServiceUpdaterKey) if updaterIface == nil { return &protocol.RPCResult{Err: ErrUpdaterNotFound} } diff --git a/filter/auth/default_authenticator.go b/filter/auth/default_authenticator.go index 71da50b116..db178c7d55 100644 --- a/filter/auth/default_authenticator.go +++ b/filter/auth/default_authenticator.go @@ -69,10 +69,10 @@ func (authenticator *defaultAuthenticator) Sign(invocation protocol.Invocation, if err != nil { return err } - inv.SetAttachments(constant.RequestSignatureKey, signature) - inv.SetAttachments(constant.RequestTimestampKey, currentTimeMillis) - inv.SetAttachments(constant.AKKey, accessKeyPair.AccessKey) - inv.SetAttachments(constant.Consumer, consumer) + inv.SetAttachment(constant.RequestSignatureKey, signature) + inv.SetAttachment(constant.RequestTimestampKey, currentTimeMillis) + inv.SetAttachment(constant.AKKey, accessKeyPair.AccessKey) + inv.SetAttachment(constant.Consumer, consumer) return nil } @@ -97,11 +97,11 @@ func getSignature(url *common.URL, invocation protocol.Invocation, secrectKey st // Authenticate verifies whether the signature sent by the requester is correct func (authenticator *defaultAuthenticator) Authenticate(invocation protocol.Invocation, url *common.URL) error { - accessKeyId := invocation.AttachmentsByKey(constant.AKKey, "") + accessKeyId := invocation.GetAttachmentWithDefaultValue(constant.AKKey, "") - requestTimestamp := invocation.AttachmentsByKey(constant.RequestTimestampKey, "") - originSignature := invocation.AttachmentsByKey(constant.RequestSignatureKey, "") - consumer := invocation.AttachmentsByKey(constant.Consumer, "") + requestTimestamp := invocation.GetAttachmentWithDefaultValue(constant.RequestTimestampKey, "") + originSignature := invocation.GetAttachmentWithDefaultValue(constant.RequestSignatureKey, "") + consumer := invocation.GetAttachmentWithDefaultValue(constant.Consumer, "") if IsEmpty(accessKeyId, false) || IsEmpty(consumer, false) || IsEmpty(requestTimestamp, false) || IsEmpty(originSignature, false) { return errors.New("failed to authenticate your ak/sk, maybe the consumer has not enabled the auth") diff --git a/filter/auth/default_authenticator_test.go b/filter/auth/default_authenticator_test.go index 57197f11b0..3ff17b9fe9 100644 --- a/filter/auth/default_authenticator_test.go +++ b/filter/auth/default_authenticator_test.go @@ -79,10 +79,10 @@ func TestDefaultAuthenticator_Sign(t *testing.T) { testurl.SetParam(constant.ParameterSignatureEnableKey, "false") inv := invocation.NewRPCInvocation("test", []interface{}{"OK"}, nil) _ = authenticator.Sign(inv, testurl) - assert.NotEqual(t, inv.AttachmentsByKey(constant.RequestSignatureKey, ""), "") - assert.NotEqual(t, inv.AttachmentsByKey(constant.Consumer, ""), "") - assert.NotEqual(t, inv.AttachmentsByKey(constant.RequestTimestampKey, ""), "") - assert.Equal(t, inv.AttachmentsByKey(constant.AKKey, ""), "akey") + assert.NotEqual(t, inv.GetAttachmentWithDefaultValue(constant.RequestSignatureKey, ""), "") + assert.NotEqual(t, inv.GetAttachmentWithDefaultValue(constant.Consumer, ""), "") + assert.NotEqual(t, inv.GetAttachmentWithDefaultValue(constant.RequestTimestampKey, ""), "") + assert.Equal(t, inv.GetAttachmentWithDefaultValue(constant.AKKey, ""), "akey") } func Test_getAccessKeyPairSuccess(t *testing.T) { diff --git a/filter/generic/filter.go b/filter/generic/filter.go index c63b30c53c..ee93b23eaf 100644 --- a/filter/generic/filter.go +++ b/filter/generic/filter.go @@ -67,7 +67,7 @@ func (f *genericFilter) Invoke(ctx context.Context, invoker protocol.Invoker, in args := make([]hessian.Object, 0, len(oldargs)) // get generic info from attachments of invocation, the default value is "true" - generic := invocation.AttachmentsByKey(constant.GenericKey, constant.GenericSerializationDefault) + generic := invocation.GetAttachmentWithDefaultValue(constant.GenericKey, constant.GenericSerializationDefault) // get generalizer according to value in the `generic` g := getGeneralizer(generic) diff --git a/filter/generic/filter_test.go b/filter/generic/filter_test.go index cccd21a7e1..4b99f952b8 100644 --- a/filter/generic/filter_test.go +++ b/filter/generic/filter_test.go @@ -60,7 +60,7 @@ func TestFilter_Invoke(t *testing.T) { assert.Equal(t, "Hello", args[0]) assert.Equal(t, "java.lang.String", args[1].([]string)[0]) assert.Equal(t, "arg1", args[2].([]hessian.Object)[0].(string)) - assert.Equal(t, constant.GenericSerializationDefault, invocation.AttachmentsByKey(constant.GenericKey, "")) + assert.Equal(t, constant.GenericSerializationDefault, invocation.GetAttachmentWithDefaultValue(constant.GenericKey, "")) return &protocol.RPCResult{} }) @@ -93,7 +93,7 @@ func TestFilter_InvokeWithGenericCall(t *testing.T) { assert.Equal(t, "hello", args[0]) assert.Equal(t, "java.lang.String", args[1].([]string)[0]) assert.Equal(t, "arg1", args[2].([]string)[0]) - assert.Equal(t, constant.GenericSerializationDefault, invocation.AttachmentsByKey(constant.GenericKey, "")) + assert.Equal(t, constant.GenericSerializationDefault, invocation.GetAttachmentWithDefaultValue(constant.GenericKey, "")) return &protocol.RPCResult{} }) diff --git a/filter/generic/service_filter.go b/filter/generic/service_filter.go index d9f1bbade8..b6293e7648 100644 --- a/filter/generic/service_filter.go +++ b/filter/generic/service_filter.go @@ -87,7 +87,7 @@ func (f *genericServiceFilter) Invoke(ctx context.Context, invoker protocol.Invo argsType := method.ArgsType() // get generic info from attachments of invocation, the default value is "true" - generic := invocation.AttachmentsByKey(constant.GenericKey, constant.GenericSerializationDefault) + generic := invocation.GetAttachmentWithDefaultValue(constant.GenericKey, constant.GenericSerializationDefault) // get generalizer according to value in the `generic` g := getGeneralizer(generic) @@ -126,7 +126,7 @@ func (f *genericServiceFilter) Invoke(ctx context.Context, invoker protocol.Invo func (f *genericServiceFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, invocation protocol.Invocation) protocol.Result { if invocation.IsGenericInvocation() && result.Result() != nil { // get generic info from attachments of invocation, the default value is "true" - generic := invocation.AttachmentsByKey(constant.GenericKey, constant.GenericSerializationDefault) + generic := invocation.GetAttachmentWithDefaultValue(constant.GenericKey, constant.GenericSerializationDefault) // get generalizer according to value in the `generic` g := getGeneralizer(generic) diff --git a/filter/seata/filter.go b/filter/seata/filter.go index b1a75e489a..f05b738877 100644 --- a/filter/seata/filter.go +++ b/filter/seata/filter.go @@ -58,7 +58,7 @@ func newSeataFilter() filter.Filter { // Invoke Get Xid by attachment key `SEATA_XID`. When use Seata, transfer xid by attachments func (f *seataFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { - xid := invocation.AttachmentsByKey(string(SEATA_XID), "") + xid := invocation.GetAttachmentWithDefaultValue(string(SEATA_XID), "") if len(strings.TrimSpace(xid)) > 0 { logger.Debugf("Method: %v,Xid: %v", invocation.MethodName(), xid) return invoker.Invoke(context.WithValue(ctx, SEATA_XID, xid), invocation) diff --git a/protocol/dubbo/dubbo_codec.go b/protocol/dubbo/dubbo_codec.go index 477e6c71be..b35d909295 100644 --- a/protocol/dubbo/dubbo_codec.go +++ b/protocol/dubbo/dubbo_codec.go @@ -65,12 +65,12 @@ func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, er invocation := *invoc svc := impl.Service{} - svc.Path = invocation.AttachmentsByKey(constant.PathKey, "") - svc.Interface = invocation.AttachmentsByKey(constant.InterfaceKey, "") - svc.Version = invocation.AttachmentsByKey(constant.VersionKey, "") - svc.Group = invocation.AttachmentsByKey(constant.GroupKey, "") + svc.Path = invocation.GetAttachmentWithDefaultValue(constant.PathKey, "") + svc.Interface = invocation.GetAttachmentWithDefaultValue(constant.InterfaceKey, "") + svc.Version = invocation.GetAttachmentWithDefaultValue(constant.VersionKey, "") + svc.Group = invocation.GetAttachmentWithDefaultValue(constant.GroupKey, "") svc.Method = invocation.MethodName() - timeout, err := strconv.Atoi(invocation.AttachmentsByKey(constant.TimeoutKey, strconv.Itoa(constant.DefaultRemotingTimeout))) + timeout, err := strconv.Atoi(invocation.GetAttachmentWithDefaultValue(constant.TimeoutKey, strconv.Itoa(constant.DefaultRemotingTimeout))) if err != nil { // it will be wrapped in readwrite.Write . return nil, perrors.WithStack(err) @@ -78,7 +78,7 @@ func (c *DubboCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer, er svc.Timeout = time.Duration(timeout) header := impl.DubboHeader{} - serialization := invocation.AttachmentsByKey(constant.SerializationKey, constant.Hessian2Serialization) + serialization := invocation.GetAttachmentWithDefaultValue(constant.SerializationKey, constant.Hessian2Serialization) if serialization == constant.ProtobufSerialization { header.SerialID = constant.SProto } else { @@ -254,20 +254,20 @@ func (c *DubboCodec) decodeResponse(data []byte) (*remoting.Response, int, error Status: pkg.Header.ResponseStatus, Event: (pkg.Header.Type & impl.PackageHeartbeat) != 0, } - var error error + var pkgerr error if pkg.Header.Type&impl.PackageHeartbeat != 0x00 { if pkg.Header.Type&impl.PackageResponse != 0x00 { logger.Debugf("get rpc heartbeat response{header: %#v, body: %#v}", pkg.Header, pkg.Body) if pkg.Err != nil { logger.Errorf("rpc heartbeat response{error: %#v}", pkg.Err) - error = pkg.Err + pkgerr = pkg.Err } } else { logger.Debugf("get rpc heartbeat request{header: %#v, service: %#v, body: %#v}", pkg.Header, pkg.Service, pkg.Body) response.Status = hessian.Response_OK // reply(session, p, hessian.PackageHeartbeat) } - return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, error + return response, hessian.HEADER_LENGTH + pkg.Header.BodyLen, pkgerr } logger.Debugf("get rpc response{header: %#v, body: %#v}", pkg.Header, pkg.Body) rpcResult := &protocol.RPCResult{} diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index 904bfb036c..6d74f71ce2 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -117,10 +117,10 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati inv := invocation.(*invocation_impl.RPCInvocation) // init param - inv.SetAttachments(constant.PathKey, di.GetURL().GetParam(constant.InterfaceKey, "")) + inv.SetAttachment(constant.PathKey, di.GetURL().GetParam(constant.InterfaceKey, "")) for _, k := range attachmentKey { if v := di.GetURL().GetParam(k, ""); len(v) > 0 { - inv.SetAttachments(k, v) + inv.SetAttachment(k, v) } } @@ -133,7 +133,7 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati url.SetParam(constant.SerializationKey, constant.Hessian2Serialization) } // async - async, err := strconv.ParseBool(inv.AttachmentsByKey(constant.AsyncKey, "false")) + async, err := strconv.ParseBool(inv.GetAttachmentWithDefaultValue(constant.AsyncKey, "false")) if err != nil { logger.Errorf("ParseBool - error: %v", err) async = false @@ -172,12 +172,12 @@ func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) ti if len(timeout) != 0 { if t, err := time.ParseDuration(timeout); err == nil { // config timeout into attachment - invocation.SetAttachments(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds()))) + invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds()))) return t } } // set timeout into invocation at method level - invocation.SetAttachments(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds()))) + invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds()))) return di.timeout } diff --git a/protocol/dubbo/dubbo_invoker_test.go b/protocol/dubbo/dubbo_invoker_test.go index 5f5dd86cb7..8d12bba69e 100644 --- a/protocol/dubbo/dubbo_invoker_test.go +++ b/protocol/dubbo/dubbo_invoker_test.go @@ -63,7 +63,7 @@ package dubbo // assert.Equal(t, User{ID: "1", Name: "username"}, *res.Result().(*User)) // // // CallOneway -// inv.SetAttachments(constant.ASYNC_KEY, "true") +// inv.SetAttachment(constant.ASYNC_KEY, "true") // res = invoker.Invoke(context.Background(), inv) // assert.NoError(t, res.Error()) // @@ -81,7 +81,7 @@ package dubbo // assert.NoError(t, res.Error()) // // // Err_No_Reply -// inv.SetAttachments(constant.ASYNC_KEY, "false") +// inv.SetAttachment(constant.ASYNC_KEY, "false") // inv.SetReply(nil) // res = invoker.Invoke(context.Background(), inv) // assert.EqualError(t, res.Error(), "request need @response") diff --git a/protocol/dubbo3/dubbo3_invoker.go b/protocol/dubbo3/dubbo3_invoker.go index 7b158388cf..247a159096 100644 --- a/protocol/dubbo3/dubbo3_invoker.go +++ b/protocol/dubbo3/dubbo3_invoker.go @@ -205,12 +205,12 @@ func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) ti if len(timeout) != 0 { if t, err := time.ParseDuration(timeout); err == nil { // config timeout into attachment - invocation.SetAttachments(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds()))) + invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(t.Milliseconds()))) return t } } // set timeout into invocation at method level - invocation.SetAttachments(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds()))) + invocation.SetAttachment(constant.TimeoutKey, strconv.Itoa(int(di.timeout.Milliseconds()))) return di.timeout } diff --git a/protocol/invocation.go b/protocol/invocation.go index a85b6b6bbb..4580250d32 100644 --- a/protocol/invocation.go +++ b/protocol/invocation.go @@ -38,18 +38,23 @@ type Invocation interface { // Reply gets response of request Reply() interface{} // Attachments gets all attachments - Attachments() map[string]interface{} - // AttachmentsByKey gets attachment by key , if nil then return default value. (It will be deprecated in the future) - AttachmentsByKey(string, string) string - Attachment(string) interface{} - // Attributes refers to dubbo 2.7.6. It is different from attachment. It is used in internal process. - Attributes() map[string]interface{} - // AttributeByKey gets attribute by key , if nil then return default value - AttributeByKey(string, interface{}) interface{} - // SetAttachments sets attribute by @key and @value. - SetAttachments(key string, value interface{}) + // Invoker gets the invoker in current context. Invoker() Invoker // IsGenericInvocation gets if this is a generic invocation IsGenericInvocation() bool + + Attachments() map[string]interface{} + SetAttachment(key string, value interface{}) + GetAttachment(key string) (string, bool) + GetAttachmentInterface(string) interface{} + GetAttachmentWithDefaultValue(key string, defaultValue string) string + + // Attributes firstly introduced on dubbo-java 2.7.6. It is + // used in internal invocation, that is, it's not passed between + // server and client. + Attributes() map[string]interface{} + SetAttribute(key string, value interface{}) + GetAttribute(key string) (interface{}, bool) + GetAttributeWithDefaultValue(key string, defaultValue interface{}) interface{} } diff --git a/protocol/invocation/rpcinvocation.go b/protocol/invocation/rpcinvocation.go index aa1f0d9900..399e3a1335 100644 --- a/protocol/invocation/rpcinvocation.go +++ b/protocol/invocation/rpcinvocation.go @@ -129,32 +129,14 @@ func (r *RPCInvocation) Attachments() map[string]interface{} { return r.attachments } -// AttachmentsByKey gets RPC attachment by key, if nil then return default value. -func (r *RPCInvocation) AttachmentsByKey(key string, defaultValue string) string { - r.lock.RLock() - defer r.lock.RUnlock() - if r.attachments == nil { - return defaultValue - } - value, ok := r.attachments[key] - if ok { - return value.(string) - } - return defaultValue -} - // Attachment returns the corresponding value from dubbo's attachment with the given key. -func (r *RPCInvocation) Attachment(key string) interface{} { +func (r *RPCInvocation) GetAttachmentInterface(key string) interface{} { r.lock.RLock() defer r.lock.RUnlock() if r.attachments == nil { return nil } - value, ok := r.attachments[key] - if ok { - return value - } - return nil + return r.attachments[key] } // Attributes gets all attributes of RPC. @@ -162,34 +144,6 @@ func (r *RPCInvocation) Attributes() map[string]interface{} { return r.attributes } -// AttributeByKey gets attribute by @key. If it is not exist, it will return default value. -func (r *RPCInvocation) AttributeByKey(key string, defaultValue interface{}) interface{} { - r.lock.RLock() - defer r.lock.RUnlock() - value, ok := r.attributes[key] - if ok { - return value - } - return defaultValue -} - -// SetAttachments sets attribute by @key and @value. -func (r *RPCInvocation) SetAttachments(key string, value interface{}) { - r.lock.Lock() - defer r.lock.Unlock() - if r.attachments == nil { - r.attachments = make(map[string]interface{}) - } - r.attachments[key] = value -} - -// SetAttribute sets attribute by @key and @value. -func (r *RPCInvocation) SetAttribute(key string, value interface{}) { - r.lock.Lock() - defer r.lock.Unlock() - r.attributes[key] = value -} - // Invoker gets the invoker in current context. func (r *RPCInvocation) Invoker() protocol.Invoker { return r.invoker @@ -213,8 +167,76 @@ func (r *RPCInvocation) SetCallBack(c interface{}) { } func (r *RPCInvocation) ServiceKey() string { - return common.ServiceKey(strings.TrimPrefix(r.AttachmentsByKey(constant.PathKey, r.AttachmentsByKey(constant.InterfaceKey, "")), "/"), - r.AttachmentsByKey(constant.GroupKey, ""), r.AttachmentsByKey(constant.VersionKey, "")) + return common.ServiceKey(strings.TrimPrefix(r.GetAttachmentWithDefaultValue(constant.PathKey, r.GetAttachmentWithDefaultValue(constant.InterfaceKey, "")), "/"), + r.GetAttachmentWithDefaultValue(constant.GroupKey, ""), r.GetAttachmentWithDefaultValue(constant.VersionKey, "")) +} + +func (r *RPCInvocation) SetAttachment(key string, value interface{}) { + r.lock.Lock() + defer r.lock.Unlock() + if r.attachments == nil { + r.attachments = make(map[string]interface{}) + } + r.attachments[key] = value +} + +func (r *RPCInvocation) GetAttachment(key string) (string, bool) { + r.lock.RLock() + defer r.lock.RUnlock() + if r.attachments == nil { + return "", false + } + if value, ok := r.attachments[key]; ok { + if str, ok := value.(string); ok { + return str, true + } + } + return "", false +} + +func (r *RPCInvocation) GetAttachmentWithDefaultValue(key string, defaultValue string) string { + r.lock.RLock() + defer r.lock.RUnlock() + if r.attachments == nil { + return defaultValue + } + if value, ok := r.attachments[key]; ok { + if str, ok := value.(string); ok { + return str + } + } + return defaultValue +} + +func (r *RPCInvocation) SetAttribute(key string, value interface{}) { + r.lock.Lock() + defer r.lock.Unlock() + if r.attributes == nil { + r.attributes = make(map[string]interface{}) + } + r.attributes[key] = value +} + +func (r *RPCInvocation) GetAttribute(key string) (interface{}, bool) { + r.lock.RLock() + defer r.lock.RUnlock() + if r.attributes == nil { + return nil, false + } + value, ok := r.attributes[key] + return value, ok +} + +func (r *RPCInvocation) GetAttributeWithDefaultValue(key string, defaultValue interface{}) interface{} { + r.lock.RLock() + defer r.lock.RUnlock() + if r.attributes == nil { + return defaultValue + } + if value, ok := r.attachments[key]; ok { + return value + } + return defaultValue } // ///////////////////////// diff --git a/protocol/result.go b/protocol/result.go index a2169a155f..709c7ecf9d 100644 --- a/protocol/result.go +++ b/protocol/result.go @@ -71,7 +71,7 @@ func (r *RPCResult) Result() interface{} { return r.Rest } -// SetAttachments replaces the existing attachments with the specified param. +// SetAttachment replaces the existing attachments with the specified param. func (r *RPCResult) SetAttachments(attr map[string]interface{}) { r.Attrs = attr } diff --git a/remoting/getty/dubbo_codec_for_test.go b/remoting/getty/dubbo_codec_for_test.go index 6a90a0b995..de3783e04d 100644 --- a/remoting/getty/dubbo_codec_for_test.go +++ b/remoting/getty/dubbo_codec_for_test.go @@ -59,12 +59,12 @@ func (c *DubboTestCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer tmpInvocation := invoc svc := impl.Service{} - svc.Path = tmpInvocation.AttachmentsByKey(constant.PathKey, "") - svc.Interface = tmpInvocation.AttachmentsByKey(constant.InterfaceKey, "") - svc.Version = tmpInvocation.AttachmentsByKey(constant.VersionKey, "") - svc.Group = tmpInvocation.AttachmentsByKey(constant.GroupKey, "") + svc.Path = tmpInvocation.GetAttachmentWithDefaultValue(constant.PathKey, "") + svc.Interface = tmpInvocation.GetAttachmentWithDefaultValue(constant.InterfaceKey, "") + svc.Version = tmpInvocation.GetAttachmentWithDefaultValue(constant.VersionKey, "") + svc.Group = tmpInvocation.GetAttachmentWithDefaultValue(constant.GroupKey, "") svc.Method = tmpInvocation.MethodName() - timeout, err := strconv.Atoi(tmpInvocation.AttachmentsByKey(constant.TimeoutKey, strconv.Itoa(constant.DefaultRemotingTimeout))) + timeout, err := strconv.Atoi(tmpInvocation.GetAttachmentWithDefaultValue(constant.TimeoutKey, strconv.Itoa(constant.DefaultRemotingTimeout))) if err != nil { // it will be wrapped in readwrite.Write . return nil, perrors.WithStack(err) @@ -72,7 +72,7 @@ func (c *DubboTestCodec) EncodeRequest(request *remoting.Request) (*bytes.Buffer svc.Timeout = time.Duration(timeout) header := impl.DubboHeader{} - serialization := tmpInvocation.AttachmentsByKey(constant.SerializationKey, constant.Hessian2Serialization) + serialization := tmpInvocation.GetAttachmentWithDefaultValue(constant.SerializationKey, constant.Hessian2Serialization) if serialization == constant.ProtobufSerialization { header.SerialID = constant.SProto } else { diff --git a/remoting/getty/getty_client_test.go b/remoting/getty/getty_client_test.go index 346bd7c4c0..3d4216b403 100644 --- a/remoting/getty/getty_client_test.go +++ b/remoting/getty/getty_client_test.go @@ -76,7 +76,7 @@ func createInvocation(methodName string, callback interface{}, reply interface{} func setAttachment(invocation *invocation.RPCInvocation, attachments map[string]string) { for key, value := range attachments { - invocation.SetAttachments(key, value) + invocation.SetAttachment(key, value) } }