-
Notifications
You must be signed in to change notification settings - Fork 930
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add polaris subscribe #2100
add polaris subscribe #2100
Changes from 14 commits
b8c8f5c
72c68f5
30a8e04
47f0b69
e78a6f3
1587b99
18fbee3
da71acf
8f3c0a5
5343be2
6a6b659
1ad48fd
f461883
0833961
b968e3c
acb2d86
e4156a2
fcfbef8
73eff57
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,12 +52,14 @@ func init() { | |
|
||
// newPolarisRegistry will create new instance | ||
func newPolarisRegistry(url *common.URL) (registry.Registry, error) { | ||
sdkCtx, _, err := polaris.GetPolarisConfig(url) | ||
sdkCtx, ns, err := polaris.GetPolarisConfig(url) | ||
if err != nil { | ||
return &polarisRegistry{}, err | ||
} | ||
pRegistry := &polarisRegistry{ | ||
namespace: ns, | ||
provider: api.NewProviderAPIByContext(sdkCtx), | ||
consumer: api.NewConsumerAPIByContext(sdkCtx), | ||
lock: &sync.RWMutex{}, | ||
registryUrls: make(map[string]*PolarisHeartbeat), | ||
listenerLock: &sync.RWMutex{}, | ||
|
@@ -67,11 +69,13 @@ func newPolarisRegistry(url *common.URL) (registry.Registry, error) { | |
} | ||
|
||
type polarisRegistry struct { | ||
consumer api.ConsumerAPI | ||
namespace string | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这个 ns 信息在哪里初始化呢? |
||
url *common.URL | ||
provider api.ProviderAPI | ||
lock *sync.RWMutex | ||
registryUrls map[string]*PolarisHeartbeat | ||
|
||
watchers map[string]*PolarisServiceWatcher | ||
listenerLock *sync.RWMutex | ||
} | ||
|
||
|
@@ -147,33 +151,49 @@ func (pr *polarisRegistry) UnRegister(conf *common.URL) error { | |
|
||
// Subscribe returns nil if subscribing registry successfully. If not returns an error. | ||
func (pr *polarisRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error { | ||
var ( | ||
newParam api.WatchServiceRequest | ||
newConsumer api.ConsumerAPI | ||
) | ||
|
||
role, _ := strconv.Atoi(url.GetParam(constant.RegistryRoleKey, "")) | ||
if role != common.CONSUMER { | ||
return nil | ||
} | ||
timer := time.NewTimer(time.Duration(RegistryConnDelay) * time.Second) | ||
defer timer.Stop() | ||
|
||
req := api.WatchServiceRequest{ | ||
WatchServiceRequest: model.WatchServiceRequest{ | ||
Key: model.ServiceKey{ | ||
Service: common.GetSubscribeName(url), | ||
Namespace: pr.namespace, | ||
}, | ||
}, | ||
} | ||
|
||
for { | ||
listener, err := NewPolarisListener(url) | ||
watcher, err := newPolarisWatcher(&req, pr.consumer) | ||
|
||
if err != nil { | ||
logger.Warnf("getwatcher() = err:%v", perrors.WithStack(err)) | ||
<-timer.C | ||
timer.Reset(time.Duration(RegistryConnDelay) * time.Second) | ||
continue | ||
} | ||
listener, err := NewPolarisListener(watcher) | ||
|
||
if err != nil { | ||
logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) | ||
<-time.After(time.Duration(RegistryConnDelay) * time.Second) | ||
<-timer.C | ||
timer.Reset(time.Duration(RegistryConnDelay) * time.Second) | ||
continue | ||
} | ||
|
||
watcher, err := newPolarisWatcher(&newParam, newConsumer) | ||
if err != nil { | ||
logger.Warnf("getwatcher() = err:%v", perrors.WithStack(err)) | ||
timer := time.NewTimer(time.Duration(RegistryConnDelay) * time.Second) | ||
timer.Reset(time.Duration(RegistryConnDelay) * time.Second) | ||
continue | ||
} | ||
for { | ||
|
||
for { | ||
serviceEvent, err := listener.Next() | ||
|
||
if err != nil { | ||
|
@@ -199,6 +219,31 @@ func (pr *polarisRegistry) GetURL() *common.URL { | |
return pr.url | ||
} | ||
|
||
func (pr *polarisRegistry) createPolarisWatcher(serviceName string) (*PolarisServiceWatcher, error) { | ||
|
||
pr.listenerLock.Lock() | ||
defer pr.listenerLock.Unlock() | ||
|
||
if _, exist := pr.watchers[serviceName]; !exist { | ||
subscribeParam := &api.WatchServiceRequest{ | ||
WatchServiceRequest: model.WatchServiceRequest{ | ||
Key: model.ServiceKey{ | ||
Namespace: pr.namespace, | ||
Service: serviceName, | ||
}, | ||
}, | ||
} | ||
|
||
watcher, err := newPolarisWatcher(subscribeParam, pr.consumer) | ||
if err != nil { | ||
return nil, err | ||
} | ||
pr.watchers[serviceName] = watcher | ||
} | ||
|
||
return pr.watchers[serviceName], nil | ||
} | ||
|
||
// Destroy stop polaris registry. | ||
func (pr *polarisRegistry) Destroy() { | ||
for _, val := range pr.registryUrls { | ||
|
@@ -218,7 +263,8 @@ func (pr *polarisRegistry) IsAvailable() bool { | |
} | ||
|
||
// doHeartbeat Since polaris does not support automatic reporting of instance heartbeats, separate logic is | ||
// needed to implement it | ||
// | ||
// needed to implement it | ||
func (pr *polarisRegistry) doHeartbeat(ctx context.Context, ins *api.InstanceRegisterRequest) { | ||
ticker := time.NewTicker(time.Duration(4) * time.Second) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,17 +30,6 @@ global: | |
plugin: | ||
grpc: | ||
maxCallRecvMsgSize: 52428800 | ||
statReporter: | ||
enable: true | ||
chain: | ||
- stat2Monitor | ||
- serviceCache | ||
plugin: | ||
stat2Monitor: | ||
metricsReportWindow: 1m | ||
metricsNumBuckets: 12 | ||
serviceCache: | ||
reportInterval: 3m | ||
consumer: | ||
localCache: | ||
type: inmemory | ||
|
@@ -93,3 +82,5 @@ consumer: | |
plugin: | ||
subscribeLocalChannel: | ||
channelBufferSize: 50 | ||
statReporter: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 放错位置了,就在原来的基础上改,不要调整 statReporter 的位置 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
enable: false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个日志打印调整下
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok