Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support active health check and merge with outlier detection #5

Merged
merged 6 commits into from
Sep 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions examples/consumer/health_check.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
//
// Licensed under the BSD 3-Clause License (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
//
// https://opensource.org/licenses/BSD-3-Clause
//
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the specific
// language governing permissions and limitations under the License.
//

#include <signal.h>
#include <stdlib.h>
#include <unistd.h>

#include <iostream>
#include <string>

#include "polaris/consumer.h"

bool signal_received = false;
void SignalHandler(int signum) {
std::cout << "Interrupt signal (" << signum << ") received." << std::endl;
signal_received = true;
}

int main(int argc, char** argv) {
signal(SIGINT, SignalHandler); // 注册ctrl+c事件回调,触发进程退出
if (argc < 3) {
std::cout << "usage: " << argv[0] << " service_namespace service_name [interval]" << std::endl;
return -1;
}
polaris::ServiceKey service_key = {argv[1], argv[2]};
int interval = argc >= 4 ? atoi(argv[3]) : 1000;

// 本示例展示使用北极星SDK进行服务发现的基本步骤

// 第一步:创建线程安全的Consumer对象
// 该方法检查当前程序【运行路径】下是否有polaris.yaml文件
// 如果有则加载该文件配置中的配置项覆盖默认配置,如果没有则使用默认配置
// 注意:其他创建方法参考头文件"polaris/consumer.h"中ConsumerApi::CreateXXX系列方法注释
std::string err_msg, content =
"global:\n"
" serverConnector:\n"
"consumer:\n"
" healthCheck:\n"
" when: always\n"
" interval: 1s\n"
" chain:\n"
" - http\n"
" plugin:\n"
" http:\n"
" path: /health\n"
" circuitBreaker:\n"
" enable: true\n"
" checkPeriod: 1s";
polaris::Config* config = polaris::Config::CreateFromString(content, err_msg);
polaris::Context* context = polaris::Context::Create(config);
delete config; // 创建完成后即可释放config对象
if (context == NULL) { // 创建错误,创建失败原因可查看日志~/polaris/log/polaris.log
abort();
}
// 再以共享模式Context创建ConsumerApi,用户自己维护Context的生命周期,该context还可以用于创建ProviderApi
polaris::ConsumerApi* consumer = polaris::ConsumerApi::Create(context);
if (consumer == NULL) {
std::cout << "create consumer api failed" << std::endl;
return -1;
}

// 【第二步】可选,预拉取服务数据。如果访问的被调服务是已知的,建议加上这个步骤
polaris::GetOneInstanceRequest request(service_key);
polaris::Instance instance;
polaris::ReturnCode ret;
if ((ret = consumer->InitService(request)) != polaris::kReturnOk) {
std::cout << "init service with error:" << polaris::ReturnCodeToMsg(ret).c_str() << std::endl;
delete consumer;
return -1;
}

// 调用接口
timespec ts;
uint64_t begin, end;
while (!signal_received) {
clock_gettime(CLOCK_REALTIME, &ts);
begin = ts.tv_sec * 1000000 + ts.tv_nsec / 1000;

// 【第三步】RPC调用前调用北极星接口获取一个被调服务实例,会执行服务路由和负载均衡
if ((ret = consumer->GetOneInstance(request, instance)) != polaris::kReturnOk) {
std::cout << "get instance for service with error:" << polaris::ReturnCodeToMsg(ret).c_str()
<< std::endl;
sleep(1);
continue;
}

clock_gettime(CLOCK_REALTIME, &ts);
end = ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
std::cout << "get instance, ip:" << instance.GetHost() << ", port:" << instance.GetPort()
<< ", use time:" << end - begin << "us" << std::endl;

// 【第四步】使用获取到的服务实例,进行RPC,并获取RPC调用结果返回值和耗时
int rpc_result = 0;
clock_gettime(CLOCK_REALTIME, &ts);
begin = ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
// ret_code = RPC_CALL(instance.GetHost(), instance.GetPort());
clock_gettime(CLOCK_REALTIME, &ts);
end = ts.tv_sec * 1000000 + ts.tv_nsec / 1000;

// 【第五步】上报使用该被调服务实例进行RPC调用结果
// 注意:本调用没有网络操作,只将结果写入本地内存
// 如果RPC是异步的,则异步RPC结束后进行上报即可
polaris::ServiceCallResult result;
result.SetServiceNamespace(service_key.namespace_);
result.SetServiceName(service_key.name_);
result.SetInstanceId(instance.GetId());
result.SetDelay(end - begin); // 上报延迟
result.SetRetCode(rpc_result); // 上报调用返回码
if (rpc_result >= 0) { // 【注意】这里假设返回码大于0时,rpc调用正常,RPC正常也要上报
result.SetRetStatus(polaris::kCallRetOk);
} else { // rpc调用出错,例如网络错误,超时等上报给北极星用于剔除故障节点
result.SetRetStatus(polaris::kCallRetError);
}
if ((ret = consumer->UpdateServiceCallResult(result)) != polaris::kReturnOk) {
std::cout << "update call result for instance with error:" << ret
<< " msg:" << polaris::ReturnCodeToMsg(ret).c_str() << std::endl;
}

usleep(interval * 1000);
}

delete consumer;
return 0;
}
8 changes: 4 additions & 4 deletions include/polaris/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,15 @@ class CircuitBreakerChain {
virtual void PrepareServicePbConfTrigger() = 0;
};

