Skip to content

Commit

Permalink
add polaris subscribe (#2100)
Browse files Browse the repository at this point in the history
* Merge remote-tracking branch 'origin/3.0' into 3.0

* Merge remote-tracking branch 'origin/3.0' into 3.0

* Merge remote-tracking branch 'origin/3.0' into 3.0

* Merge remote-tracking branch 'origin/3.0' into 3.0
  • Loading branch information
jasondeng1997 committed Nov 11, 2022
1 parent c6ab462 commit 188b993
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 32 deletions.
27 changes: 17 additions & 10 deletions registry/polaris/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,39 @@ import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/remoting"
)

type polarisListener struct {
watcher *PolarisServiceWatcher
listenUrl *common.URL
events *gxchan.UnboundedChan
closeCh chan struct{}
watcher *PolarisServiceWatcher
events *gxchan.UnboundedChan
closeCh chan struct{}
}

// NewPolarisListener new polaris listener
func NewPolarisListener(url *common.URL) (*polarisListener, error) {
func NewPolarisListener(watcher *PolarisServiceWatcher) (*polarisListener, error) {
listener := &polarisListener{
listenUrl: url,
events: gxchan.NewUnboundedChan(32),
closeCh: make(chan struct{}),
watcher: watcher,
events: gxchan.NewUnboundedChan(32),
closeCh: make(chan struct{}),
}

listener.startListen()
return listener, nil
}
func (pl *polarisListener) startListen() {
pl.watcher.AddSubscriber(func(et remoting.EventType, ins []model.Instance) {
for i := range ins {
pl.events.In() <- &config_center.ConfigChangeEvent{Value: generateUrl(ins[i]), ConfigType: et}
}
})
}

// Next returns next service event once received
func (pl *polarisListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-pl.closeCh:
logger.Warnf("polaris listener is close!listenUrl:%+v", pl.listenUrl)
logger.Warnf("polaris listener is close")
return nil, perrors.New("listener stopped")
case val := <-pl.events.Out():
e, _ := val.(*config_center.ConfigChangeEvent)
Expand Down
68 changes: 57 additions & 11 deletions registry/polaris/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ func init() {

// newPolarisRegistry will create new instance
func newPolarisRegistry(url *common.URL) (registry.Registry, error) {
sdkCtx, _, err := polaris.GetPolarisConfig(url)
sdkCtx, ns, err := polaris.GetPolarisConfig(url)
if err != nil {
return &polarisRegistry{}, err
}
pRegistry := &polarisRegistry{
namespace: ns,
provider: api.NewProviderAPIByContext(sdkCtx),
consumer: api.NewConsumerAPIByContext(sdkCtx),
lock: &sync.RWMutex{},
registryUrls: make(map[string]*PolarisHeartbeat),
listenerLock: &sync.RWMutex{},
Expand All @@ -67,11 +69,13 @@ func newPolarisRegistry(url *common.URL) (registry.Registry, error) {
}

type polarisRegistry struct {
consumer api.ConsumerAPI
namespace string
url *common.URL
provider api.ProviderAPI
lock *sync.RWMutex
registryUrls map[string]*PolarisHeartbeat

watchers map[string]*PolarisServiceWatcher
listenerLock *sync.RWMutex
}

Expand Down Expand Up @@ -147,33 +151,49 @@ func (pr *polarisRegistry) UnRegister(conf *common.URL) error {

// Subscribe returns nil if subscribing registry successfully. If not returns an error.
func (pr *polarisRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
var (
newParam api.WatchServiceRequest
newConsumer api.ConsumerAPI
)

role, _ := strconv.Atoi(url.GetParam(constant.RegistryRoleKey, ""))
if role != common.CONSUMER {
return nil
}
timer := time.NewTimer(time.Duration(RegistryConnDelay) * time.Second)
defer timer.Stop()

req := api.WatchServiceRequest{
WatchServiceRequest: model.WatchServiceRequest{
Key: model.ServiceKey{
Service: common.GetSubscribeName(url),
Namespace: pr.namespace,
},
},
}

for {
listener, err := NewPolarisListener(url)
watcher, err := newPolarisWatcher(&req, pr.consumer)

if err != nil {
logger.Warnf("getwatcher() = err:%v", perrors.WithStack(err))
<-timer.C
timer.Reset(time.Duration(RegistryConnDelay) * time.Second)
continue
}
listener, err := NewPolarisListener(watcher)

if err != nil {
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
<-time.After(time.Duration(RegistryConnDelay) * time.Second)
<-timer.C
timer.Reset(time.Duration(RegistryConnDelay) * time.Second)
continue
}

watcher, err := newPolarisWatcher(&newParam, newConsumer)
if err != nil {
logger.Warnf("getwatcher() = err:%v", perrors.WithStack(err))
timer := time.NewTimer(time.Duration(RegistryConnDelay) * time.Second)
timer.Reset(time.Duration(RegistryConnDelay) * time.Second)
continue
}
for {

for {
serviceEvent, err := listener.Next()

if err != nil {
Expand All @@ -199,6 +219,31 @@ func (pr *polarisRegistry) GetURL() *common.URL {
return pr.url
}

func (pr *polarisRegistry) createPolarisWatcher(serviceName string) (*PolarisServiceWatcher, error) {

pr.listenerLock.Lock()
defer pr.listenerLock.Unlock()

if _, exist := pr.watchers[serviceName]; !exist {
subscribeParam := &api.WatchServiceRequest{
WatchServiceRequest: model.WatchServiceRequest{
Key: model.ServiceKey{
Namespace: pr.namespace,
Service: serviceName,
},
},
}

watcher, err := newPolarisWatcher(subscribeParam, pr.consumer)
if err != nil {
return nil, err
}
pr.watchers[serviceName] = watcher
}

return pr.watchers[serviceName], nil
}

// Destroy stop polaris registry.
func (pr *polarisRegistry) Destroy() {
for _, val := range pr.registryUrls {
Expand All @@ -218,7 +263,8 @@ func (pr *polarisRegistry) IsAvailable() bool {
}

// doHeartbeat Since polaris does not support automatic reporting of instance heartbeats, separate logic is
// needed to implement it
//
// needed to implement it
func (pr *polarisRegistry) doHeartbeat(ctx context.Context, ins *api.InstanceRegisterRequest) {
ticker := time.NewTicker(time.Duration(4) * time.Second)

Expand Down
13 changes: 2 additions & 11 deletions remoting/polaris/polaris.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,7 @@ global:
grpc:
maxCallRecvMsgSize: 52428800
statReporter:
enable: true
chain:
- stat2Monitor
- serviceCache
plugin:
stat2Monitor:
metricsReportWindow: 1m
metricsNumBuckets: 12
serviceCache:
reportInterval: 3m
enable: false
consumer:
localCache:
type: inmemory
Expand Down Expand Up @@ -92,4 +83,4 @@ consumer:
type: subscribeLocalChannel
plugin:
subscribeLocalChannel:
channelBufferSize: 50
channelBufferSize: 50

0 comments on commit 188b993

Please sign in to comment.