Skip to content

Commit

Permalink
feat: support polaris service registration discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
lbbniu committed Jun 18, 2023
1 parent 88e9920 commit bdfd320
Show file tree
Hide file tree
Showing 14 changed files with 1,105 additions and 0 deletions.
10 changes: 10 additions & 0 deletions contrib/registry/polaris/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module github.com/TarsCloud/TarsGo/contrib/registry/polaris

go 1.16

require (
github.com/TarsCloud/TarsGo v1.3.9
github.com/polarismesh/polaris-go v1.3.0
)

replace github.com/TarsCloud/TarsGo v1.3.9 => ../../../
685 changes: 685 additions & 0 deletions contrib/registry/polaris/go.sum

Large diffs are not rendered by default.

167 changes: 167 additions & 0 deletions contrib/registry/polaris/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package polaris

import (
"context"
"time"

"github.com/TarsCloud/TarsGo/tars/protocol/res/endpointf"
"github.com/TarsCloud/TarsGo/tars/registry"
"github.com/TarsCloud/TarsGo/tars/util/endpoint"
"github.com/polarismesh/polaris-go"
)

const (
endpointMeta = "endpoint"
)

type polarisRegistry struct {
namespace string
provider polaris.ProviderAPI
consumer polaris.ConsumerAPI
}

type RegistryOption func(pr *polarisRegistry)

func WithNamespace(namespace string) RegistryOption {
return func(pr *polarisRegistry) {
pr.namespace = namespace
}
}

func New(provider polaris.ProviderAPI, opts ...RegistryOption) registry.Registry {
consumer := polaris.NewConsumerAPIByContext(provider.SDKContext())
pr := &polarisRegistry{namespace: "tars", provider: provider, consumer: consumer}
for _, opt := range opts {
opt(pr)
}
//pr.addMiddleware()
return pr
}

/*func (pr *polarisRegistry) addMiddleware() {
tars.UseClientFilterMiddleware(func(next tars.ClientFilter) tars.ClientFilter {
return func(ctx context.Context, msg *tars.Message, invoke tars.Invoke, timeout time.Duration) (err error) {
start := time.Now()
defer func() {
delay := time.Since(start)
retStatus := model.RetSuccess
if msg.Resp.IRet != 0 {
retStatus = model.RetFail
}
ret := &polaris.ServiceCallResult{
ServiceCallResult: model.ServiceCallResult{
EmptyInstanceGauge: model.EmptyInstanceGauge{},
CalledInstance: nil, // todo: 怎么获取到或构造 Instance
Method: msg.Req.SServantName + "." + msg.Req.SFuncName,
RetStatus: retStatus,
},
}
ret.SetDelay(delay)
ret.SetRetCode(msg.Resp.IRet)
if er := pr.consumer.UpdateServiceCallResult(ret); er != nil {
TLOG.Errorf("do report service call result : %+v", er)
}
}()
return next(ctx, msg, invoke, timeout)
}
})
}*/

func (pr *polarisRegistry) Registry(_ context.Context, servant *registry.ServantInstance) error {
instance := &polaris.InstanceRegisterRequest{}
instance.Host = servant.Endpoint.Host
instance.Port = int(servant.Endpoint.Port)
instance.Protocol = &servant.Protocol
instance.Namespace = pr.namespace
instance.Service = servant.Servant
if servant.Endpoint.Weight > 0 {
weight := int(servant.Endpoint.Weight)
instance.Weight = &weight
}
if servant.Endpoint.Timeout > 0 {
timeout := time.Duration(servant.Endpoint.Timeout) * time.Millisecond
instance.Timeout = &timeout
}
instance.Metadata = createMetadata(servant)
_, err := pr.provider.RegisterInstance(instance)
return err
}

func (pr *polarisRegistry) Deregister(_ context.Context, servant *registry.ServantInstance) error {
instance := &polaris.InstanceDeRegisterRequest{}
instance.Namespace = pr.namespace
instance.Service = servant.Servant
instance.Host = servant.Endpoint.Host
instance.Port = int(servant.Endpoint.Port)
if servant.Endpoint.Timeout > 0 {
timeout := time.Duration(servant.Endpoint.Timeout) * time.Millisecond
instance.Timeout = &timeout
}
err := pr.provider.Deregister(instance)
return err
}

func (pr *polarisRegistry) QueryServant(_ context.Context, id string) (activeEp []endpointf.EndpointF, inactiveEp []endpointf.EndpointF, err error) {
req := &polaris.GetAllInstancesRequest{}
req.Namespace = pr.namespace
req.Service = id
resp, err := pr.consumer.GetAllInstances(req)
if err != nil {
return nil, nil, err
}
instances := resp.GetInstances()
for _, ins := range instances {
ep := endpoint.Parse(ins.GetMetadata()[endpointMeta])
ep.Host = ins.GetHost()
ep.Port = int32(ins.GetPort())
epf := endpoint.Endpoint2tars(ep)
if ins.IsHealthy() {
activeEp = append(activeEp, epf)
} else {
inactiveEp = append(inactiveEp, epf)
}
}
return activeEp, inactiveEp, err
}