class OutlierDetectorChain {
class HealthCheckerChain {
public:
virtual ~OutlierDetectorChain() {}
virtual ~HealthCheckerChain() {}

virtual ReturnCode Init(Config* config, Context* context) = 0;

virtual ReturnCode DetectInstance() = 0;

virtual std::vector<OutlierDetector*> GetOutlierDetectors() = 0;
virtual std::vector<HealthChecker*> GetHealthCheckers() = 0;
};

class ServiceContextImpl;
Expand All @@ -161,7 +161,7 @@ class ServiceContext : public ServiceBase {

CircuitBreakerChain* GetCircuitBreakerChain();

OutlierDetectorChain* GetOutlierDetectorChain();
HealthCheckerChain* GetHealthCheckerChain();

ServiceRouterChain* GetServiceRouterChain();

Expand Down
1 change: 1 addition & 0 deletions include/polaris/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ enum ReturnCode {
kReturnNotInit = 1288, ///< 资源未初始化
kReturnResourceNotFound = 1289, // 资源未找到
kReturnServerUnknownError = 1299, ///< 服务端返回客户端未知的错误
kReturnSystemServiceNotConfigured = 1300, ///< 没有配置系统服务名字
};

/// @brief 返回码转换为字符串消息
Expand Down
6 changes: 3 additions & 3 deletions include/polaris/plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ enum PluginType {
kPluginLocalRegistry, ///< 本地缓存扩展点
kPluginServiceRouter, ///< 服务路由扩展点
kPluginLoadBalancer, ///< 负载均衡扩展点
kPluginOutlierDetector, ///< 健康探测扩展点
kPluginHealthChecker, ///< 健康探测扩展点
kPluginCircuitBreaker, ///< 节点熔断扩展点
kPluginWeightAdjuster, ///< 动态权重调整扩展点
kPluginStatReporter, ///< 统计上报扩展点
Expand Down Expand Up @@ -411,10 +411,10 @@ struct DetectResult {
};

/// @brief 扩展点接口:主动健康探测策略
class OutlierDetector : public Plugin {
class HealthChecker : public Plugin {
public:
/// @brief 析构函数
virtual ~OutlierDetector() {}
virtual ~HealthChecker() {}

/// @brief 通过配置进行初始化
virtual ReturnCode Init(Config* config, Context* context) = 0;
Expand Down
3 changes: 3 additions & 0 deletions polaris/api/consumer_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,9 @@ ReturnCode ConsumerApiImpl::UpdateServiceCallResult(Context* context, const Inst
ReturnCode ConsumerApiImpl::GetSystemServer(Context* context, const ServiceKey& service_key,
const Criteria& criteria, Instance*& instance,
uint64_t timeout, const std::string& protocol) {
if (service_key.name_.empty() || service_key.namespace_.empty()) {
return kReturnSystemServiceNotConfigured;
}
ContextImpl* context_impl = context->GetContextImpl();
context_impl->RcuEnter();
ServiceContext* service_context = context->GetOrCreateServiceContext(service_key);
Expand Down
32 changes: 16 additions & 16 deletions polaris/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
#include "monitor/api_stat_registry.h"
#include "monitor/service_record.h"
#include "plugin/circuit_breaker/circuit_breaker.h"
#include "plugin/outlier_detector/outlier_detector.h"
#include "plugin/health_checker/health_checker.h"
#include "plugin/plugin_manager.h"
#include "polaris/config.h"
#include "polaris/context.h"
Expand Down Expand Up @@ -94,9 +94,7 @@ CircuitBreakerChain* ServiceContext::GetCircuitBreakerChain() {
return impl_->circuit_breaker_chain_;
}

OutlierDetectorChain* ServiceContext::GetOutlierDetectorChain() {
return impl_->outlier_detector_chain_;
}
HealthCheckerChain* ServiceContext::GetHealthCheckerChain() { return impl_->health_checker_chain_; }

ServiceContextImpl* ServiceContext::GetServiceContextImpl() { return impl_; }

Expand All @@ -107,9 +105,9 @@ ServiceContextImpl::ServiceContextImpl() {
for (int i = 0; i < kLoadBalanceTypeDefaultConfig; ++i) {
lb_table_[i] = NULL;
}
weight_adjuster_ = NULL;
circuit_breaker_chain_ = NULL;
outlier_detector_chain_ = NULL;
weight_adjuster_ = NULL;
circuit_breaker_chain_ = NULL;
health_checker_chain_ = NULL;
UpdateLastUseTime();
}

Expand All @@ -132,9 +130,9 @@ ServiceContextImpl::~ServiceContextImpl() {
delete circuit_breaker_chain_;
circuit_breaker_chain_ = NULL;
}
if (outlier_detector_chain_ != NULL) {
delete outlier_detector_chain_;
outlier_detector_chain_ = NULL;
if (health_checker_chain_ != NULL) {
delete health_checker_chain_;
health_checker_chain_ = NULL;
}
}

Expand Down Expand Up @@ -175,10 +173,10 @@ ReturnCode ServiceContextImpl::Init(const ServiceKey& service_key, Config* confi
}

// 初始化探活插件
plugin_config = config->GetSubConfig("outlierDetection");
outlier_detector_chain_ = new OutlierDetectorChainImpl(service_key, context->GetLocalRegistry(),
circuit_breaker_chain_);
ret = outlier_detector_chain_->Init(plugin_config, context);
plugin_config = config->GetSubConfig("healthCheck");
health_checker_chain_ =
new HealthCheckerChainImpl(service_key, context->GetLocalRegistry(), circuit_breaker_chain_);
ret = health_checker_chain_->Init(plugin_config, context);
delete plugin_config;
if (ret != kReturnOk) {
return ret;
Expand All @@ -188,7 +186,7 @@ ReturnCode ServiceContextImpl::Init(const ServiceKey& service_key, Config* confi
plugin_config = config->GetSubConfig("circuitBreaker");
circuit_breaker_chain_ = new CircuitBreakerChainImpl(
service_key, context->GetLocalRegistry(),
outlier_detector_chain_->GetOutlierDetectors().empty()); // 探测插件为空,则表示开启自动半开
health_checker_chain_->GetHealthCheckers().empty()); // 探测插件为空,则表示开启自动半开
ret = circuit_breaker_chain_->Init(plugin_config, context);
delete plugin_config;
if (ret != kReturnOk) {
Expand Down Expand Up @@ -247,7 +245,9 @@ Context* Context::Create(Config* config, ContextMode mode) {
return NULL;
}
// Polaris discover先请求一下
if (context_impl->InitSystemService(context_impl->GetDiscoverService()) != kReturnOk) {
const PolarisCluster& discover_cluster = context_impl->GetDiscoverService();
if (!discover_cluster.service_.name_.empty() &&
context_impl->InitSystemService(discover_cluster) != kReturnOk) {
delete context;
return NULL;
}
Expand Down
4 changes: 2 additions & 2 deletions polaris/context_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#include "monitor/service_record.h"
#include "plugin/circuit_breaker/circuit_breaker.h"
#include "plugin/circuit_breaker/set_circuit_breaker.h"
#include "plugin/outlier_detector/outlier_detector.h"
#include "plugin/health_checker/health_checker.h"
#include "plugin/service_router/service_router.h"
#include "polaris/context.h"
#include "polaris/defs.h"
Expand Down Expand Up @@ -63,7 +63,7 @@ class ServiceContextImpl {
LoadBalancer* lb_table_[kLoadBalanceTypeDefaultConfig];
WeightAdjuster* weight_adjuster_;
CircuitBreakerChain* circuit_breaker_chain_;
OutlierDetectorChain* outlier_detector_chain_;
HealthCheckerChain* health_checker_chain_;
uint64_t last_use_time_;
};

Expand Down
6 changes: 3 additions & 3 deletions polaris/engine/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class Context;
Engine::Engine(Context* context)
: context_(context), main_executor_(context), cache_manager_(context),
monitor_reporter_(context_), circuit_breaker_executor_(context),
outlier_detector_executor_(context) {}
health_check_executor_(context) {}

Engine::~Engine() {
StopAndWait();
Expand All @@ -38,7 +38,7 @@ ReturnCode Engine::Start() {
(ret_code = cache_manager_.Start()) != kReturnOk ||
(ret_code = monitor_reporter_.Start()) != kReturnOk ||
(ret_code = circuit_breaker_executor_.Start()) != kReturnOk ||
(ret_code = outlier_detector_executor_.Start()) != kReturnOk) {
(ret_code = health_check_executor_.Start()) != kReturnOk) {
return ret_code;
}
return kReturnOk;
Expand All @@ -49,7 +49,7 @@ ReturnCode Engine::StopAndWait() {
cache_manager_.StopAndWait();
monitor_reporter_.StopAndWait();
circuit_breaker_executor_.StopAndWait();
outlier_detector_executor_.StopAndWait();
health_check_executor_.StopAndWait();
return kReturnOk;
}

Expand Down
4 changes: 2 additions & 2 deletions polaris/engine/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

#include "cache/cache_manager.h"
#include "engine/circuit_breaker_executor.h"
#include "engine/health_check_executor.h"
#include "engine/main_executor.h"
#include "engine/outlier_detection_executor.h"
#include "monitor/monitor_reporter.h"
#include "polaris/defs.h"

Expand Down Expand Up @@ -47,7 +47,7 @@ class Engine {
CacheManager cache_manager_;
MonitorReporter monitor_reporter_;
CircuitBreakerExecutor circuit_breaker_executor_;
OutlierDetectionExecutor outlier_detector_executor_;
HealthCheckExecutor health_check_executor_;
};

} // namespace polaris
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// language governing permissions and limitations under the License.
//

#include "engine/outlier_detection_executor.h"
#include "engine/health_check_executor.h"

#include <iosfwd>
#include <vector>
Expand All @@ -23,25 +23,24 @@

namespace polaris {

OutlierDetectionExecutor::OutlierDetectionExecutor(Context* context) : Executor(context) {}
HealthCheckExecutor::HealthCheckExecutor(Context* context) : Executor(context) {}

void OutlierDetectionExecutor::SetupWork() {
reactor_.SubmitTask(new FuncTask<OutlierDetectionExecutor>(TimingDetect, this));
void HealthCheckExecutor::SetupWork() {
reactor_.SubmitTask(new FuncTask<HealthCheckExecutor>(TimingDetect, this));
}

void OutlierDetectionExecutor::TimingDetect(OutlierDetectionExecutor* executor) {
void HealthCheckExecutor::TimingDetect(HealthCheckExecutor* executor) {
std::vector<ServiceContext*> all_service_contexts;
executor->context_->GetContextImpl()->GetAllServiceContext(all_service_contexts);
for (std::size_t i = 0; i < all_service_contexts.size(); ++i) {
OutlierDetectorChain* outlier_detector_chain =
all_service_contexts[i]->GetOutlierDetectorChain();
outlier_detector_chain->DetectInstance();
HealthCheckerChain* health_checker_chain = all_service_contexts[i]->GetHealthCheckerChain();
health_checker_chain->DetectInstance();
all_service_contexts[i]->DecrementRef();
}
all_service_contexts.clear();
// 设置定时任务
executor->reactor_.AddTimingTask(
new TimingFuncTask<OutlierDetectionExecutor>(TimingDetect, executor, 1000));
new TimingFuncTask<HealthCheckExecutor>(TimingDetect, executor, 1000));
}

} // namespace polaris
Loading