diff --git a/examples/quickstart/Makefile b/examples/quickstart/Makefile new file mode 100644 index 0000000..230c88d --- /dev/null +++ b/examples/quickstart/Makefile @@ -0,0 +1,27 @@ +BIT = $(shell getconf LONG_BIT) + +POLARIS_CPP = ../.. +POLARIS_INCL = $(POLARIS_CPP)/include +POLARIS_LIB_DIR = $(POLARIS_CPP)/build$(BIT)/lib +POLARIS_LIB = $(POLARIS_LIB_DIR)/libpolaris_api.a +PROTOBUF_LIB = $(POLARIS_CPP)/third_party/protobuf/build$(BIT)/libprotobuf.a + +CXX = g++ +CXXFLAGS += -g -Wall -Wno-write-strings -Werror -std=c++11 + +SRC = $(wildcard *.cpp) +OBJECTS = $(SRC:%.cpp=%) + +all: $(OBJECTS) + +%: %.cpp $(POLARIS_LIB) + @echo -e Building $< ... + $(CXX) $(CXXFLAGS) -I$(POLARIS_INCL) $< $(POLARIS_LIB) $(PROTOBUF_LIB) -pthread -lz -o $@ + +$(POLARIS_LIB): + @echo build polaris-cpp lib + make -C ${POLARIS_CPP} + +clean: + @echo -e Clean $(OBJECTS) + @-rm -rf $(OBJECTS) diff --git a/examples/quickstart/README-zh.md b/examples/quickstart/README-zh.md new file mode 100644 index 0000000..37ee1f2 --- /dev/null +++ b/examples/quickstart/README-zh.md @@ -0,0 +1,53 @@ +# 快速开始样例 + +## 样例说明 + +样例演示如何使用 polaris-cpp 完成被调端以及主调端应用接入polaris,并完成服务调用流程。 + +consumer: 接收用户tcp请求,通过polaris发现provider服务,并将请求转发给provider处理 +provider:启动服务监听端口,并自身注册到北极星,且开启健康检查 + +## 编译 + +```bash +make clean +make +``` + +编译完成后,会在当前目录生成两个二进制文件,consumer和provider + +## 运行 + +在当前目录下创建配置文件 polaris.yaml,配置文件中配置 Polaris Server 地址。 + +```yaml +global: + serverConnector: + addresses: + - 127.0.0.1:8081 +``` + +执行样例 + +需要在控制台先创建被调服务 Test/quickstart.echo.service + +启动被调方: + +```bash +# 监听地址端口 127.0.0.1:9091 注册到服务 Test/quickstart.echo.service 服务token为xxx +./provider Test quickstart.echo.service xxx 127.0.0.1 9092 +``` + +启动主调方: + +```bash +# 监听地址端口 127.0.0.1:9093 转发请求到被调服务 Test/quickstart.echo.service +./consumer 127.0.0.1 9093 Test quickstart.echo.service +``` + +发送测试请求: + +```bash +# 向consumer监听端口发送hello +echo hello | nc 127.0.0.1 9093 +``` diff --git a/examples/quickstart/consumer.cpp b/examples/quickstart/consumer.cpp new file mode 100644 index 0000000..fa4a25b --- /dev/null +++ b/examples/quickstart/consumer.cpp @@ -0,0 +1,258 @@ +// Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. +// +// Licensed under the BSD 3-Clause License (the "License"); you may not use this file +// except in compliance with the License. You may obtain a copy of the License at +// +// https://opensource.org/licenses/BSD-3-Clause +// +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the specific +// language governing permissions and limitations under the License. +// + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "polaris/consumer.h" + +class ConsumerServer { +public: + ConsumerServer(const std::string& host, int port, const polaris::ServiceKey& provider_service); + + ~ConsumerServer(); + + int Start(); + + void Stop(); + +private: + std::string Proccess(const std::string& message); + + int Send(const std::string& host, int port, const std::string& request, std::string& response); + +private: + std::string host_; + int port_; + polaris::ServiceKey provider_service_; + + std::atomic stop_; + std::unique_ptr accept_thread_; + + std::unique_ptr consumer_; +}; + +bool signal_received = false; +void SignalHandler(int signum) { + std::cout << "Interrupt signal (" << signum << ") received." << std::endl; + signal_received = true; +} + +int main(int argc, char** argv) { + if (argc < 5) { + std::cout << "usage: " << argv[0] << " host port service_namespace service_name" << std::endl; + return -1; + } + // register signal handler + signal(SIGINT, SignalHandler); + + polaris::ServiceKey service_key = {argv[3], argv[4]}; + ConsumerServer server(argv[1], atoi(argv[2]), service_key); + + // 启动服务 + if (server.Start() != 0) { + return -2; + } + + // 循环等待退出信号 + while (!signal_received) { + sleep(1); + } + + // 反注册完成以后再停止服务 + server.Stop(); + + return 0; +} + +ConsumerServer::ConsumerServer(const std::string& host, int port, + const polaris::ServiceKey& provider_service) + : host_(host), port_(port), provider_service_(provider_service), stop_(false) { + consumer_ = std::unique_ptr(polaris::ConsumerApi::CreateWithDefaultFile()); +} + +ConsumerServer::~ConsumerServer() {} + +int ConsumerServer::Start() { + auto sock_listener = socket(AF_INET, SOCK_STREAM, 0); + if (sock_listener < 0) { + std::cerr << "create socket with error: " << errno << std::endl; + return -1; + } + + // address info to bind socket + sockaddr_in server_addr; + server_addr.sin_family = AF_INET; + inet_pton(AF_INET, host_.c_str(), &server_addr.sin_addr); + server_addr.sin_port = htons(port_); + + // bind socket + if (bind(sock_listener, (sockaddr*)&server_addr, sizeof(server_addr)) < 0) { + std::cerr << "bind to " << host_ << ":" << port_ << " failed with error: " << errno + << std::endl; + close(sock_listener); + return -2; + } + + // start listening + if (listen(sock_listener, SOMAXCONN) < 0) { + std::cerr << "listen to " << host_ << ":" << port_ << " failed with error: " << errno + << std::endl; + close(sock_listener); + return -3; + } + std::cout << "listen to " << host_ << ":" << port_ << " success" << std::endl; + + // create accept thread + accept_thread_ = std::unique_ptr(new std::thread([=] { + while (!stop_) { + fd_set set; + FD_ZERO(&set); + FD_SET(sock_listener, &set); + struct timeval timeout; + timeout.tv_sec = 2; + timeout.tv_usec = 0; + int ret = select(sock_listener + 1, &set, NULL, NULL, &timeout); + if (ret <= 0) { + continue; + } + sockaddr_in client_addr; + socklen_t client_addr_size = sizeof(client_addr); + int sock_client; + if ((sock_client = accept(sock_listener, (sockaddr*)&client_addr, &client_addr_size)) < 0) { + std::cerr << "accept connection failed with error:" << errno << std::endl; + continue; + } + + // 处理客户端连接 + std::async(std::launch::async, [=] { + char buffer[1024]; + auto bytes = recv(sock_client, buffer, sizeof(buffer), 0); + if (bytes <= 0) { + std::cerr << "received message failed: " << errno << std::endl; + close(sock_client); + return; + } + std::string response = Proccess(buffer); + bytes = send(sock_client, response.data(), response.size(), 0); + close(sock_client); + + if (bytes < 0) { + std::cerr << "send response failed: " << errno << std::endl; + } + }); + } + close(sock_listener); + })); + + return 0; +} + +std::string ConsumerServer::Proccess(const std::string& message) { + // 获取provider服务实例 + polaris::GetOneInstanceRequest instance_requst(provider_service_); + polaris::Instance instance; + auto ret_code = consumer_->GetOneInstance(instance_requst, instance); + if (ret_code != polaris::kReturnOk) { + std::cout << "get one instance for service with error: " + << polaris::ReturnCodeToMsg(ret_code).c_str() << std::endl; + } + + // 调用业务 + std::string response; + auto begin_time = std::chrono::steady_clock::now(); + int send_ret = Send(instance.GetHost(), instance.GetPort(), message, response); + auto end_time = std::chrono::steady_clock::now(); + + // 上报调用结果 + polaris::ServiceCallResult result; + result.SetServiceNamespace(provider_service_.namespace_); + result.SetServiceName(provider_service_.name_); + result.SetInstanceId(instance.GetId()); + result.SetDelay( + std::chrono::duration_cast(end_time - begin_time).count()); + result.SetRetCode(send_ret); + result.SetRetStatus(send_ret >= 0 ? polaris::kCallRetOk : polaris::kCallRetError); + if ((ret_code = consumer_->UpdateServiceCallResult(result)) != polaris::kReturnOk) { + std::cout << "update call result for instance with error:" + << " msg:" << polaris::ReturnCodeToMsg(ret_code).c_str() << std::endl; + } + + if (send_ret) { + response = + "send msg to " + instance.GetHost() + ":" + std::to_string(instance.GetPort()) + " failed"; + } + std::cout << response << std::endl; + return response; +} + +int ConsumerServer::Send(const std::string& host, int port, const std::string& request, + std::string& response) { + // create a socket + int sock_fd = socket(AF_INET, SOCK_STREAM, 0); + if (sock_fd < 0) { + std::cout << "create socket failed: " << errno << std::endl; + return -1; + } + + sockaddr_in server_addr; + server_addr.sin_family = AF_INET; + inet_pton(AF_INET, host.c_str(), &server_addr.sin_addr); + server_addr.sin_port = htons(port); + + if (connect(sock_fd, (sockaddr*)&server_addr, sizeof(server_addr)) < 0) { + std::cerr << "connection establish failed: " << errno << std::endl; + close(sock_fd); + return -2; + } + + // send the message + int bytes_send = send(sock_fd, request.data(), request.length(), 0); + if (bytes_send < 0) { + std::cerr << "send message failed: " << errno << std::endl; + close(sock_fd); + return -3; + } + + char buffer[4096]; + int bytes_recv = recv(sock_fd, &buffer, sizeof(buffer), 0); + if (bytes_recv <= 0) { + std::cerr << "receive message failed: " << errno << std::endl; + close(sock_fd); + return -4; + } + + close(sock_fd); + response = std::string(buffer); + return 0; +} + +void ConsumerServer::Stop() { + stop_ = true; + if (accept_thread_) { + accept_thread_->join(); + } +} \ No newline at end of file diff --git a/examples/quickstart/provider.cpp b/examples/quickstart/provider.cpp new file mode 100644 index 0000000..f91ec3b --- /dev/null +++ b/examples/quickstart/provider.cpp @@ -0,0 +1,252 @@ +// Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. +// +// Licensed under the BSD 3-Clause License (the "License"); you may not use this file +// except in compliance with the License. You may obtain a copy of the License at +// +// https://opensource.org/licenses/BSD-3-Clause +// +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the specific +// language governing permissions and limitations under the License. +// + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "polaris/provider.h" + +class ProviderServer { +public: + ProviderServer(const std::string& service_namespace, const std::string& service_name, + const std::string& service_token, const std::string& host, int port); + + ~ProviderServer(); + + // 启动服务 + int Start(); + + // 注册服务实例 + int Register(); + + // 反注册服务实例 + void Deregister(); + + // 停止服务 + void Stop(); + +private: + std::string service_namespace_; + std::string service_name_; + std::string service_token_; + std::string host_; + int port_; + std::string instance_id_; + + std::atomic stop_; + std::unique_ptr accept_thread_; + + std::unique_ptr provider_; + std::unique_ptr heartbeat_thread_; +}; + +bool signal_received = false; +void SignalHandler(int signum) { + std::cout << "Interrupt signal (" << signum << ") received." << std::endl; + signal_received = true; +} + +constexpr auto kHeartbeatTtl = 5; + +int main(int argc, char** argv) { + if (argc < 6) { + std::cout << "usage: " << argv[0] << " service_namespace service_name service_token host port" + << std::endl; + return -1; + } + // 注册信号 + signal(SIGINT, SignalHandler); + + ProviderServer server(argv[1], argv[2], argv[3], argv[4], atoi(argv[5])); + + // 先启动服务 + if (server.Start() != 0) { + return -2; + } + + // 启动服务成功以后 再注册服务实例, 并开启心跳上报 + if (server.Register() != 0) { + return -3; + } + + // 循环等待退出信号 + while (!signal_received) { + sleep(1); + } + + // 先反注册实例 + server.Deregister(); + + // 反注册完成以后再停止服务 + server.Stop(); + + return 0; +} + +ProviderServer::ProviderServer(const std::string& service_namespace, + const std::string& service_name, const std::string& service_token, + const std::string& host, int port) + : service_namespace_(service_namespace), service_name_(service_name), + service_token_(service_token), host_(host), port_(port), stop_(false) {} + +ProviderServer::~ProviderServer() { Stop(); } + +int ProviderServer::Start() { + // create a socket + auto sock_listener = socket(AF_INET, SOCK_STREAM, 0); + if (sock_listener < 0) { + std::cerr << "create socket with error: " << errno << std::endl; + return -1; + } + + // address info to bind socket + sockaddr_in server_addr; + server_addr.sin_family = AF_INET; + inet_pton(AF_INET, host_.c_str(), &server_addr.sin_addr); + server_addr.sin_port = htons(port_); + + // bind socket + if (bind(sock_listener, (sockaddr*)&server_addr, sizeof(server_addr)) < 0) { + std::cerr << "bind to " << host_ << ":" << port_ << " failed with error: " << errno + << std::endl; + close(sock_listener); + return -2; + } + + // start listening + if (listen(sock_listener, SOMAXCONN) < 0) { + std::cerr << "listen to " << host_ << ":" << port_ << " failed with error: " << errno + << std::endl; + close(sock_listener); + return -3; + } + std::cout << "listen to " << host_ << ":" << port_ << " success" << std::endl; + + // create accept thread + accept_thread_ = std::unique_ptr(new std::thread([=] { + while (!stop_) { + fd_set set; + FD_ZERO(&set); + FD_SET(sock_listener, &set); + struct timeval timeout; + timeout.tv_sec = 2; + timeout.tv_usec = 0; + int ret = select(sock_listener + 1, &set, NULL, NULL, &timeout); + if (ret <= 0) { + continue; + } + sockaddr_in client_addr; + socklen_t client_addr_size = sizeof(client_addr); + int sock_client; + if ((sock_client = accept(sock_listener, (sockaddr*)&client_addr, &client_addr_size)) < 0) { + std::cerr << "accept connection failed with error:" << errno << std::endl; + continue; + } + + // 处理客户端连接 + std::async(std::launch::async, [=] { + char buffer[1024]; + auto bytes = recv(sock_client, buffer, sizeof(buffer), 0); + if (bytes <= 0) { + std::cerr << "received message failed: " << errno << std::endl; + close(sock_client); + return; + } + + std::string response = + "response form " + host_ + ":" + std::to_string(port_) + " echo " + buffer; + + bytes = send(sock_client, response.data(), response.size(), 0); + close(sock_client); + + if (bytes < 0) { + std::cerr << "send response failed: " << errno << std::endl; + } + }); + } + close(sock_listener); + })); + + return 0; +} + +int ProviderServer::Register() { + provider_ = std::unique_ptr(polaris::ProviderApi::CreateWithDefaultFile()); + if (provider_ == nullptr) { + return -1; + } + polaris::InstanceRegisterRequest register_req(service_namespace_, service_name_, service_token_, + host_, port_); + // 开启健康检查 + register_req.SetHealthCheckFlag(true); + register_req.SetHealthCheckType(polaris::kHeartbeatHealthCheck); + register_req.SetTtl(kHeartbeatTtl); + + // 注册实例 + auto ret_code = provider_->Register(register_req, instance_id_); + if (ret_code != polaris::kReturnOk && ret_code != polaris::kReturnExistedResource) { + std::cout << "register instance with error:" << polaris::ReturnCodeToMsg(ret_code).c_str() + << std::endl; + return ret_code; + } + + // 启动心跳上报线程 + heartbeat_thread_ = std::unique_ptr(new std::thread([=] { + while (!signal_received) { // 循环上报心跳 + polaris::InstanceHeartbeatRequest heartbeat_req(service_token_, instance_id_); + auto ret_code = provider_->Heartbeat(heartbeat_req); + if (ret_code != polaris::kReturnOk) { + std::cout << "instance heartbeat with error:" << polaris::ReturnCodeToMsg(ret_code).c_str() + << std::endl; + sleep(1); + continue; + } + sleep(kHeartbeatTtl); + } + })); + return 0; +} + +void ProviderServer::Deregister() { + if (heartbeat_thread_) { + heartbeat_thread_->join(); + } + // 反注册实例 + polaris::InstanceDeregisterRequest deregister_req(service_token_, instance_id_); + auto ret_code = provider_->Deregister(deregister_req); + if (ret_code != polaris::kReturnOk) { + std::cout << "instance deregister with error:" << polaris::ReturnCodeToMsg(ret_code).c_str() + << std::endl; + } +} + +void ProviderServer::Stop() { + stop_ = true; + if (accept_thread_) { + accept_thread_->join(); + accept_thread_ = nullptr; + } +}