Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #1276 after healthcheck double check success, register async #1290

Merged
merged 7 commits into from
Feb 27, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
*/
package com.alipay.sofa.boot.actuator.autoconfigure.rpc;

import com.alipay.sofa.boot.actuator.health.ReadinessCheckListener;
import com.alipay.sofa.boot.actuator.health.ReadinessEndpoint;
import com.alipay.sofa.boot.actuator.rpc.HealthCheckProviderConfigDelayRegisterChecker;
import com.alipay.sofa.boot.actuator.rpc.RpcAfterHealthCheckCallback;
import com.alipay.sofa.boot.actuator.rpc.SofaRpcEndpoint;
import com.alipay.sofa.boot.autoconfigure.rpc.SofaBootRpcProperties;
import com.alipay.sofa.boot.autoconfigure.rpc.SofaRpcAutoConfiguration;
import com.alipay.sofa.rpc.boot.context.RpcStartApplicationListener;
import org.springframework.boot.actuate.autoconfigure.endpoint.condition.ConditionalOnAvailableEndpoint;
Expand All @@ -40,6 +43,19 @@
@ConditionalOnBean(RpcStartApplicationListener.class)
public class RpcActuatorAutoConfiguration {
wangchengming666 marked this conversation as resolved.
Show resolved Hide resolved

@Bean
@ConditionalOnMissingBean
@ConditionalOnAvailableEndpoint(endpoint = ReadinessEndpoint.class)
wangchengming666 marked this conversation as resolved.
Show resolved Hide resolved
public HealthCheckProviderConfigDelayRegisterChecker providerConfigDelayRegisterCheckerSupport(ReadinessCheckListener readinessCheckListener,
SofaBootRpcProperties sofaBootRpcProperties) {
HealthCheckProviderConfigDelayRegisterChecker healthCheckProviderConfigDelayRegisterChecker = new HealthCheckProviderConfigDelayRegisterChecker();
healthCheckProviderConfigDelayRegisterChecker
.setReadinessCheckListener(readinessCheckListener);
healthCheckProviderConfigDelayRegisterChecker.setEnableDelayRegister(sofaBootRpcProperties
.getEnableDelayRegister());
return healthCheckProviderConfigDelayRegisterChecker;
}

@Bean
@ConditionalOnMissingBean
@ConditionalOnAvailableEndpoint(endpoint = ReadinessEndpoint.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.boot.actuator.rpc;

import com.alipay.sofa.boot.actuator.health.ReadinessCheckListener;
import com.alipay.sofa.rpc.boot.container.ProviderConfigDelayRegisterChecker;
import org.springframework.boot.availability.ReadinessState;

/**
* ProviderConfigDelayRegisterChecker的具体实现
*/
public class HealthCheckProviderConfigDelayRegisterChecker implements
ProviderConfigDelayRegisterChecker {

private ReadinessCheckListener readinessCheckListener;

private boolean enableDelayRegister;
wangchengming666 marked this conversation as resolved.
Show resolved Hide resolved

@Override
public boolean allowRegister() {
wangchengming666 marked this conversation as resolved.
Show resolved Hide resolved
return ReadinessState.ACCEPTING_TRAFFIC.equals(readinessCheckListener.getReadinessState());
}

public void setReadinessCheckListener(ReadinessCheckListener readinessCheckListener) {
wangchengming666 marked this conversation as resolved.
Show resolved Hide resolved
this.readinessCheckListener = readinessCheckListener;
}

public void setEnableDelayRegister(boolean enableDelayRegister) {
this.enableDelayRegister = enableDelayRegister;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.boot.actuator.rpc;

import com.alipay.sofa.boot.actuator.health.ReadinessCheckListener;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.boot.availability.ReadinessState;

import static org.junit.jupiter.api.Assertions.assertTrue;
wangchengming666 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Test for {@link HealthCheckProviderConfigDelayRegisterChecker}
*/
@ExtendWith(MockitoExtension.class)
public class HealthCheckProviderConfigDelayRegisterCheckerTest {

@InjectMocks
private HealthCheckProviderConfigDelayRegisterChecker healthCheckProviderConfigDelayRegisterChecker;

@Mock
private ReadinessCheckListener readinessCheckListener;

@Test
public void testAllowRegister() {
// Setup
Mockito.doReturn(ReadinessState.ACCEPTING_TRAFFIC).when(readinessCheckListener)
.getReadinessState();
// Run the test
boolean result = healthCheckProviderConfigDelayRegisterChecker.allowRegister();

// Verify the results
assertTrue(result);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,19 @@ public class SofaBootRpcProperties implements EnvironmentAware {

private List<String> providerRegisterBlackList;

/**
* 是否开启延时注册功能
*/
private Boolean enableDelayRegister;
wangchengming666 marked this conversation as resolved.
Show resolved Hide resolved

public Boolean getEnableDelayRegister() {
return enableDelayRegister;
}

public void setEnableDelayRegister(Boolean enableDelayRegister) {
this.enableDelayRegister = enableDelayRegister;
}

public boolean isEnableAutoPublish() {
return enableAutoPublish;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.alipay.sofa.boot.autoconfigure.rpc;

import com.alipay.sofa.boot.actuator.rpc.HealthCheckProviderConfigDelayRegisterChecker;
import com.alipay.sofa.boot.autoconfigure.condition.ConditionalOnSwitch;
import com.alipay.sofa.boot.autoconfigure.rpc.SofaRpcAutoConfiguration.RegistryConfigurationImportSelector;
import com.alipay.sofa.boot.autoconfigure.runtime.SofaRuntimeAutoConfiguration;
Expand Down Expand Up @@ -55,6 +56,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* {@link EnableAutoConfiguration Auto-configuration} for sofa Rpc.
Expand All @@ -72,12 +74,18 @@ public class SofaRpcAutoConfiguration {

@Bean
@ConditionalOnMissingBean
public ProviderConfigContainer providerConfigContainer(SofaBootRpcProperties sofaBootRpcProperties) {
public ProviderConfigContainer providerConfigContainer(SofaBootRpcProperties sofaBootRpcProperties,
wangchengming666 marked this conversation as resolved.
Show resolved Hide resolved
ObjectProvider<HealthCheckProviderConfigDelayRegisterChecker> healthCheckProviderConfigDelayRegisterChecker) {
wangchengming666 marked this conversation as resolved.
Show resolved Hide resolved
ProviderConfigContainer providerConfigContainer = new ProviderConfigContainer();
providerConfigContainer.setProviderRegisterWhiteList(sofaBootRpcProperties
.getProviderRegisterWhiteList());
providerConfigContainer.setProviderRegisterBlackList(sofaBootRpcProperties
.getProviderRegisterBlackList());
providerConfigContainer
.setProviderConfigDelayRegister(healthCheckProviderConfigDelayRegisterChecker.stream()
.collect(Collectors.toList()));
providerConfigContainer.setEnableDelayRegister(sofaBootRpcProperties
.getEnableDelayRegister());
return providerConfigContainer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.alipay.sofa.rpc.boot.container;

import com.alipay.sofa.common.thread.SofaScheduledThreadPoolExecutor;
import com.alipay.sofa.rpc.boot.config.SofaBootRpcConfigConstants;
import com.alipay.sofa.rpc.boot.log.SofaBootRpcLoggerFactory;
import com.alipay.sofa.rpc.boot.runtime.binding.RpcBinding;
Expand All @@ -33,6 +34,7 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

/**
* ProviderConfig持有者.维护编程界面级别的RPC组件。
Expand All @@ -58,6 +60,29 @@ public class ProviderConfigContainer {
private final ConcurrentMap<String, ProviderConfig> RPC_SERVICE_CONTAINER = new ConcurrentHashMap<String, ProviderConfig>(
256);

/**
* 用来延时发布的线程池
*/
private SofaScheduledThreadPoolExecutor scheduledExecutorService;

/**
* 延时发布时健康检查
*/
private List<ProviderConfigDelayRegisterChecker> providerConfigDelayRegisterCheckerList;

/**
* 是否开启延时加载,兼容老逻辑
*/
private boolean enableDelayRegister;

public void setProviderConfigDelayRegister(List<ProviderConfigDelayRegisterChecker> providerConfigDelayRegisterCheckerList) {
wangchengming666 marked this conversation as resolved.
Show resolved Hide resolved
this.providerConfigDelayRegisterCheckerList = providerConfigDelayRegisterCheckerList;
}

public void setEnableDelayRegister(boolean enableDelayRegister) {
this.enableDelayRegister = enableDelayRegister;
}

/**
* 增加 ProviderConfig
*
Expand Down Expand Up @@ -127,32 +152,68 @@ public Collection<ProviderConfig> getAllProviderConfig() {
*/
public void publishAllProviderConfig() {
for (ProviderConfig providerConfig : getAllProviderConfig()) {

ServerConfig serverConfig = (ServerConfig) providerConfig.getServer().get(0);
if (!serverConfig.getProtocol().equalsIgnoreCase(
SofaBootRpcConfigConstants.RPC_PROTOCOL_DUBBO)) {
if (allowProviderRegister(providerConfig)) {
providerConfig.setRegister(true);
} else {
LOGGER.info("Provider will not register: [{}]", providerConfig.buildKey());
int delay = providerConfig.getDelay();
wangchengming666 marked this conversation as resolved.
Show resolved Hide resolved
// 没有配置延时加载则直接去注册中心注册服务
if (delay <= 0 && !enableDelayRegister) {
wangchengming666 marked this conversation as resolved.
Show resolved Hide resolved
doPublishProviderConfig(providerConfig);
} else {
// 根据延时时间异步去注册中心注册服务
if (scheduledExecutorService == null) {
scheduledExecutorService = new SofaScheduledThreadPoolExecutor(16);
wangchengming666 marked this conversation as resolved.
Show resolved Hide resolved
}
scheduledExecutorService.schedule(() -> doDelayPublishProviderConfig(providerConfig,
allRegisterCheckerPass(providerConfigDelayRegisterCheckerList)), delay, TimeUnit.MILLISECONDS);
}
}
}

private void doDelayPublishProviderConfig(ProviderConfig providerConfig,
wangchengming666 marked this conversation as resolved.
Show resolved Hide resolved
boolean allRegisterCheckerPass) {
if (!allRegisterCheckerPass) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("service publish failed, interfaceId["
+ providerConfig.getInterfaceId() + "], please check.");
}
return;
}
doPublishProviderConfig(providerConfig);
}

private boolean allRegisterCheckerPass(List<ProviderConfigDelayRegisterChecker> providerConfigDelayRegisterCheckerList) {
boolean allTrue = true;
for (ProviderConfigDelayRegisterChecker providerConfigDelayRegisterChecker : providerConfigDelayRegisterCheckerList) {
if (!providerConfigDelayRegisterChecker.allowRegister()) {
allTrue = false;
break;
}
}
return allTrue;
}

List<RegistryConfig> registrys = providerConfig.getRegistry();
for (RegistryConfig registryConfig : registrys) {
private void doPublishProviderConfig(ProviderConfig providerConfig) {
ServerConfig serverConfig = (ServerConfig) providerConfig.getServer().get(0);
if (!serverConfig.getProtocol().equalsIgnoreCase(
SofaBootRpcConfigConstants.RPC_PROTOCOL_DUBBO)) {
if (allowProviderRegister(providerConfig)) {
providerConfig.setRegister(true);
} else {
LOGGER.info("Provider will not register: [{}]", providerConfig.buildKey());
}

Registry registry = RegistryFactory.getRegistry(registryConfig);
registry.init();
registry.start();
List<RegistryConfig> registrys = providerConfig.getRegistry();
for (RegistryConfig registryConfig : registrys) {

registry.register(providerConfig);
Registry registry = RegistryFactory.getRegistry(registryConfig);
registry.init();
registry.start();

if (LOGGER.isInfoEnabled()) {
LOGGER.info("service published. interfaceId["
+ providerConfig.getInterfaceId() + "]; protocol["
+ serverConfig.getProtocol() + "]");
}
}
registry.register(providerConfig);

if (LOGGER.isInfoEnabled()) {
LOGGER.info("service published. interfaceId["
+ providerConfig.getInterfaceId() + "]; protocol["
+ serverConfig.getProtocol() + "]");
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.boot.container;

/**
* 延时发布后注册服务到注册中心的健康检查
wangchengming666 marked this conversation as resolved.
Show resolved Hide resolved
*/
public interface ProviderConfigDelayRegisterChecker {

/**
* 是否允许注册服务到注册中心
*
* @return
*/
boolean allowRegister();
}
Loading