func (pr *polarisRegistry) QueryServantBySet(_ context.Context, id, setId string) (activeEp []endpointf.EndpointF, inactiveEp []endpointf.EndpointF, err error) {
req := &polaris.GetInstancesRequest{}
req.Namespace = pr.namespace
req.Service = id
req.Metadata = map[string]string{
"internal-enable-set": "Y",
"internal-set-name": setId,
}
resp, err := pr.consumer.GetInstances(req)
if err != nil {
return nil, nil, err
}
instances := resp.GetInstances()
for _, ins := range instances {
ep := endpoint.Parse(ins.GetMetadata()[endpointMeta])
ep.Host = ins.GetHost()
ep.Port = int32(ins.GetPort())
epf := endpoint.Endpoint2tars(ep)
if ins.IsHealthy() {
activeEp = append(activeEp, epf)
} else {
inactiveEp = append(inactiveEp, epf)
}
}
return activeEp, inactiveEp, err
}

func createMetadata(servant *registry.ServantInstance) map[string]string {
metadata := make(map[string]string)
metadata["tarsVersion"] = servant.TarsVersion
metadata["app"] = servant.App
metadata["server"] = servant.Server
metadata[endpointMeta] = servant.Endpoint.String()
// polaris plugin
metadata["internal-enable-set"] = "N"
if servant.EnableSet {
metadata["internal-enable-set"] = "Y"
metadata["internal-set-name"] = servant.SetDivision
}
return metadata
}
2 changes: 2 additions & 0 deletions examples/PolarisServer/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
PolarisServer
*.log
8 changes: 8 additions & 0 deletions examples/PolarisServer/HelloObj.tars
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module TestApp
{
interface HelloObj
{
int Add(int a,int b,out int c); // Some example function
int Sub(int a,int b,out int c); // Some example function
};
};
33 changes: 33 additions & 0 deletions examples/PolarisServer/HelloObj_imp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package main

import (
"context"
)

// HelloObjImp servant implementation
type HelloObjImp struct {
}

// Init servant init
func (imp *HelloObjImp) Init() error {
//initialize servant here:
//...
return nil
}

// Destroy servant destroy
func (imp *HelloObjImp) Destroy() {
//destroy servant here:
//...
}

func (imp *HelloObjImp) Add(ctx context.Context, a int32, b int32, c *int32) (int32, error) {
//Doing something in your function
//...
return 0, nil
}
func (imp *HelloObjImp) Sub(ctx context.Context, a int32, b int32, c *int32) (int32, error) {
//Doing something in your function
//...
return 0, nil
}
36 changes: 36 additions & 0 deletions examples/PolarisServer/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package main

import (
"fmt"
"log"

"github.com/polarismesh/polaris-go"

pr "github.com/TarsCloud/TarsGo/contrib/registry/polaris"
"github.com/TarsCloud/TarsGo/tars"

"polarisserver/tars-protocol/TestApp"
)

func main() {
//provider, err := polaris.NewProviderAPI()
// 或者使用以下方法,则不需要创建配置文件
provider, err := polaris.NewProviderAPIByAddress("127.0.0.1:8091")
if err != nil {
log.Fatalf("fail to create providerAPI, err is %v", err)
}
defer provider.Destroy()
// 注册中心
comm := tars.NewCommunicator(tars.WithRegistry(pr.New(provider, pr.WithNamespace("tars"))))
obj := fmt.Sprintf("TestApp.PolarisServer.HelloObj")
app := new(TestApp.HelloObj)
comm.StringToProxy(obj, app)
var out, i int32
i = 123
ret, err := app.Add(i, i*2, &out)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(ret, out)
}
25 changes: 25 additions & 0 deletions examples/PolarisServer/config/config.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<tars>
<application>
enableset=y
setdivision=public.ali.1
<server>
app=TestApp
server=PolarisServer
local=tcp -h 127.0.0.1 -p 10014 -t 30000
logpath=/tmp
<TestApp.PolarisServer.HelloObjAdapter>
allow
endpoint=tcp -h 127.0.0.1 -p 10015 -t 60000
handlegroup=TestApp.PolarisServer.HelloObjAdapter
maxconns=200000
protocol=tars
queuecap=10000
queuetimeout=60000
servant=TestApp.PolarisServer.HelloObj
shmcap=0
shmkey=0
threads=1
</TestApp.PolarisServer.HelloObjAdapter>
</server>
</application>
</tars>
21 changes: 21 additions & 0 deletions examples/PolarisServer/debugtool/dumpstack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package main

import (
"fmt"

"github.com/TarsCloud/TarsGo/tars"
"github.com/TarsCloud/TarsGo/tars/protocol/res/adminf"
)

func main() {
comm := tars.NewCommunicator()
obj := "TestApp.PolarisServer.HelloObjObj@tcp -h 127.0.0.1 -p 10014 -t 60000"
app := new(adminf.AdminF)
comm.StringToProxy(obj, app)
ret, err := app.Notify("tars.dumpstack")
if err != nil {
fmt.Println(err)
return
}
fmt.Println(ret)
}
53 changes: 53 additions & 0 deletions examples/PolarisServer/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
module polarisserver

go 1.17

require (
github.com/TarsCloud/TarsGo v1.3.6
github.com/polarismesh/polaris-go v1.2.0-beta.3
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dlclark/regexp2 v1.7.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/gin-gonic/gin v1.8.1 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-playground/validator/v10 v10.10.0 // indirect
github.com/goccy/go-json v0.9.7 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible // indirect
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/prometheus/client_golang v1.12.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/ugorji/go/codec v1.2.7 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/automaxprocs v1.5.1 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/crypto v0.0.0-20210920023735-84f357641f63 // indirect
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/genproto v0.0.0-20220504150022-98cd25cafc72 // indirect
google.golang.org/grpc v1.46.2 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

replace github.com/TarsCloud/TarsGo => ../../
Loading

0 comments on commit bdfd320

Please sign in to comment.