diff --git a/src/brpc/periodic_naming_service.cpp b/src/brpc/periodic_naming_service.cpp index 5e10977997..ac3c337032 100644 --- a/src/brpc/periodic_naming_service.cpp +++ b/src/brpc/periodic_naming_service.cpp @@ -37,7 +37,7 @@ int PeriodicNamingService::RunNamingService( const char* service_name, NamingServiceActions* actions) { std::vector servers; bool ever_reset = false; - for (;;) { + while (true) { servers.clear(); const int rc = GetServers(service_name, &servers); if (rc == 0) { @@ -51,6 +51,10 @@ int PeriodicNamingService::RunNamingService( actions->ResetServers(servers); } + if (bthread_stopped(bthread_self())) { + RPC_VLOG << "Quit NamingServiceThread=" << bthread_self(); + return 0; + } if (bthread_usleep(GetNamingServiceAccessIntervalMs() * 1000UL) < 0) { if (errno == ESTOP) { RPC_VLOG << "Quit NamingServiceThread=" << bthread_self(); diff --git a/src/brpc/periodic_naming_service.h b/src/brpc/periodic_naming_service.h index 8216ddfdbc..7e51114f90 100644 --- a/src/brpc/periodic_naming_service.h +++ b/src/brpc/periodic_naming_service.h @@ -32,7 +32,7 @@ class PeriodicNamingService : public NamingService { virtual int GetNamingServiceAccessIntervalMs() const; int RunNamingService(const char* service_name, - NamingServiceActions* actions); + NamingServiceActions* actions) override; }; } // namespace brpc diff --git a/src/brpc/policy/consul_naming_service.cpp b/src/brpc/policy/consul_naming_service.cpp index f4b2345e3b..795c180c8d 100644 --- a/src/brpc/policy/consul_naming_service.cpp +++ b/src/brpc/policy/consul_naming_service.cpp @@ -62,6 +62,14 @@ std::string RapidjsonValueToString(const BUTIL_RAPIDJSON_NAMESPACE::Value& value return buffer.GetString(); } +ConsulNamingService::ConsulNamingService() + : _backup_file_loaded(false), _consul_connected(false) {} + +int ConsulNamingService::GetNamingServiceAccessIntervalMs() const { + return FLAGS_consul_retry_interval_ms > 0 ? FLAGS_consul_retry_interval_ms : + PeriodicNamingService::GetNamingServiceAccessIntervalMs(); +} + int ConsulNamingService::DegradeToOtherServiceIfNeeded(const char* service_name, std::vector* servers) { if (FLAGS_consul_enable_degrade_to_file_naming_service && !_backup_file_loaded) { @@ -209,47 +217,9 @@ int ConsulNamingService::GetServers(const char* service_name, return 0; } -int ConsulNamingService::RunNamingService(const char* service_name, - NamingServiceActions* actions) { - std::vector servers; - bool ever_reset = false; - for (;;) { - servers.clear(); - const int rc = GetServers(service_name, &servers); - if (bthread_stopped(bthread_self())) { - RPC_VLOG << "Quit NamingServiceThread=" << bthread_self(); - return 0; - } - if (rc == 0) { - ever_reset = true; - actions->ResetServers(servers); - } else { - if (!ever_reset) { - // ResetServers must be called at first time even if GetServers - // failed, to wake up callers to `WaitForFirstBatchOfServers' - ever_reset = true; - servers.clear(); - actions->ResetServers(servers); - } - if (bthread_usleep(std::max(FLAGS_consul_retry_interval_ms, 1) * butil::Time::kMicrosecondsPerMillisecond) < 0) { - if (errno == ESTOP) { - RPC_VLOG << "Quit NamingServiceThread=" << bthread_self(); - return 0; - } - PLOG(FATAL) << "Fail to sleep"; - return -1; - } - } - } - CHECK(false); - return -1; -} - - void ConsulNamingService::Describe(std::ostream& os, const DescribeOptions&) const { os << "consul"; - return; } NamingService* ConsulNamingService::New() const { diff --git a/src/brpc/policy/consul_naming_service.h b/src/brpc/policy/consul_naming_service.h index 93bee068a1..bf63e658c0 100644 --- a/src/brpc/policy/consul_naming_service.h +++ b/src/brpc/policy/consul_naming_service.h @@ -19,7 +19,7 @@ #ifndef BRPC_POLICY_CONSUL_NAMING_SERVICE #define BRPC_POLICY_CONSUL_NAMING_SERVICE -#include "brpc/naming_service.h" +#include "brpc/periodic_naming_service.h" #include "brpc/channel.h" @@ -27,13 +27,15 @@ namespace brpc { class Channel; namespace policy { -class ConsulNamingService : public NamingService { -private: - int RunNamingService(const char* service_name, - NamingServiceActions* actions) override; +class ConsulNamingService : public PeriodicNamingService { +public: + ConsulNamingService(); +private: int GetServers(const char* service_name, - std::vector* servers); + std::vector* servers) override; + + int GetNamingServiceAccessIntervalMs() const override; void Describe(std::ostream& os, const DescribeOptions&) const override; @@ -48,8 +50,8 @@ class ConsulNamingService : public NamingService { Channel _channel; std::string _consul_index; std::string _consul_url; - bool _backup_file_loaded = false; - bool _consul_connected = false; + bool _backup_file_loaded; + bool _consul_connected; }; } // namespace policy