From 2fcfeccce31856bf4ee6abf2c6612d80e1744e4d Mon Sep 17 00:00:00 2001 From: Piyush Bisen Date: Tue, 16 Oct 2018 14:47:52 +0530 Subject: [PATCH 01/11] * Implemented failover in ODBC. * Created new class ControlConnection.cpp --- native/.cproject | 20 +- native/.settings/language.settings.xml | 4 +- .../snappyclient/cpp/impl/ClientService.cpp | 111 +--- .../src/snappyclient/cpp/impl/ClientService.h | 20 + .../cpp/impl/ControlConnection.cpp | 622 +++++++++--------- .../snappyclient/cpp/impl/ControlConnection.h | 139 ++-- 6 files changed, 471 insertions(+), 445 deletions(-) diff --git a/native/.cproject b/native/.cproject index 65a661c5d..2669cbefb 100644 --- a/native/.cproject +++ b/native/.cproject @@ -11,7 +11,7 @@ - + @@ -19,7 +19,7 @@ - + @@ -30,7 +30,7 @@ + diff --git a/native/.settings/language.settings.xml b/native/.settings/language.settings.xml index 7b3b3668f..192438689 100644 --- a/native/.settings/language.settings.xml +++ b/native/.settings/language.settings.xml @@ -5,7 +5,7 @@ - + @@ -16,7 +16,7 @@ - + diff --git a/native/src/snappyclient/cpp/impl/ClientService.cpp b/native/src/snappyclient/cpp/impl/ClientService.cpp index 80a7d3385..52e555e25 100644 --- a/native/src/snappyclient/cpp/impl/ClientService.cpp +++ b/native/src/snappyclient/cpp/impl/ClientService.cpp @@ -69,7 +69,7 @@ #include "DNSCacheService.h" #include "InternalLogger.h" #include "InternalUtils.h" - +#include "ControlConnection.h" using namespace io::snappydata; using namespace io::snappydata::client; using namespace io::snappydata::client::impl; @@ -239,67 +239,13 @@ void ClientService::handleTException(const char* op, const TException& te) { handleStdException(op, te); } - /* -void ClientService::handleException(const TException* te, const thrift::SnappyException* se, - const std::set& failedServers, bool tryFailover, +void ClientService::handleException(const TException* te, + const std::set& failedServers, bool tryFailover, bool ignoreFailOver, bool createNewConnection, const std::string& op) { - if (!m_isOpen && createNewConnection) { - if (se != NULL) { - throw GET_SQLEXCEPTION2(se); - } else { - throw GET_SQLEXCEPTION2(SQLState::NO_CURRENT_CONNECTION, te); - } - } - if (!m_loadBalance - // no failover for transactions yet - || m_isolationLevel != IsolationLevel::NONE) { - tryFailover = false; - } - if (se != NULL) { - const thrift::SnappyExceptionData& sedata = se->exceptionData; - const std::string& sqlState = sedata.sqlState; - NetConnection::FailoverStatus status; - if ((status = NetConnection.getFailoverStatus(sqlState, - sedata.getSeverity(), se)).isNone()) { - // convert DATA_CONTAINTER_CLOSED to "X0Z01" for non-transactional case - if (this.isolationLevel == Connection.TRANSACTION_NONE - && SQLState.DATA_CONTAINER_CLOSED.equals(sqlState)) { - throw newSnappyExceptionForNodeFailure(op, - ClientSharedUtils.newRuntimeException(sedata.getReason(), - se.getCause())); - } else { - throw se; - } - } else if (!tryFailover) { - throw newSnappyExceptionForNodeFailure(op, se); - } else if (status == NetConnection.FailoverStatus.RETRY) { - return failedServers; - } - } else if (t instanceof TException) { - if (!tryFailover) { - throw newSnappyExceptionForNodeFailure(op, t); - } - } else { - throw ClientExceptionUtil.newSnappyException(SQLState.JAVA_EXCEPTION, t, - t.getClass(), t.getMessage() + " (Server=" + this.currentHostAddr - + ')'); - } - // need to do failover to new server, so get the next one - if (failedServers == null) { - @SuppressWarnings("unchecked") - Set servers = new THashSet(2); - failedServers = servers; - } - failedServers.add(this.currentHostAddr); - if (createNewConnection) { - openConnection(this.currentHostAddr, failedServers); - } - return failedServers; -} -*/ +}*/ void ClientService::throwSQLExceptionForNodeFailure(const char* op, const std::exception& se) { @@ -343,14 +289,14 @@ void ClientService::setPendingTransactionAttrs( // settings; this could become configurable in future ClientService::ClientService(const std::string& host, const int port, thrift::OpenConnectionArgs& connArgs) : - // default for load-balance is true - m_connArgs(initConnectionArgs(connArgs)), m_loadBalance(true), - m_reqdServerType(thrift::ServerType::THRIFT_SNAPPY_CP), - m_useFramedTransport(false), m_serverGroups(), - m_transport(), m_client(createDummyProtocol()), - m_connHosts(1), m_connId(0), m_token(), m_isOpen(false), - m_pendingTXAttrs(), m_hasPendingTXAttrs(false), - m_isolationLevel(IsolationLevel::NONE), m_lock() { + // default for load-balance is true + m_connArgs(initConnectionArgs(connArgs)), m_loadBalance(true), + m_reqdServerType(thrift::ServerType::THRIFT_SNAPPY_CP), + m_useFramedTransport(false), m_serverGroups(), + m_transport(), m_client(createDummyProtocol()), + m_connHosts(1), m_connId(0), m_token(), m_isOpen(false), + m_pendingTXAttrs(), m_hasPendingTXAttrs(false), + m_isolationLevel(IsolationLevel::NONE), m_lock() { std::map& props = connArgs.properties; std::map::iterator propValue; @@ -439,15 +385,14 @@ void ClientService::openConnection(thrift::HostAddress& hostAddr, m_currentHostAddr = hostAddr; while (true) { - /* - if (m_loadBalance) { - ControlConnection controlService = ControlConnection - .getOrCreateControlConnection(this.connHosts.get(0), this); - // at this point query the control service for preferred server - hostAddr = controlService.getPreferredServer(failedServers, - this.serverGroups, false); - } - */ + + if (m_loadBalance) { + boost::optional controlService = ControlConnection::getOrCreateControlConnection(m_connHosts,this,nullptr); + // at this point query the control service for preferred server + boost::mutex mutex; + boost::lock_guard serviceGuard(mutex); + controlService->getPreferredServer(hostAddr ,nullptr,failedServers,this->m_serverGroups, false); + } try { // first close any existing transport @@ -542,7 +487,7 @@ thrift::OpenConnectionArgs& ClientService::initConnectionArgs( } thrift::ServerType::type ClientService::getServerType(bool isServer, -bool useBinaryProtocol, bool useSSL) { + bool useBinaryProtocol, bool useSSL) { if (isServer) { if (useSSL) { return useBinaryProtocol ? thrift::ServerType::THRIFT_SNAPPY_BP_SSL @@ -1558,3 +1503,17 @@ void ClientService::close() { handleUnknownException("close"); } } +void ClientService::updateFailedServersForCurrent(std::set& failedServers, + bool checkAllFailed, std::exception* failure){ + + //TODO:: Need to discuss with sumedh about this method + thrift::HostAddress host = this->m_currentHostAddr; + + auto ret = failedServers.insert(host); + if(ret.second==false && checkAllFailed){ + boost::optional controlService = ControlConnection::getOrCreateControlConnection(m_connHosts,this,failure); + thrift::HostAddress pHost; + controlService->searchRandomServer(failedServers,failure,pHost); + } + +} diff --git a/native/src/snappyclient/cpp/impl/ClientService.h b/native/src/snappyclient/cpp/impl/ClientService.h index 9de2ee487..cd40dc3f4 100644 --- a/native/src/snappyclient/cpp/impl/ClientService.h +++ b/native/src/snappyclient/cpp/impl/ClientService.h @@ -134,6 +134,9 @@ namespace impl { //const SSLSocketParameters& sslParams, boost::shared_ptr& returnTransport); + void updateFailedServersForCurrent(std::set& failedServers, + bool checkAllFailed,std::exception* failure); + protected: virtual void checkConnection(const char* op); @@ -167,6 +170,10 @@ namespace impl { void destroyTransport() noexcept; +// void handleException(const TException* te, +// const std::set& failedServers, bool tryFailover, bool ignoreFailOver, +// bool createNewConnection, const std::string& op); + private: // the static hostName and hostId used by all connections static std::string s_hostName; @@ -338,6 +345,19 @@ namespace impl { void bulkClose(const std::vector& entities); void close(); + + const std::vector& getLocators() const noexcept{ + return m_connHosts; + } + + const std::set& getServerGrps() const noexcept{ + return m_serverGroups; + } + + inline bool isFrameTransport() const noexcept { + return m_useFramedTransport; + } + }; } /* namespace impl */ diff --git a/native/src/snappyclient/cpp/impl/ControlConnection.cpp b/native/src/snappyclient/cpp/impl/ControlConnection.cpp index c8a4e04d9..f4c3ccff8 100644 --- a/native/src/snappyclient/cpp/impl/ControlConnection.cpp +++ b/native/src/snappyclient/cpp/impl/ControlConnection.cpp @@ -33,345 +33,371 @@ * LICENSE file. */ +//--------Headers---------- +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include #include "ControlConnection.h" -#include -using namespace apache::thrift; +using namespace io::snappydata; using namespace io::snappydata::client; - -/* -ControlConnection::ControlConnection() { - // TODO Auto-generated constructor stub - +using namespace io::snappydata::client::impl; +using namespace io::snappydata::thrift; + +//----private static data member initialiazation---- +std::vector > ControlConnection::s_allConnections; +boost::mutex ControlConnection::s_allConnsLock; +std::unique_ptr ControlConnection::m_controlLocator = nullptr; +thrift::HostAddress ControlConnection::m_controlHost; +std::unordered_set ControlConnection::m_controlHostSet; +thrift::ServerType::type ControlConnection::m_snappyServerType; +std::vector ControlConnection::m_locators; +bool ControlConnection::m_framedTransport; +std::set ControlConnection::m_snappyServerTypeSet; + +ControlConnection::ControlConnection(ClientService *const &service) :m_serverGroups(service->getServerGrps()){ + m_locators= service->getLocators(); + m_framedTransport = service->isFrameTransport(); + m_snappyServerType=service->getServerType(true,false,false); + m_controlHost= service->getCurrentHostAddress(); + boost::assign::insert(m_snappyServerTypeSet)(service->getServerType(true,false,false)); + std::copy(m_locators.begin(),m_locators.end(),std::inserter(m_controlHostSet,m_controlHostSet.end())); } - public static final class SearchRandomServer implements TObjectProcedure { +const boost::optional ControlConnection::getOrCreateControlConnection( + const std::vector& hostAddrs, ClientService *const &service, std::exception* failure){ - private final ServerType searchServerType; - private final Set failedServers; - private HostAddress foundServer; - private int searchIndex; + // loop through all ControlConnections since size of this global list is + // expected to be in single digit (total number of distributed systems) - public static final Random rand = new Random(); + boost::lock_guard globalGuard(s_allConnsLock); + signed short index = static_cast(s_allConnections.size()); + while (--index >= 0) { + const std::unique_ptr& controlService = s_allConnections.at(index); - SearchRandomServer(ServerType searchServerType, int setSize, - Set failedServers) { - this.searchServerType = searchServerType; - this.failedServers = failedServers != null ? failedServers : Collections - . emptySet(); - this.searchIndex = rand.nextInt(setSize); + boost::lock_guard serviceGuard(controlService->m_lock); + for(thrift::HostAddress hostAddr : hostAddrs){ + auto result = std::find(m_locators.begin(),m_locators.end(),hostAddr); + if(result == m_locators.end()){ + continue; + } + auto serviceServerType = service->getServerType(true,false,false); + auto contrServiceServerType = controlService->m_snappyServerType; + if(contrServiceServerType == serviceServerType){ + return *controlService; + }else{ + thrift::SnappyException *ex = new thrift::SnappyException(); + std::string portStr; + Utils::convertIntToString(hostAddr.port,portStr); + std::string msg= hostAddr.hostName + ":" + portStr + + " as registered but having different type " + Utils::getServerTypeString(contrServiceServerType) + + " than connection " + Utils::getServerTypeString( serviceServerType) ; + SnappyExceptionData snappyExData; + // snappyExData.__set_sqlState("08006.C");// TODO: discuss with sumedh about correct SQLState + snappyExData.__set_reason(msg); + + ex->__set_exceptionData(snappyExData); + ex->__set_serverInfo(hostAddr.hostName + ":" + portStr ); + throw ex; + } } - - public HostAddress getRandomServer() { - return this.foundServer; + } + // if we reached here, then need to create a new ControlConnection + std::unique_ptr controlService ( new ControlConnection(service)); + thrift::HostAddress preferredServer; + controlService->getPreferredServer(preferredServer,failure, true); + // check again if new control host already exist + index = static_cast(s_allConnections.size()); + while (--index >= 0) { + const std::unique_ptr& controlService = s_allConnections.at(index); + boost::lock_guard serviceGuard(controlService->m_lock); + auto result = std::find(m_locators.begin(),m_locators.end(),preferredServer); + if(result == m_locators.end()){ + return *controlService; } + // if (controlService->m_controlHostSet.find(preferredServer) + // != controlService->m_controlHostSet.end()) { + // return *controlService; + // } + } + s_allConnections.push_back(std::move(controlService)); + return *s_allConnections.back(); +} +void ControlConnection::getLocatorPreferredServer(thrift::HostAddress& prefHostAddr,std::set& failedServers, + std::setserverGroups){ + // TODO: SanityManager + m_controlLocator->getPreferredServer(prefHostAddr,m_snappyServerTypeSet,serverGroups,failedServers); + //TODO:SanityManager +} +void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer,std::exception* failure,bool forFailover){ + std::set failedServers; + std::set serverGroups; + return getPreferredServer(preferredServer,failure,failedServers,serverGroups,forFailover); +} +void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer,std::exception* failure, + std::set& failedServers, + std::set& serverGroups,bool forFailover){ - @Override - public boolean execute(Object o) { - if (this.searchIndex > 0) { - this.searchIndex--; + if(m_controlLocator == nullptr) + { + failoverToAvailableHost(failedServers, false,failure); + forFailover = true; + } + + bool firstCall = true; + while(true){ + try{ + if(forFailover){ + //TODO: SanityManager + + //refresh the full host list + std::vector prefServerAndAllHosts; + m_controlLocator->getAllServersWithPreferredServer(prefServerAndAllHosts,m_snappyServerTypeSet,serverGroups,failedServers); + + //TODO :: refresh new server list--like java do. + refreshAllHosts(prefServerAndAllHosts); + preferredServer = prefServerAndAllHosts.at(0); + //TODO :SanityManger + }else{ + getLocatorPreferredServer(preferredServer,failedServers,serverGroups); } - else if (this.foundServer != null) { - return false; + // TODO: SanityManager + if(preferredServer.port <=0){ + /*For this case we don't have a locator or locator unable to + * determine a preferred server, so choose some server randomly + * as the "preferredServer". In case all servers have failed then + * the search below will also fail. + * Remove controlHost from failedServers since it is known to be + * working at this point (e.g after a reconnect) + * */ + std::set skipServers = failedServers; + if( !failedServers.empty() && std::find(failedServers.begin(),failedServers.end(),m_controlHost)!= failedServers.end()){ + //don't change the original failure list since that is proper + // for the current operation but change for random server search + skipServers.erase(m_controlHost); + } + searchRandomServer(skipServers, failure,preferredServer); } - - HostAddress hostAddr = (HostAddress)o; - SanityManager.DEBUG_PRINT("SW:", "SW: random host " + hostAddr - + " serverType=" + hostAddr.getServerType() + ", searchType=" - + this.searchServerType + ", failed servers = " + failedServers - + ", isFailed=" + failedServers.contains(hostAddr)); - if (hostAddr.getServerType() == this.searchServerType - && !this.failedServers.contains(hostAddr)) { - this.foundServer = hostAddr; + //TODO: Sanitymanger + return; + }catch(thrift::SnappyException &snEx){ + // TODO: + //Discuss with Sumedh + }catch(TException &tex){ + // TODO: SanityManager + //Search for a new host for locator query + if(failedServers.empty()){ + // TODO } - return true; + // for the first call do not mark controlhost as failed but retry(e.g. for a reconnect case) + if(firstCall){ + firstCall = false; + }else{ + failedServers.insert(m_controlHost); + } + m_controlLocator->getOutputProtocol()->getTransport()->close(); + failoverToAvailableHost(failedServers,true,failure); + if(failure ==nullptr){ + failure = &(tex);// TODO: need to look again + } + }catch(std::exception &ex){ + throw unexpectedError(ex, m_controlHost); } + forFailover = true; } - -const std::vector ControlConnection::s_allConnections(2); -const boost::mutex ControlConnection::s_allConnsLock; - -ControlConnection::ControlConnection(const ClientService& service) : - m_snappyServerType(service.m_reqdServerType), m_snappyServerTypeSet( - boost::assign::list_of(service.m_reqdServerType)), m_locators( - service.m_locators), m_controlHost(), m_controlLocator(), - m_controlHosts(service.m_connHosts), m_controlHostSet( - service.m_connHosts), m_serverGroups(service.m_serverGroups) { } -const ControlConnection& ControlConnection::getOrCreateControlConnection( - const thrift::HostAddress& hostAddr, const ClientService& service) { - // loop through all ControlConnections since size of this global list is - // expected to be in single digit (total number of distributed systems) - boost::lock_guard globalGuard(s_allConnsLock); +void ControlConnection::searchRandomServer(const std::set& skipServers,std::exception* failure, + thrift::HostAddress& hostAddress){ - size_t index = s_allConnections.size(); - while (--index >= 0) { - std::unique_ptr& controlService = s_allConnections[index]; - - boost::lock_guard serviceGuard( - controlService->m_lock); - if (controlService->m_controlHostSet.find(hostAddr) - != controlService->m_controlHostSet.end()) { - return controlService; + //TODO: Need to discuss implemetation of this method and also ClientService:: updateFailedServersForCurrent with sumedh + std::vector searchServers; + // Note: Do not use unordered_set -- reason is http://www.cplusplus.com/forum/general/198319/ + std::copy(m_controlHostSet.begin(),m_controlHostSet.end(),std::inserter(searchServers,searchServers.end())); + if(searchServers.size() > 2){ + std::random_shuffle(searchServers.begin(),searchServers.end()); + } + bool findIt = false; + for(thrift::HostAddress host: searchServers){ + if(host.serverType == m_snappyServerType && + !(!skipServers.empty() && + std::find(skipServers.begin(),skipServers.end(),host)!=skipServers.end())){ + hostAddress = host; + findIt = true; + break; } } - // if we reached here, then need to create a new ControlConnection - std::unique_ptr controlService = new ControlConnection( - service); - std::set emptyServers; - controlService->getPreferredServer(emptyServers, true); - s_allConnections.push_back(controlService); - return controlService; + if(findIt) return; + throw failoverExhausted(skipServers,failure); } +void ControlConnection::failoverToAvailableHost(std::set& failedServers,bool checkFailedControlHosts, + std::exception* failure){ -const thrift::HostAddress& ControlConnection::getPreferredServer( - std::set& failedServers, - bool forFailover) -{ - if (m_controlLocator.get() == NULL) { - failoverToAvailableHost(failedServers, NULL); - forFailover = true; + for(auto iterator = m_controlHostSet.begin();iterator!= m_controlHostSet.end(); ++iterator ){ + thrift::HostAddress controlAddr = *iterator; + if(checkFailedControlHosts && ! failedServers.empty() && (failedServers.find(controlAddr) != failedServers.end())){ + continue; } - - while (true) { - try { - HostAddress preferredServer; - if (forFailover) { - if (SanityManager.TraceClientHA) { - SanityManager.DEBUG_PRINT(SanityManager.TRACE_CLIENT_HA, - "getAllServersWithPreferredServer() trying using host " - + this.controlHost); - } - if (SanityManager.TraceClientStatement) { - final long ns = System.nanoTime(); - SanityManager.DEBUG_PRINT_COMPACT( - "getAllServersWithPreferredServer_S", null, 0, ns, true, null); - } - - // refresh the full host list - List prefServerAndAllHosts = controlLocator - .getAllServersWithPreferredServer(this.snappyServerTypeSet, - this.serverGroups, failedServers); - // refresh the new server list - List allHosts = prefServerAndAllHosts.subList(1, - prefServerAndAllHosts.size()); - refreshAllHosts(allHosts); - preferredServer = prefServerAndAllHosts.get(0); - - if (SanityManager.TraceClientHA) { - SanityManager.DEBUG_PRINT(SanityManager.TRACE_CLIENT_HA, - "Returning all hosts " + allHosts - + " using control connection to " + this.controlHost); - } - if (SanityManager.TraceClientStatement) { - final long ns = System.nanoTime(); - SanityManager.DEBUG_PRINT_COMPACT( - "getAllServersWithPreferredServer_E", null, 0, ns, false, - null); - } - } - else { - if (SanityManager.TraceClientHA) { - SanityManager.DEBUG_PRINT(SanityManager.TRACE_CLIENT_HA, - "getPreferredServer() trying using host " + this.controlHost); - } - if (SanityManager.TraceClientStatement) { - final long ns = System.nanoTime(); - SanityManager.DEBUG_PRINT_COMPACT("getPreferredServer_S", null, - 0, ns, true, null); - } - - preferredServer = controlLocator.getPreferredServer( - this.snappyServerTypeSet, this.serverGroups, failedServers); - - if (SanityManager.TraceClientStatement) { - final long ns = System.nanoTime(); - SanityManager.DEBUG_PRINT_COMPACT("getPreferredServer_E", null, - 0, ns, false, null); - } - } - - if (SanityManager.TraceClientHA) { - SanityManager.DEBUG_PRINT(SanityManager.TRACE_CLIENT_HA, - "Got preferred server " + preferredServer - + " using control connection to " + this.controlHost - + (preferredServer.getPort() <= 0 ? "" : " (trying random " - + "server since no preferred server received)")); + //m_controlHost = nullptr; + m_controlLocator.reset(nullptr); + + boost::shared_ptr inTransport =nullptr; + boost::shared_ptr outTransport =nullptr; + boost::shared_ptr inProtocol =nullptr; + boost::shared_ptr outProtocol =nullptr; + + try{ + while(true){ + if(outTransport !=nullptr){ + outTransport->close(); } - if (preferredServer.getPort() <= 0) { - // for this case we don't have a locator so choose some server - // randomly as the "preferredServer" - SearchRandomServer search = new SearchRandomServer(this.snappyServerType, - this.controlHostSet.size(), failedServers); - this.controlHostSet.forEach(search); - if ((preferredServer = search.getRandomServer()) == null) { - throw failoverExhausted(null); - } + boost::shared_ptr tTransport = nullptr; + if(m_snappyServerType == thrift::ServerType::THRIFT_LOCATOR_BP_SSL || + m_snappyServerType == thrift::ServerType::THRIFT_LOCATOR_CP_SSL || + m_snappyServerType == thrift::ServerType::THRIFT_SNAPPY_BP_SSL || + m_snappyServerType==thrift::ServerType::THRIFT_SNAPPY_CP_SSL){ + // TODO: Find out whether SnappyTSSLSocket is needed or not, or any other thing is required + TSSLSocketFactory sslSocketFactory; + tTransport = sslSocketFactory.createSocket(controlAddr.hostName,controlAddr.port); + }else if(m_snappyServerType == thrift::ServerType::THRIFT_LOCATOR_BP || + m_snappyServerType== thrift::ServerType::THRIFT_LOCATOR_CP || + m_snappyServerType== thrift::ServerType::THRIFT_SNAPPY_BP || + m_snappyServerType== thrift::ServerType::THRIFT_SNAPPY_CP){ + tTransport = boost::make_shared(controlAddr.hostName,controlAddr.port); // TODO: Find out whether SnappyTSocket is needed or not, or any other thing is required } - if (SanityManager.TraceClientHA) { - SanityManager.DEBUG_PRINT(SanityManager.TRACE_CLIENT_HA, - "Returning preferred server " + preferredServer - + " with current control connection to " + this.controlHost); + tTransport->open(); + TTransportFactory* transportFactory = nullptr; + if(m_framedTransport){ + transportFactory = new TFramedTransportFactory(); + }else{ + transportFactory = new TTransportFactory(); } - return preferredServer; - } catch (TException te) { - if (SanityManager.TraceClientHA) { - SanityManager.DEBUG_PRINT(SanityManager.TRACE_CLIENT_HA, - "getPreferredServer() received exception from control host " - + this.controlHost, te); - } - if (te instanceof SnappyException) { - SnappyException se = (SnappyException)te; - NetConnection.FailoverStatus status; - if ((status = NetConnection.getFailoverStatus(se - .getExceptionData().getSqlState(), se.getExceptionData() - .getSeverity(), se)).isNone()) { - throw se; - } - else if (status == NetConnection.FailoverStatus.RETRY) { - forFailover = true; - continue; - } - } - // search for a new host for locator query - if (failedServers == null) { - Set servers = new THashSet(2); - failedServers = servers; - } - failedServers.add(controlHost); - controlLocator.getOutputProtocol().getTransport().close(); - failedServers = failoverToAvailableHost(failedServers, te); - } catch (Throwable t) { - throw unexpectedError(t, controlHost); - } - forFailover = true; - } - } - -void ControlConnection::failoverToAvailableHost( - std::set& failedServers, - const std::exception* failure) -{ - NEXT_SERVER: for (std::vector::const_iterator iter = - m_controlHosts.begin(); iter != m_controlHosts.end(); ++iter) { - const thrift::HostAddress& controlAddr = *iter; - if (!failedServers.empty() && failedServers.find(controlAddr) != failedServers.end()) { - continue; - } - m_controlHost.hostName.clear(); - m_controlLocator.reset(NULL); - - boost::shared_ptr transport; - boost::shared_ptr protocol; - while (true) { - try { - if (transport.get() != NULL) { - transport->close(); - } - if (SanityManager.TraceClientHA) { - SanityManager.DEBUG_PRINT(SanityManager.TRACE_CLIENT_HA, - "Trying control connection to host " + controlAddr); - } - if (SanityManager.TraceClientStatement - | SanityManager.TraceClientConn) { - final long ns = System.nanoTime(); - SanityManager.DEBUG_PRINT_COMPACT( - "failoverToAvailableHost_S", null, 0, ns, true, - SanityManager.TraceClientConn ? new Throwable() : null); - } - - // using socket with TCompactProtocol to match the server - // settings; this could become configurable in future - if (m_snappyServerType.isThriftSSL()) { - transport = new SnappyTSSLSocket(controlAddr, this.sslParams); - } - else { - transport = new SnappyTSocket(controlAddr); - } - if (this.snappyServerType.isThriftBinaryProtocol()) { - protocol = new TBinaryProtocol(transport); - } - else { - protocol = new TCompactProtocol(transport); - } - break; - } catch (TException te) { - if (SanityManager.TraceClientHA) { - SanityManager.DEBUG_PRINT(SanityManager.TRACE_CLIENT_HA, - "Received exception in control connection to host " - + controlAddr, te); - } - failure = te; - // search for a new host for locator query - if (failedServers == null) { - @SuppressWarnings("unchecked") - Set servers = new THashSet(2); - failedServers = servers; - } - failedServers.add(controlAddr); - if (transport != null) { - transport.close(); - } - continue NEXT_SERVER; - } catch (Throwable t) { - throw unexpectedError(t, controlAddr); + inTransport = transportFactory->getTransport(tTransport); + outTransport = transportFactory->getTransport(tTransport); + delete transportFactory; + transportFactory= 0; + + TProtocolFactory* protocolFactory = nullptr; + if(m_snappyServerType == thrift::ServerType::THRIFT_LOCATOR_BP || + m_snappyServerType==thrift::ServerType::THRIFT_LOCATOR_BP_SSL || + m_snappyServerType == thrift::ServerType::THRIFT_SNAPPY_BP || + m_snappyServerType == thrift::ServerType::THRIFT_SNAPPY_BP_SSL){ + protocolFactory = new TBinaryProtocolFactory(); + + }else{ + protocolFactory = new TCompactProtocolFactory(); } - } - this.controlHost = controlAddr; - this.controlLocator = new LocatorService.Client(protocol); + inProtocol= protocolFactory->getProtocol(inTransport); + outProtocol= protocolFactory->getProtocol(outTransport); - if (SanityManager.TraceClientHA) { - SanityManager.DEBUG_PRINT(SanityManager.TRACE_CLIENT_HA, - "Established control connection to host " + controlAddr); + delete protocolFactory; + protocolFactory=0; + break; } - if (SanityManager.TraceClientStatement - | SanityManager.TraceClientConn) { - final long ns = System.nanoTime(); - SanityManager.DEBUG_PRINT_COMPACT( - "failoverToAvailableHost_E", null, 0, ns, false, - SanityManager.TraceClientConn ? new Throwable() : null); + }catch(TException &tExp){ + failure = &tExp; // TODO: need to look again + failedServers.insert(controlAddr); + if(outTransport != nullptr){ + outTransport->close(); } - return failedServers; + continue; + }catch(std::exception &ex){ + throw unexpectedError(ex,controlAddr); } - throw failoverExhausted(failure); - } - private void refreshAllHosts(List allHosts) { - // refresh the new server list + m_controlHost = controlAddr; - // we remove all from the set and re-populate since we would like - // to prefer the ones coming as "allServers" with "isServer" flag - // correctly set rather than the ones in "secondary-locators" - this.controlHostSet.clear(); - this.controlHosts.subList(this.locators.size(), this.controlHosts.size()) - .clear(); + m_controlLocator.reset (new thrift::LocatorServiceClient(inProtocol,outProtocol)); + return; + } + throw failoverExhausted(failedServers,failure); +} - this.controlHostSet.addAll(allHosts); - this.controlHostSet.addAll(this.controlHosts); +const thrift::SnappyException* ControlConnection:: unexpectedError(const std::exception& ex, const thrift::HostAddress& host){ - this.controlHosts.addAll(allHosts); + if(m_controlLocator != nullptr){ + m_controlLocator->getOutputProtocol()->getTransport()->close(); + m_controlLocator.reset(nullptr); } + thrift::SnappyException *snappyEx = new thrift::SnappyException(); + SnappyExceptionData snappyExData; + //snappyExData.__set_sqlState(std::string(SQLState::UNKNOWN_EXCEPTION));// TODO: discuss with sumedh about correct SQLState + snappyExData.__set_reason(ex.what()); + snappyEx->__set_exceptionData(snappyExData); + + std::string portNum; + Utils::convertIntToString(host.port,portNum); + snappyEx->__set_serverInfo(host.hostName + host.ipAddress + portNum + Utils::getServerTypeString(host.serverType)); + + return snappyEx; +} - private SnappyException unexpectedError(Throwable t, HostAddress host) { - this.controlHost = null; - if (this.controlLocator != null) { - this.controlLocator.getOutputProtocol().getTransport().close(); - this.controlLocator = null; +void ControlConnection::refreshAllHosts(const std::vector& allHosts) { + //refresh the locator list first(keep old but push current to front) + std::vector locators = m_locators; + std::vector newLocators(locators.size()); + + for(HostAddress host: allHosts){ + thrift::ServerType::type sType = host.serverType; + if(sType == ServerType::THRIFT_LOCATOR_BP || sType == ServerType::THRIFT_LOCATOR_BP_SSL || + sType == ServerType::THRIFT_LOCATOR_CP || sType == ServerType::THRIFT_LOCATOR_CP_SSL || + (std::find(locators.begin(),locators.end(), host)!=locators.end())){ + newLocators.push_back(host); + } + } + for(HostAddress host: locators){ + if(!(std::find(newLocators.begin(),newLocators.end(), host)!=newLocators.end())){ + newLocators.push_back(host); } - return ClientExceptionUtil.newSnappyException(SQLState.JAVA_EXCEPTION, t, - t.getClass(), t.getMessage() + " (Server=" + host + ')'); } + m_locators = newLocators; + // refresh the new server list + + // we remove all from the set and re-populate since we would like + // to prefer the ones coming as "allServers" with "isServer" flag + // correctly set rather than the ones in "secondary-locators" + m_controlHostSet.clear(); + m_controlHostSet.insert(newLocators.begin(),newLocators.end()); + m_controlHostSet.insert(allHosts.begin(),allHosts.end()); +} - private SnappyException failoverExhausted(Throwable cause) { - return ClientExceptionUtil.newSnappyException( - SQLState.DATA_CONTAINER_CLOSED, cause, this.locators.get(0), - " {failed after trying all available servers: " + controlHostSet - + (this.locators.size() > 1 ? ", secondary-locators=" - + this.locators.subList(1, this.locators.size()) : "") - + (cause instanceof TException ? " with: " + ThriftExceptionUtil - .getExceptionString(cause) + '}' : "}")); + +thrift::SnappyException* ControlConnection::failoverExhausted(const std::set& failedServers, + std::exception* failure) { + + // thrift::HostAddress host; + // if(! m_locators.empty()){ + // host = m_locators.at(0); + // } + std::string failedServerString; + for(thrift::HostAddress host : failedServers){ + std::string portStr; + Utils::convertIntToString(host.port,portStr); + failedServerString.append(host.hostName).append(":").append(portStr).append(","); } + thrift::SnappyException *snappyEx = new thrift::SnappyException(); + SnappyExceptionData snappyExData; + //snappyExData.__set_sqlState(std::string(SQLState::DATA_CONTAINER_CLOSED));// TODO: discuss with sumedh about correct SQLState + std::string reason ="{Failed afer trying all the servers:}" ; + snappyExData.__set_reason(reason); + snappyEx->__set_exceptionData(snappyExData); + //snappyExData.__set_sqlState();// + // TODO: complete this funtion + snappyEx->__set_serverInfo(failedServerString); + return snappyEx; } -*/ + + diff --git a/native/src/snappyclient/cpp/impl/ControlConnection.h b/native/src/snappyclient/cpp/impl/ControlConnection.h index 3fe3b97e6..e85f8c56c 100644 --- a/native/src/snappyclient/cpp/impl/ControlConnection.h +++ b/native/src/snappyclient/cpp/impl/ControlConnection.h @@ -36,12 +36,19 @@ #ifndef CONTROLCONNECTION_H_ #define CONTROLCONNECTION_H_ -#include "ClientService.h" +#include "ClientService.h" #include - +#include #include "../thrift/LocatorService.h" +//-----------namespaces----- + +using namespace apache::thrift; +using namespace apache::thrift::transport; +using namespace apache::thrift::protocol; +using namespace io::snappydata; + namespace std { template<> struct hash { @@ -57,64 +64,76 @@ namespace std { } namespace io { -namespace snappydata { -namespace client { - -namespace impl { - - /** - * Holds locator, server information to use for failover. Also provides - * convenience methods to actually search for an appropriate host for - * failover. - *

- * One distributed system is supposed to have one ControlConnection. - */ - class ControlConnection { - private: - const thrift::ServerType m_snappyServerType; - //const SSLSocketParameters& m_sslParams; - const std::set m_snappyServerTypeSet; - const std::vector& m_locators; - thrift::HostAddress m_controlHost; - std::unique_ptr m_controlLocator; - const std::vector m_controlHosts; - const std::unordered_set m_controlHostSet; - const std::set& m_serverGroups; - - const boost::mutex m_lock; - - /** - * Since one DS is supposed to have one ControlConnection, so we expect the - * total size of this static global list to be small. - */ - static const std::vector s_allConnections; - /** Global lock for {@link allConnections} */ - static const boost::mutex s_allConnsLock; - - void failoverToAvailableHost(std::set& failedServers, - const std::exception* failure); - - void refreshAllHosts(const std::vector& allHosts); - - const thrift::SnappyException& unexpectedError( - const std::exception& e, const thrift::HostAddress& host); - - void failoverExhausted(const std::exception& cause, - thrift::SnappyException& result); - - public: - ControlConnection(const ClientService& service); - - static const ControlConnection& getOrCreateControlConnection( - const thrift::HostAddress& hostAddr, const ClientService& service); - - const thrift::HostAddress& getPreferredServer( - std::set& failedServers, bool forFailover); - }; - -} /* namespace impl */ -} /* namespace client */ -} /* namespace snappydata */ + namespace snappydata { + namespace client { + namespace impl { + + /** + * Holds locator, server information to use for failover. Also provides + * convenience methods to actually search for an appropriate host for + * failover. + *

+ * One distributed system is supposed to have one ControlConnection. + */ + class ControlConnection { + private: + /**********Data members********/ + static thrift::ServerType::type m_snappyServerType; + //const SSLSocketParameters& m_sslParams; + static std::set m_snappyServerTypeSet; + static std::vector m_locators; + static thrift::HostAddress m_controlHost; + static std::unique_ptr m_controlLocator; + static std::unordered_set m_controlHostSet; + const std::set& m_serverGroups; + + boost::mutex m_lock; + static bool m_framedTransport; + /** + * Since one DS is supposed to have one ControlConnection, so we expect the + * total size of this static global list to be small. + */ + static std::vector > s_allConnections; + /** Global lock for {@link allConnections} */ + static boost::mutex s_allConnsLock; + + /*********Member functions**************/ + ControlConnection():m_serverGroups(std::set()){}; + ControlConnection(ClientService *const &service); + + static void failoverToAvailableHost(std::set& failedServers, + bool checkFailedControlHosts, std::exception* failure); + // + static void refreshAllHosts(const std::vector& allHosts); + // + static const thrift::SnappyException* unexpectedError( + const std::exception& e, const thrift::HostAddress& host); + + static thrift::SnappyException* failoverExhausted(const std::set& failedServers, + std::exception* failure); + + static void getLocatorPreferredServer(thrift::HostAddress& prefHostAddr,std::set& failedServers, + std::setserverGroups); + + + public: + + static const boost::optional getOrCreateControlConnection( + const std::vector& hostAddrs, ClientService *const &service, std::exception* failure); + + void getPreferredServer(thrift::HostAddress& preferredServer,std::exception* failure, + std::set& failedServers, + std::set& serverGroups,bool forFailover = false); + + void getPreferredServer(thrift::HostAddress& preferredServer,std::exception* failure, + bool forFailover = false); + void searchRandomServer(const std::set& skipServers, + std::exception* failure,thrift::HostAddress& hostAddress); + }; + + } /* namespace impl */ + } /* namespace client */ + } /* namespace snappydata */ } /* namespace io */ #endif /* CONTROLCONNECTION_H_ */ From 61a3b4c6e2991c0256902b55bd54cbbbcbdb1ba9 Mon Sep 17 00:00:00 2001 From: Piyush Bisen Date: Wed, 17 Oct 2018 13:14:45 +0530 Subject: [PATCH 02/11] code refactoring: 1-made overloaded getPreferredServer method private. --- .../snappyclient/cpp/impl/ClientService.cpp | 20 +++++------ .../cpp/impl/ControlConnection.cpp | 34 +++++++++---------- .../snappyclient/cpp/impl/ControlConnection.h | 6 ++-- 3 files changed, 29 insertions(+), 31 deletions(-) diff --git a/native/src/snappyclient/cpp/impl/ClientService.cpp b/native/src/snappyclient/cpp/impl/ClientService.cpp index 52e555e25..aed6a579f 100644 --- a/native/src/snappyclient/cpp/impl/ClientService.cpp +++ b/native/src/snappyclient/cpp/impl/ClientService.cpp @@ -289,14 +289,14 @@ void ClientService::setPendingTransactionAttrs( // settings; this could become configurable in future ClientService::ClientService(const std::string& host, const int port, thrift::OpenConnectionArgs& connArgs) : - // default for load-balance is true - m_connArgs(initConnectionArgs(connArgs)), m_loadBalance(true), - m_reqdServerType(thrift::ServerType::THRIFT_SNAPPY_CP), - m_useFramedTransport(false), m_serverGroups(), - m_transport(), m_client(createDummyProtocol()), - m_connHosts(1), m_connId(0), m_token(), m_isOpen(false), - m_pendingTXAttrs(), m_hasPendingTXAttrs(false), - m_isolationLevel(IsolationLevel::NONE), m_lock() { + // default for load-balance is true + m_connArgs(initConnectionArgs(connArgs)), m_loadBalance(true), + m_reqdServerType(thrift::ServerType::THRIFT_SNAPPY_CP), + m_useFramedTransport(false), m_serverGroups(), + m_transport(), m_client(createDummyProtocol()), + m_connHosts(1), m_connId(0), m_token(), m_isOpen(false), + m_pendingTXAttrs(), m_hasPendingTXAttrs(false), + m_isolationLevel(IsolationLevel::NONE), m_lock() { std::map& props = connArgs.properties; std::map::iterator propValue; @@ -387,10 +387,10 @@ void ClientService::openConnection(thrift::HostAddress& hostAddr, while (true) { if (m_loadBalance) { - boost::optional controlService = ControlConnection::getOrCreateControlConnection(m_connHosts,this,nullptr); - // at this point query the control service for preferred server boost::mutex mutex; boost::lock_guard serviceGuard(mutex); + boost::optional controlService = ControlConnection::getOrCreateControlConnection(m_connHosts,this,nullptr); + // at this point query the control service for preferred server controlService->getPreferredServer(hostAddr ,nullptr,failedServers,this->m_serverGroups, false); } diff --git a/native/src/snappyclient/cpp/impl/ControlConnection.cpp b/native/src/snappyclient/cpp/impl/ControlConnection.cpp index f4c3ccff8..36e513b82 100644 --- a/native/src/snappyclient/cpp/impl/ControlConnection.cpp +++ b/native/src/snappyclient/cpp/impl/ControlConnection.cpp @@ -86,24 +86,25 @@ const boost::optional ControlConnection::getOrCreateControlC boost::lock_guard globalGuard(s_allConnsLock); signed short index = static_cast(s_allConnections.size()); while (--index >= 0) { - const std::unique_ptr& controlService = s_allConnections.at(index); + const std::unique_ptr& controlConn = s_allConnections.at(index); - boost::lock_guard serviceGuard(controlService->m_lock); + boost::lock_guard serviceGuard(controlConn->m_lock); + std::vector _locators = controlConn->m_locators; for(thrift::HostAddress hostAddr : hostAddrs){ - auto result = std::find(m_locators.begin(),m_locators.end(),hostAddr); - if(result == m_locators.end()){ + auto result = std::find(_locators.begin(),_locators.end(),hostAddr); + if(result == _locators.end()){ continue; } - auto serviceServerType = service->getServerType(true,false,false); - auto contrServiceServerType = controlService->m_snappyServerType; - if(contrServiceServerType == serviceServerType){ - return *controlService; + auto serviceServerType = service->getServerType(true,false,false); // TODO: need to discuss with sumedh about this getServerType method + auto contrConnServerType = controlConn->m_snappyServerType; + if(contrConnServerType == serviceServerType){ + return *controlConn; }else{ thrift::SnappyException *ex = new thrift::SnappyException(); std::string portStr; Utils::convertIntToString(hostAddr.port,portStr); std::string msg= hostAddr.hostName + ":" + portStr + - " as registered but having different type " + Utils::getServerTypeString(contrServiceServerType) + + " as registered but having different type " + Utils::getServerTypeString(contrConnServerType) + " than connection " + Utils::getServerTypeString( serviceServerType) ; SnappyExceptionData snappyExData; // snappyExData.__set_sqlState("08006.C");// TODO: discuss with sumedh about correct SQLState @@ -122,16 +123,13 @@ const boost::optional ControlConnection::getOrCreateControlC // check again if new control host already exist index = static_cast(s_allConnections.size()); while (--index >= 0) { - const std::unique_ptr& controlService = s_allConnections.at(index); - boost::lock_guard serviceGuard(controlService->m_lock); - auto result = std::find(m_locators.begin(),m_locators.end(),preferredServer); - if(result == m_locators.end()){ - return *controlService; + const std::unique_ptr& controlConn = s_allConnections.at(index); + boost::lock_guard serviceGuard(controlConn->m_lock); + std::vector _locators = controlConn->m_locators; + auto result = std::find(_locators.begin(),_locators.end(),preferredServer); + if(result == _locators.end()){ + return *controlConn; } - // if (controlService->m_controlHostSet.find(preferredServer) - // != controlService->m_controlHostSet.end()) { - // return *controlService; - // } } s_allConnections.push_back(std::move(controlService)); return *s_allConnections.back(); diff --git a/native/src/snappyclient/cpp/impl/ControlConnection.h b/native/src/snappyclient/cpp/impl/ControlConnection.h index e85f8c56c..83c869624 100644 --- a/native/src/snappyclient/cpp/impl/ControlConnection.h +++ b/native/src/snappyclient/cpp/impl/ControlConnection.h @@ -115,7 +115,8 @@ namespace io { static void getLocatorPreferredServer(thrift::HostAddress& prefHostAddr,std::set& failedServers, std::setserverGroups); - + void getPreferredServer(thrift::HostAddress& preferredServer,std::exception* failure, + bool forFailover = false); public: static const boost::optional getOrCreateControlConnection( @@ -125,8 +126,7 @@ namespace io { std::set& failedServers, std::set& serverGroups,bool forFailover = false); - void getPreferredServer(thrift::HostAddress& preferredServer,std::exception* failure, - bool forFailover = false); + void searchRandomServer(const std::set& skipServers, std::exception* failure,thrift::HostAddress& hostAddress); }; From 84014d25dc975d58e747b0f8abc5c1124bc6ff51 Mon Sep 17 00:00:00 2001 From: Piyush Bisen Date: Mon, 29 Oct 2018 15:46:18 +0530 Subject: [PATCH 03/11] Used data member boost::mutex m_lock, instead of local mutex variable --- native/src/snappyclient/cpp/impl/ClientService.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/native/src/snappyclient/cpp/impl/ClientService.cpp b/native/src/snappyclient/cpp/impl/ClientService.cpp index aed6a579f..ea0aeacf9 100644 --- a/native/src/snappyclient/cpp/impl/ClientService.cpp +++ b/native/src/snappyclient/cpp/impl/ClientService.cpp @@ -387,8 +387,7 @@ void ClientService::openConnection(thrift::HostAddress& hostAddr, while (true) { if (m_loadBalance) { - boost::mutex mutex; - boost::lock_guard serviceGuard(mutex); + boost::lock_guard serviceGuard(m_lock); boost::optional controlService = ControlConnection::getOrCreateControlConnection(m_connHosts,this,nullptr); // at this point query the control service for preferred server controlService->getPreferredServer(hostAddr ,nullptr,failedServers,this->m_serverGroups, false); From 77983ff08de1d88a4b44fe23c42941b2ce72423d Mon Sep 17 00:00:00 2001 From: Piyush Bisen Date: Tue, 30 Oct 2018 11:13:39 +0530 Subject: [PATCH 04/11] Fixed the issue related LoadBalance property True all the time, even though setting LoadBalance property false in connection string --- native/src/snappyclient/cpp/impl/ClientService.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/src/snappyclient/cpp/impl/ClientService.cpp b/native/src/snappyclient/cpp/impl/ClientService.cpp index ea0aeacf9..05e97c75b 100644 --- a/native/src/snappyclient/cpp/impl/ClientService.cpp +++ b/native/src/snappyclient/cpp/impl/ClientService.cpp @@ -308,7 +308,7 @@ ClientService::ClientService(const std::string& host, const int port, if (!props.empty()) { if ((propValue = props.find(ClientAttribute::LOAD_BALANCE)) != props.end()) { - m_loadBalance = boost::iequals("false", propValue->second); + m_loadBalance = !(boost::iequals("false", propValue->second)); props.erase(propValue); } From ac438092de594afaa2d7de71f52fb058cec07f9f Mon Sep 17 00:00:00 2001 From: Sumedh Wale Date: Mon, 19 Nov 2018 19:50:59 +0530 Subject: [PATCH 05/11] Add support for newer Ubuntu/Mint --- native/build.gradle | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/native/build.gradle b/native/build.gradle index b18649555..eb2611edd 100644 --- a/native/build.gradle +++ b/native/build.gradle @@ -79,8 +79,9 @@ if ((rootProject.name.contains('native') && !rootProject.hasProperty('skipNative "boost-${boostVersion}-ubuntu14.04.tar.bz2", "unixODBC-${unixodbcVersion}-ubuntu14.04.tar.bz2", "googletest-${googletestVersion}-ubuntu14.04.tar.bz2" ] - } else if (flavour.startsWith('ubuntu16') || flavour.startsWith('linuxmint18') || - flavour.startsWith('elementary0.4')) { + } else if (flavour.startsWith('ubuntu16') || flavour.startsWith('ubuntu17') || flavour.startsWith('ubuntu18') || + flavour.startsWith('linuxmint18') || flavour.startsWith('linuxmint19') || + flavour.startsWith('elementary0.4') || flavour.startsWith('elementary5')) { thriftVersion = newThriftVersion boostVersion = newBoostVersion unixodbcVersion = newUnixodbcVersion From 41d461a09505967dd4ddc554e77c080006e77e70 Mon Sep 17 00:00:00 2001 From: Sumedh Wale Date: Mon, 19 Nov 2018 19:52:11 +0530 Subject: [PATCH 06/11] minor update to project files --- native/.settings/language.settings.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native/.settings/language.settings.xml b/native/.settings/language.settings.xml index 192438689..f43c01d7c 100644 --- a/native/.settings/language.settings.xml +++ b/native/.settings/language.settings.xml @@ -5,7 +5,7 @@ - + @@ -16,7 +16,7 @@ - + From a9db6e9810909d4cac258c6d873e7e19a1cc7662 Mon Sep 17 00:00:00 2001 From: Piyush Bisen Date: Thu, 22 Nov 2018 14:44:13 +0530 Subject: [PATCH 07/11] Resolved the Segmentation fault, with multiple threads --- .../snappyclient/cpp/impl/ClientService.cpp | 4 +- .../cpp/impl/ControlConnection.cpp | 39 +++++++------------ .../snappyclient/cpp/impl/ControlConnection.h | 32 +++++++-------- 3 files changed, 31 insertions(+), 44 deletions(-) diff --git a/native/src/snappyclient/cpp/impl/ClientService.cpp b/native/src/snappyclient/cpp/impl/ClientService.cpp index 05e97c75b..3f8f97929 100644 --- a/native/src/snappyclient/cpp/impl/ClientService.cpp +++ b/native/src/snappyclient/cpp/impl/ClientService.cpp @@ -383,11 +383,9 @@ void ClientService::openConnection(thrift::HostAddress& hostAddr, m_connId, m_token, ex.get()); } - m_currentHostAddr = hostAddr; while (true) { - + boost::lock_guard serviceGuard(m_lock); if (m_loadBalance) { - boost::lock_guard serviceGuard(m_lock); boost::optional controlService = ControlConnection::getOrCreateControlConnection(m_connHosts,this,nullptr); // at this point query the control service for preferred server controlService->getPreferredServer(hostAddr ,nullptr,failedServers,this->m_serverGroups, false); diff --git a/native/src/snappyclient/cpp/impl/ControlConnection.cpp b/native/src/snappyclient/cpp/impl/ControlConnection.cpp index 36e513b82..df9082393 100644 --- a/native/src/snappyclient/cpp/impl/ControlConnection.cpp +++ b/native/src/snappyclient/cpp/impl/ControlConnection.cpp @@ -60,13 +60,6 @@ using namespace io::snappydata::thrift; //----private static data member initialiazation---- std::vector > ControlConnection::s_allConnections; boost::mutex ControlConnection::s_allConnsLock; -std::unique_ptr ControlConnection::m_controlLocator = nullptr; -thrift::HostAddress ControlConnection::m_controlHost; -std::unordered_set ControlConnection::m_controlHostSet; -thrift::ServerType::type ControlConnection::m_snappyServerType; -std::vector ControlConnection::m_locators; -bool ControlConnection::m_framedTransport; -std::set ControlConnection::m_snappyServerTypeSet; ControlConnection::ControlConnection(ClientService *const &service) :m_serverGroups(service->getServerGrps()){ m_locators= service->getLocators(); @@ -75,6 +68,7 @@ ControlConnection::ControlConnection(ClientService *const &service) :m_serverGro m_controlHost= service->getCurrentHostAddress(); boost::assign::insert(m_snappyServerTypeSet)(service->getServerType(true,false,false)); std::copy(m_locators.begin(),m_locators.end(),std::inserter(m_controlHostSet,m_controlHostSet.end())); + m_controlLocator = nullptr; } const boost::optional ControlConnection::getOrCreateControlConnection( @@ -84,6 +78,7 @@ const boost::optional ControlConnection::getOrCreateControlC // expected to be in single digit (total number of distributed systems) boost::lock_guard globalGuard(s_allConnsLock); + signed short index = static_cast(s_allConnections.size()); while (--index >= 0) { const std::unique_ptr& controlConn = s_allConnections.at(index); @@ -148,26 +143,28 @@ void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer, void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer,std::exception* failure, std::set& failedServers, std::set& serverGroups,bool forFailover){ - + //boost::lock_guard localGuard(m_lock); if(m_controlLocator == nullptr) { failoverToAvailableHost(failedServers, false,failure); forFailover = true; } - bool firstCall = true; while(true){ + boost::lock_guard localGuard(m_lock); try{ if(forFailover){ //TODO: SanityManager - //refresh the full host list std::vector prefServerAndAllHosts; m_controlLocator->getAllServersWithPreferredServer(prefServerAndAllHosts,m_snappyServerTypeSet,serverGroups,failedServers); - //TODO :: refresh new server list--like java do. - refreshAllHosts(prefServerAndAllHosts); + if(! prefServerAndAllHosts.empty()) + { + std::vector allHosts(prefServerAndAllHosts.begin() +1,prefServerAndAllHosts.end()); + refreshAllHosts(allHosts); preferredServer = prefServerAndAllHosts.at(0); + } //TODO :SanityManger }else{ getLocatorPreferredServer(preferredServer,failedServers,serverGroups); @@ -197,9 +194,6 @@ void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer, }catch(TException &tex){ // TODO: SanityManager //Search for a new host for locator query - if(failedServers.empty()){ - // TODO - } // for the first call do not mark controlhost as failed but retry(e.g. for a reconnect case) if(firstCall){ firstCall = false; @@ -243,13 +237,13 @@ void ControlConnection::searchRandomServer(const std::set& } void ControlConnection::failoverToAvailableHost(std::set& failedServers,bool checkFailedControlHosts, std::exception* failure){ - - for(auto iterator = m_controlHostSet.begin();iterator!= m_controlHostSet.end(); ++iterator ){ - thrift::HostAddress controlAddr = *iterator; + boost::lock_guard localGuard(m_lock); + //for(auto iterator = m_controlHostSet.begin();iterator!= m_controlHostSet.end(); ++iterator ){ + NEXT: for(thrift::HostAddress controlAddr : m_controlHostSet){ + //thrift::HostAddress controlAddr = *iterator; if(checkFailedControlHosts && ! failedServers.empty() && (failedServers.find(controlAddr) != failedServers.end())){ continue; } - //m_controlHost = nullptr; m_controlLocator.reset(nullptr); boost::shared_ptr inTransport =nullptr; @@ -311,7 +305,8 @@ void ControlConnection::failoverToAvailableHost(std::set& f if(outTransport != nullptr){ outTransport->close(); } - continue; + //continue; + goto NEXT; }catch(std::exception &ex){ throw unexpectedError(ex,controlAddr); } @@ -376,10 +371,6 @@ void ControlConnection::refreshAllHosts(const std::vector& thrift::SnappyException* ControlConnection::failoverExhausted(const std::set& failedServers, std::exception* failure) { - // thrift::HostAddress host; - // if(! m_locators.empty()){ - // host = m_locators.at(0); - // } std::string failedServerString; for(thrift::HostAddress host : failedServers){ std::string portStr; diff --git a/native/src/snappyclient/cpp/impl/ControlConnection.h b/native/src/snappyclient/cpp/impl/ControlConnection.h index 83c869624..36e53fe27 100644 --- a/native/src/snappyclient/cpp/impl/ControlConnection.h +++ b/native/src/snappyclient/cpp/impl/ControlConnection.h @@ -78,17 +78,17 @@ namespace io { class ControlConnection { private: /**********Data members********/ - static thrift::ServerType::type m_snappyServerType; + thrift::ServerType::type m_snappyServerType; //const SSLSocketParameters& m_sslParams; - static std::set m_snappyServerTypeSet; - static std::vector m_locators; - static thrift::HostAddress m_controlHost; - static std::unique_ptr m_controlLocator; - static std::unordered_set m_controlHostSet; + std::set m_snappyServerTypeSet; + std::vector m_locators; + thrift::HostAddress m_controlHost; + std::unique_ptr m_controlLocator; + std::unordered_set m_controlHostSet; const std::set& m_serverGroups; boost::mutex m_lock; - static bool m_framedTransport; + bool m_framedTransport; /** * Since one DS is supposed to have one ControlConnection, so we expect the * total size of this static global list to be small. @@ -101,22 +101,20 @@ namespace io { ControlConnection():m_serverGroups(std::set()){}; ControlConnection(ClientService *const &service); - static void failoverToAvailableHost(std::set& failedServers, + void failoverToAvailableHost(std::set& failedServers, bool checkFailedControlHosts, std::exception* failure); - // - static void refreshAllHosts(const std::vector& allHosts); - // - static const thrift::SnappyException* unexpectedError( - const std::exception& e, const thrift::HostAddress& host); - static thrift::SnappyException* failoverExhausted(const std::set& failedServers, - std::exception* failure); + void refreshAllHosts(const std::vector& allHosts); - static void getLocatorPreferredServer(thrift::HostAddress& prefHostAddr,std::set& failedServers, + const thrift::SnappyException* unexpectedError(const std::exception& e, const thrift::HostAddress& host); + + thrift::SnappyException* failoverExhausted(const std::set& failedServers,std::exception* failure); + + void getLocatorPreferredServer(thrift::HostAddress& prefHostAddr,std::set& failedServers, std::setserverGroups); void getPreferredServer(thrift::HostAddress& preferredServer,std::exception* failure, - bool forFailover = false); + bool forFailover = false); public: static const boost::optional getOrCreateControlConnection( From 2964ea9e726b262ece7cf39a3b0b42688b0d9a66 Mon Sep 17 00:00:00 2001 From: Piyush Bisen Date: Fri, 23 Nov 2018 10:44:42 +0530 Subject: [PATCH 08/11] Code refactored --- native/src/snappyclient/cpp/impl/ControlConnection.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/native/src/snappyclient/cpp/impl/ControlConnection.cpp b/native/src/snappyclient/cpp/impl/ControlConnection.cpp index df9082393..cd6f3f184 100644 --- a/native/src/snappyclient/cpp/impl/ControlConnection.cpp +++ b/native/src/snappyclient/cpp/impl/ControlConnection.cpp @@ -149,9 +149,10 @@ void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer, failoverToAvailableHost(failedServers, false,failure); forFailover = true; } + boost::lock_guard localGuard(m_lock); bool firstCall = true; while(true){ - boost::lock_guard localGuard(m_lock); + try{ if(forFailover){ //TODO: SanityManager From 79a7ee1894216853a75350bb4f2c4126f9653624 Mon Sep 17 00:00:00 2001 From: Piyush Bisen Date: Wed, 12 Dec 2018 19:25:30 +0530 Subject: [PATCH 09/11] Code refactored --- native/.settings/language.settings.xml | 4 ++-- native/src/snappyclient/cpp/impl/ControlConnection.cpp | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/native/.settings/language.settings.xml b/native/.settings/language.settings.xml index f43c01d7c..43d9bb4d8 100644 --- a/native/.settings/language.settings.xml +++ b/native/.settings/language.settings.xml @@ -5,7 +5,7 @@ - + @@ -16,7 +16,7 @@ - + diff --git a/native/src/snappyclient/cpp/impl/ControlConnection.cpp b/native/src/snappyclient/cpp/impl/ControlConnection.cpp index cd6f3f184..2438a116a 100644 --- a/native/src/snappyclient/cpp/impl/ControlConnection.cpp +++ b/native/src/snappyclient/cpp/impl/ControlConnection.cpp @@ -239,9 +239,9 @@ void ControlConnection::searchRandomServer(const std::set& void ControlConnection::failoverToAvailableHost(std::set& failedServers,bool checkFailedControlHosts, std::exception* failure){ boost::lock_guard localGuard(m_lock); - //for(auto iterator = m_controlHostSet.begin();iterator!= m_controlHostSet.end(); ++iterator ){ - NEXT: for(thrift::HostAddress controlAddr : m_controlHostSet){ - //thrift::HostAddress controlAddr = *iterator; + for(auto iterator = m_controlHostSet.begin();iterator!= m_controlHostSet.end(); ++iterator ){ + // NEXT: for(thrift::HostAddress controlAddr : m_controlHostSet){ + thrift::HostAddress controlAddr = *iterator; if(checkFailedControlHosts && ! failedServers.empty() && (failedServers.find(controlAddr) != failedServers.end())){ continue; } @@ -306,8 +306,8 @@ void ControlConnection::failoverToAvailableHost(std::set& f if(outTransport != nullptr){ outTransport->close(); } - //continue; - goto NEXT; + continue; + //goto NEXT; }catch(std::exception &ex){ throw unexpectedError(ex,controlAddr); } From e0c101cb978b10089b8b4bb63bfd2bc427ce357c Mon Sep 17 00:00:00 2001 From: Piyush Bisen Date: Thu, 27 Dec 2018 14:28:54 +0530 Subject: [PATCH 10/11] added getFailOverStatus method --- native/src/snappyclient/cpp/SQLState.cpp | 2 + .../snappyclient/cpp/impl/ClientService.cpp | 7 -- .../cpp/impl/ControlConnection.cpp | 86 +++++++++++++------ .../snappyclient/cpp/impl/ControlConnection.h | 13 +++ native/src/snappyclient/headers/SQLState.h | 5 ++ 5 files changed, 80 insertions(+), 33 deletions(-) diff --git a/native/src/snappyclient/cpp/SQLState.cpp b/native/src/snappyclient/cpp/SQLState.cpp index 5fbbc3c68..08d9b30ba 100644 --- a/native/src/snappyclient/cpp/SQLState.cpp +++ b/native/src/snappyclient/cpp/SQLState.cpp @@ -131,6 +131,8 @@ const SQLState SQLState::DATA_CONTAINER_CLOSED("40XD0", ExceptionSeverity::TRANSACTION_SEVERITY); const SQLState SQLState::THRIFT_PROTOCOL_ERROR("58015", ExceptionSeverity::SESSION_SEVERITY); +const SQLState SQLState::NODE_BUCKET_MOVED("X0Z18", + ExceptionSeverity::TRANSACTION_SEVERITY); void SQLState::staticInitialize() { } diff --git a/native/src/snappyclient/cpp/impl/ClientService.cpp b/native/src/snappyclient/cpp/impl/ClientService.cpp index 3f8f97929..58b64a0c1 100644 --- a/native/src/snappyclient/cpp/impl/ClientService.cpp +++ b/native/src/snappyclient/cpp/impl/ClientService.cpp @@ -239,13 +239,6 @@ void ClientService::handleTException(const char* op, const TException& te) { handleStdException(op, te); } -/* -void ClientService::handleException(const TException* te, - const std::set& failedServers, bool tryFailover, bool ignoreFailOver, - bool createNewConnection, const std::string& op) -{ - -}*/ void ClientService::throwSQLExceptionForNodeFailure(const char* op, const std::exception& se) { diff --git a/native/src/snappyclient/cpp/impl/ControlConnection.cpp b/native/src/snappyclient/cpp/impl/ControlConnection.cpp index 2438a116a..3a30805a0 100644 --- a/native/src/snappyclient/cpp/impl/ControlConnection.cpp +++ b/native/src/snappyclient/cpp/impl/ControlConnection.cpp @@ -69,6 +69,12 @@ ControlConnection::ControlConnection(ClientService *const &service) :m_serverGro boost::assign::insert(m_snappyServerTypeSet)(service->getServerType(true,false,false)); std::copy(m_locators.begin(),m_locators.end(),std::inserter(m_controlHostSet,m_controlHostSet.end())); m_controlLocator = nullptr; + + //initliaze failoverSQLStateSet + short arrSize = sizeof(failoverSQLStateArray)/sizeof(failoverSQLStateArray[0]); + for(short i =0; i< arrSize;++i){ + failoverSQLStateSet.insert(failoverSQLStateArray[i]); + } } const boost::optional ControlConnection::getOrCreateControlConnection( @@ -102,8 +108,9 @@ const boost::optional ControlConnection::getOrCreateControlC " as registered but having different type " + Utils::getServerTypeString(contrConnServerType) + " than connection " + Utils::getServerTypeString( serviceServerType) ; SnappyExceptionData snappyExData; - // snappyExData.__set_sqlState("08006.C");// TODO: discuss with sumedh about correct SQLState + snappyExData.__set_sqlState("08006"); snappyExData.__set_reason(msg); + //snappyExData.__set_errorCode(17002); //TODO:: Need to confirm with sumedh ex->__set_exceptionData(snappyExData); ex->__set_serverInfo(hostAddr.hostName + ":" + portStr ); @@ -131,9 +138,7 @@ const boost::optional ControlConnection::getOrCreateControlC } void ControlConnection::getLocatorPreferredServer(thrift::HostAddress& prefHostAddr,std::set& failedServers, std::setserverGroups){ - // TODO: SanityManager m_controlLocator->getPreferredServer(prefHostAddr,m_snappyServerTypeSet,serverGroups,failedServers); - //TODO:SanityManager } void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer,std::exception* failure,bool forFailover){ std::set failedServers; @@ -143,7 +148,6 @@ void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer, void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer,std::exception* failure, std::set& failedServers, std::set& serverGroups,bool forFailover){ - //boost::lock_guard localGuard(m_lock); if(m_controlLocator == nullptr) { failoverToAvailableHost(failedServers, false,failure); @@ -155,23 +159,20 @@ void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer, try{ if(forFailover){ - //TODO: SanityManager //refresh the full host list std::vector prefServerAndAllHosts; m_controlLocator->getAllServersWithPreferredServer(prefServerAndAllHosts,m_snappyServerTypeSet,serverGroups,failedServers); - //TODO :: refresh new server list--like java do. if(! prefServerAndAllHosts.empty()) { std::vector allHosts(prefServerAndAllHosts.begin() +1,prefServerAndAllHosts.end()); refreshAllHosts(allHosts); preferredServer = prefServerAndAllHosts.at(0); } - //TODO :SanityManger }else{ getLocatorPreferredServer(preferredServer,failedServers,serverGroups); } - // TODO: SanityManager if(preferredServer.port <=0){ + /*For this case we don't have a locator or locator unable to * determine a preferred server, so choose some server randomly * as the "preferredServer". In case all servers have failed then @@ -179,6 +180,7 @@ void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer, * Remove controlHost from failedServers since it is known to be * working at this point (e.g after a reconnect) * */ + std::set skipServers = failedServers; if( !failedServers.empty() && std::find(failedServers.begin(),failedServers.end(),m_controlHost)!= failedServers.end()){ //don't change the original failure list since that is proper @@ -187,13 +189,16 @@ void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer, } searchRandomServer(skipServers, failure,preferredServer); } - //TODO: Sanitymanger return; }catch(thrift::SnappyException &snEx){ - // TODO: - //Discuss with Sumedh + FailoverStatus status = getFailoverStatus(snEx.exceptionData.sqlState,snEx.exceptionData.errorCode,snEx); + if(status== FailoverStatus::NONE){ + throw snEx; + }else if(status== FailoverStatus::RETRY){ + forFailover = true; + continue; + } }catch(TException &tex){ - // TODO: SanityManager //Search for a new host for locator query // for the first call do not mark controlhost as failed but retry(e.g. for a reconnect case) if(firstCall){ @@ -204,7 +209,7 @@ void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer, m_controlLocator->getOutputProtocol()->getTransport()->close(); failoverToAvailableHost(failedServers,true,failure); if(failure ==nullptr){ - failure = &(tex);// TODO: need to look again + failure = &(tex); } }catch(std::exception &ex){ throw unexpectedError(ex, m_controlHost); @@ -216,7 +221,6 @@ void ControlConnection::getPreferredServer(thrift::HostAddress& preferredServer, void ControlConnection::searchRandomServer(const std::set& skipServers,std::exception* failure, thrift::HostAddress& hostAddress){ - //TODO: Need to discuss implemetation of this method and also ClientService:: updateFailedServersForCurrent with sumedh std::vector searchServers; // Note: Do not use unordered_set -- reason is http://www.cplusplus.com/forum/general/198319/ std::copy(m_controlHostSet.begin(),m_controlHostSet.end(),std::inserter(searchServers,searchServers.end())); @@ -240,7 +244,6 @@ void ControlConnection::failoverToAvailableHost(std::set& f std::exception* failure){ boost::lock_guard localGuard(m_lock); for(auto iterator = m_controlHostSet.begin();iterator!= m_controlHostSet.end(); ++iterator ){ - // NEXT: for(thrift::HostAddress controlAddr : m_controlHostSet){ thrift::HostAddress controlAddr = *iterator; if(checkFailedControlHosts && ! failedServers.empty() && (failedServers.find(controlAddr) != failedServers.end())){ continue; @@ -262,14 +265,13 @@ void ControlConnection::failoverToAvailableHost(std::set& f m_snappyServerType == thrift::ServerType::THRIFT_LOCATOR_CP_SSL || m_snappyServerType == thrift::ServerType::THRIFT_SNAPPY_BP_SSL || m_snappyServerType==thrift::ServerType::THRIFT_SNAPPY_CP_SSL){ - // TODO: Find out whether SnappyTSSLSocket is needed or not, or any other thing is required TSSLSocketFactory sslSocketFactory; tTransport = sslSocketFactory.createSocket(controlAddr.hostName,controlAddr.port); }else if(m_snappyServerType == thrift::ServerType::THRIFT_LOCATOR_BP || m_snappyServerType== thrift::ServerType::THRIFT_LOCATOR_CP || m_snappyServerType== thrift::ServerType::THRIFT_SNAPPY_BP || m_snappyServerType== thrift::ServerType::THRIFT_SNAPPY_CP){ - tTransport = boost::make_shared(controlAddr.hostName,controlAddr.port); // TODO: Find out whether SnappyTSocket is needed or not, or any other thing is required + tTransport = boost::make_shared(controlAddr.hostName,controlAddr.port); } tTransport->open(); TTransportFactory* transportFactory = nullptr; @@ -301,13 +303,12 @@ void ControlConnection::failoverToAvailableHost(std::set& f break; } }catch(TException &tExp){ - failure = &tExp; // TODO: need to look again + failure = &tExp; failedServers.insert(controlAddr); if(outTransport != nullptr){ outTransport->close(); } continue; - //goto NEXT; }catch(std::exception &ex){ throw unexpectedError(ex,controlAddr); } @@ -328,8 +329,9 @@ const thrift::SnappyException* ControlConnection:: unexpectedError(const std::ex } thrift::SnappyException *snappyEx = new thrift::SnappyException(); SnappyExceptionData snappyExData; - //snappyExData.__set_sqlState(std::string(SQLState::UNKNOWN_EXCEPTION));// TODO: discuss with sumedh about correct SQLState + snappyExData.__set_sqlState(std::string(SQLState::UNKNOWN_EXCEPTION.getSQLState())); snappyExData.__set_reason(ex.what()); + snappyEx->__set_exceptionData(snappyExData); std::string portNum; @@ -380,14 +382,46 @@ thrift::SnappyException* ControlConnection::failoverExhausted(const std::set__set_exceptionData(snappyExData); - //snappyExData.__set_sqlState();// - // TODO: complete this funtion snappyEx->__set_serverInfo(failedServerString); return snappyEx; } - +FailoverStatus ControlConnection::getFailoverStatus(const std::string& sqlState,const int32_t& errorCode, const TException& snappyEx){ + if(! std::strcmp(SQLState::SNAPPY_NODE_SHUTDOWN.getSQLState(),sqlState.c_str()) + || std::strcmp(SQLState::NODE_BUCKET_MOVED.getSQLState(),sqlState.c_str())){ + return FailoverStatus::RETRY; + } + /* for 08001 we have to, unfortunately, resort to string search to + * determine if failover makes sense or it is due to some problem + * with authentication or invalid properties */ + else if(!sqlState.compare("08001")){ + std::string msg(snappyEx.what()); + if(!msg.empty() && + ((msg.find("rror")!=std::string::npos) // cater to CONNECT_UNABLE_TO_CONNECT_TO_SERVER + || (msg.find("xception")!=std::string::npos ) // cater to CONNECT_SOCKET_EXCEPTION + ||(msg.find("ocket")!=std::string::npos))// cater to CONNECT_UNABLE_TO_OPEN_SOCKET_STREAM + ){ + return FailoverStatus::NEW_SERVER; + } + } + /* for 08004 we have to, unfortunately, resort to string search to + * determine if failover makes sense or it is due to some problem + * with authentication + */ + else if(!sqlState.compare("08004")){ + std::string msg(snappyEx.what()); + if(!msg.empty() && + (msg.find("connection refused") !=std::string::npos) + ){ + return FailoverStatus::NEW_SERVER; + } + } + else if(failoverSQLStateSet.find(sqlState)!= failoverSQLStateSet.end()){ + return FailoverStatus::NEW_SERVER; + } + return FailoverStatus::NONE; +} diff --git a/native/src/snappyclient/cpp/impl/ControlConnection.h b/native/src/snappyclient/cpp/impl/ControlConnection.h index 36e53fe27..1db06adf3 100644 --- a/native/src/snappyclient/cpp/impl/ControlConnection.h +++ b/native/src/snappyclient/cpp/impl/ControlConnection.h @@ -68,6 +68,11 @@ namespace io { namespace client { namespace impl { + enum class FailoverStatus :unsigned char{ + NONE, /** no failover to be done */ + NEW_SERVER, /** failover to a new server */ + RETRY /** retry to the same server */ + }; /** * Holds locator, server information to use for failover. Also provides * convenience methods to actually search for an appropriate host for @@ -97,6 +102,12 @@ namespace io { /** Global lock for {@link allConnections} */ static boost::mutex s_allConnsLock; + /** array of SQLState strings that denote failover should be done */ + std::string failoverSQLStateArray[23] = { "08001", + "08003", "08004", "08006", "X0J15", "X0Z32", "XN001", "XN014", "XN016", + "58009", "58014", "58015", "58016", "58017", "57017", "58010", "30021", + "XJ040", "XJ041", "XSDA3", "XSDA4", "XSDAJ", "XJ217" }; + std::set failoverSQLStateSet; /*********Member functions**************/ ControlConnection():m_serverGroups(std::set()){}; ControlConnection(ClientService *const &service); @@ -115,6 +126,8 @@ namespace io { void getPreferredServer(thrift::HostAddress& preferredServer,std::exception* failure, bool forFailover = false); + + FailoverStatus getFailoverStatus(const std::string& sqlState,const int32_t& errorCode, const TException& snappyEx); public: static const boost::optional getOrCreateControlConnection( diff --git a/native/src/snappyclient/headers/SQLState.h b/native/src/snappyclient/headers/SQLState.h index af498269b..7ec1081d7 100644 --- a/native/src/snappyclient/headers/SQLState.h +++ b/native/src/snappyclient/headers/SQLState.h @@ -377,6 +377,11 @@ namespace client { * protocol is detected (can be due to server failure). */ static const SQLState THRIFT_PROTOCOL_ERROR; + /** + * SQLState of the exception thrown when an error + * is detected due to bucket moved. + */ + static const SQLState NODE_BUCKET_MOVED; }; } /* namespace client */ From 72c82eb659cfc3d04ebd1f45188c67e32a20ee24 Mon Sep 17 00:00:00 2001 From: Piyush Bisen Date: Thu, 3 Jan 2019 17:31:43 +0530 Subject: [PATCH 11/11] Added support for toolchain vc141 --- native/build.gradle | 37 ++++++++++--------- .../src/snappyclient/cpp/types/DateTime.cpp | 20 +++++----- .../src/snappyclient/cpp/types/Timestamp.cpp | 8 ++-- 3 files changed, 34 insertions(+), 31 deletions(-) diff --git a/native/build.gradle b/native/build.gradle index eb2611edd..929a29825 100644 --- a/native/build.gradle +++ b/native/build.gradle @@ -14,11 +14,14 @@ * permissions and limitations under the License. See accompanying * LICENSE file. */ - -plugins { - id 'de.undercouch.download' version '3.4.3' +buildscript { + repositories { + maven { url 'https://plugins.gradle.org/m2' } + } + dependencies { + classpath 'de.undercouch:gradle-download-task:3.4.3' + } } - apply plugin: 'wrapper' if ((rootProject.name.contains('native') && !rootProject.hasProperty('skipNative')) @@ -38,16 +41,16 @@ if ((rootProject.name.contains('native') && !rootProject.hasProperty('skipNative String distDir = "${rootProject.projectDir}/dist" String thriftVersion = '1.0.0-1' - String boostVersion = '1.59.0' + String newThriftVersion = '1.0.0-2' + String boostVersion = '1.69.0' String mpirVersion = '2.7.2' String opensslVersion = '1.0.2h' String unixodbcVersion = '2.3.4' String googletestVersion = '1.7.0' - String oldReleasesDir = "https://github.com/SnappyDataInc/thrift/releases/download/${thriftVersion}" + String oldReleasesDir = "https://github.com/SnappyDataInc/thrift/releases/download/${newThriftVersion}" String releasesDir = oldReleasesDir - String newThriftVersion = '1.0.0-2' String newBoostVersion = '1.65.1' String newUnixodbcVersion = '2.3.4' String newGoogletestVersion = '1.8.0' @@ -134,7 +137,7 @@ if ((rootProject.name.contains('native') && !rootProject.hasProperty('skipNative dependsOn downloadDependencies inputs.files dependencies.collect { [ "${distDir}/${it}" ] }.flatten() - outputs.files dependencies.collect { [ "${distDir}/${it.replaceAll('-[^-]*.tar.bz2', '')}" ] }.flatten() + //outputs.files dependencies.collect { [ "${distDir}/${it.replaceAll('-[^-]*.tar.bz2', '')}" ] }.flatten() doLast { outputs.files.each { d -> delete d } @@ -273,15 +276,15 @@ if ((rootProject.name.contains('native') && !rootProject.hasProperty('skipNative '-rdynamic', '-ldl', '-z', 'defs' } else if (toolChain in VisualCpp) { // explicitly include UCRT since its still not passed through by gradle - String VS_2015_INCLUDE_DIR = 'C:/Program Files (x86)/Windows Kits/10/Include/10.0.10240.0/ucrt' - String VS_2015_LIB_DIR = 'C:/Program Files (x86)/Windows Kits/10/Lib/10.0.10240.0/ucrt' - String VS_2015_INCLUDE_DIR_UM = 'C:/Program Files (x86)/Windows Kits/10/Include/10.0.10240.0/um' - String VS_2015_LIB_DIR_UM = 'C:/Program Files (x86)/Windows Kits/10/Lib/10.0.10240.0/um' - String VS_2015_INCLUDE_DIR_SHARED = 'C:/Program Files (x86)/Windows Kits/10/Include/10.0.10240.0/shared' + String VS_2017_INCLUDE_DIR = 'C:/Program Files (x86)/Windows Kits/10/Include/10.0.17763.0/ucrt' + String VS_2017_LIB_DIR = 'C:/Program Files (x86)/Windows Kits/10/Lib/10.0.17763.0/ucrt' + String VS_2017_INCLUDE_DIR_UM = 'C:/Program Files (x86)/Windows Kits/10/Include/10.0.17763.0/um' + String VS_2017_LIB_DIR_UM = 'C:/Program Files (x86)/Windows Kits/10/Lib/10.0.17763.0/um' + String VS_2017_INCLUDE_DIR_SHARED = 'C:/Program Files (x86)/Windows Kits/10/Include/10.0.17763.0/shared' cppCompiler.args '/W3', '/FS', '/Zc:inline', '/WX', '/EHsc', '/Fdsnappyclient.pdb', '/errorReport:prompt', - "/I${VS_2015_INCLUDE_DIR_UM}", "/I${VS_2015_INCLUDE_DIR_SHARED}", - "/I${VS_2015_INCLUDE_DIR}" + "/I${VS_2017_INCLUDE_DIR_UM}", "/I${VS_2017_INCLUDE_DIR_SHARED}", ++ "/I${VS_2017_INCLUDE_DIR}" if (buildType == buildTypes.debug) { cppCompiler.define '_DEBUG' cppCompiler.args '/Od', '/Gm', '/ZI', '/RTC1', '/MDd' @@ -290,9 +293,9 @@ if ((rootProject.name.contains('native') && !rootProject.hasProperty('skipNative cppCompiler.args '/O2', '/GL', '/Oi', '/Gm-', '/Zi', '/MD' } if (targetPlatform == platforms.x64) { - linker.args "/LIBPATH:${VS_2015_LIB_DIR}/x64:${VS_2015_LIB_DIR_UM}/x64" + linker.args "/LIBPATH:${VS_2017_LIB_DIR}/x64:${VS_2017_LIB_DIR_UM}/x64" } else { - linker.args "/LIBPATH:${VS_2015_LIB_DIR}/x86:${VS_2015_LIB_DIR_UM}/x86" + linker.args "/LIBPATH:${VS_2017_LIB_DIR}/x86:${VS_2017_LIB_DIR_UM}/x86" } } } diff --git a/native/src/snappyclient/cpp/types/DateTime.cpp b/native/src/snappyclient/cpp/types/DateTime.cpp index fe34dfa71..a6927b2ba 100644 --- a/native/src/snappyclient/cpp/types/DateTime.cpp +++ b/native/src/snappyclient/cpp/types/DateTime.cpp @@ -116,8 +116,8 @@ DateTime DateTime::parseTime(const std::string& str, boost::posix_time::time_duration td = boost::posix_time::duration_from_string(str); boost::gregorian::date::ymd_type ymd = today.year_month_day(); - return DateTime(ymd.year, ymd.month, ymd.day, td.hours(), td.minutes(), - td.seconds(), utc); + return DateTime(static_cast( ymd.year), static_cast(ymd.month), static_cast( ymd.day), static_cast(td.hours()), static_cast(td.minutes()), + static_cast(td.seconds()), utc); } catch (const std::exception& e) { std::string err(str); err.append(": ").append(e.what()); @@ -144,8 +144,8 @@ DateTime DateTime::parseDateTime(const std::string& str, // call parse_time_duration with remaining string boost::posix_time::time_duration td = boost::posix_time::duration_from_string(timeStr); - return DateTime(ymd.year, ymd.month, ymd.day, td.hours(), td.minutes(), - td.seconds(), utc); + return DateTime(static_cast(ymd.year), static_cast(ymd.month), static_cast(ymd.day), static_cast(td.hours()), static_cast(td.minutes()), + static_cast(td.seconds()), utc); } catch (const std::exception& e) { std::string err(str); err.append(": ").append(e.what()); @@ -286,7 +286,7 @@ void DateTime::toTime(std::string& str, const bool utc) const { InternalUtils::convertEpochSecsToPosixTime(m_secsSinceEpoch); const auto td = dateTime.time_of_day(); - bufp = toTimeString(td.hours(), td.minutes(), td.seconds(), bufp); + bufp = toTimeString(static_cast(td.hours()), static_cast(td.minutes()), static_cast(td.seconds()), bufp); str.append(buf, bufp - &buf[0]); } else { boost::local_time::local_date_time localTime( @@ -294,7 +294,7 @@ void DateTime::toTime(std::string& str, const bool utc) const { InternalUtils::s_localTimeZone); const auto td = localTime.time_of_day(); - bufp = toTimeString(td.hours(), td.minutes(), td.seconds(), bufp); + bufp = toTimeString(static_cast(td.hours()), static_cast(td.minutes()), static_cast(td.seconds()), bufp); str.append(buf, bufp - &buf[0]); } } catch (const std::exception&) { @@ -311,8 +311,8 @@ void DateTime::toDateTime(std::string& str, const bool utc) const { auto ymd = dateTime.date().year_month_day(); auto td = dateTime.time_of_day(); - toString(uint16_t(ymd.year), ymd.month.as_number(), ymd.day.as_number(), - td.hours(), td.minutes(), td.seconds(), 0, str); + toString(static_cast(ymd.year), static_cast(ymd.month.as_number()), static_cast(ymd.day.as_number()), + static_cast(td.hours()), static_cast(td.minutes()), static_cast(td.seconds()), 0, str); } else { boost::local_time::local_date_time localTime( InternalUtils::convertEpochSecsToPosixTime(m_secsSinceEpoch), @@ -320,8 +320,8 @@ void DateTime::toDateTime(std::string& str, const bool utc) const { auto ymd = localTime.date().year_month_day(); auto td = localTime.time_of_day(); - toString(uint16_t(ymd.year), ymd.month.as_number(), ymd.day.as_number(), - td.hours(), td.minutes(), td.seconds(), 0, str); + toString(static_cast(ymd.year), ymd.month.as_number(), ymd.day.as_number(), + static_cast(td.hours()), static_cast(td.minutes()), static_cast(td.seconds()), 0, str); } } catch (const std::exception&) { throw GET_SQLEXCEPTION2(SQLStateMessage::LANG_DATE_RANGE_EXCEPTION_MSG1, diff --git a/native/src/snappyclient/cpp/types/Timestamp.cpp b/native/src/snappyclient/cpp/types/Timestamp.cpp index 80e4fbda5..9ffeec385 100644 --- a/native/src/snappyclient/cpp/types/Timestamp.cpp +++ b/native/src/snappyclient/cpp/types/Timestamp.cpp @@ -117,8 +117,8 @@ Timestamp Timestamp::parseString(const std::string& str, boost::date_time::parse_delimited_time_duration< _snappy_impl::nano_time_duration>(timeStr); // nanoseconds will lie within int32 limits - return Timestamp(ymd.year, ymd.month, ymd.day, td.hours(), td.minutes(), - td.seconds(), static_cast(td.fractional_seconds()), utc); + return Timestamp(static_cast(ymd.year), static_cast(ymd.month), static_cast(ymd.day), static_cast(td.hours()), static_cast(td.minutes()), + static_cast(td.seconds()), static_cast(td.fractional_seconds()), utc); } catch (const std::exception& e) { std::string err(str); err.append(": ").append(e.what()); @@ -142,7 +142,7 @@ std::string& Timestamp::toString(std::string& str, const bool utc) const { boost::posix_time::time_duration td = dateTime.time_of_day(); return DateTime::toString(uint16_t(ymd.year), ymd.month.as_number(), - ymd.day.as_number(), td.hours(), td.minutes(), td.seconds(), m_nanos, + ymd.day.as_number(), static_cast(td.hours()), static_cast(td.minutes()), static_cast(td.seconds()), m_nanos, str); } catch (const std::exception&) { throw GET_SQLEXCEPTION2(SQLStateMessage::LANG_DATE_RANGE_EXCEPTION_MSG1, @@ -189,7 +189,7 @@ std::ostream& operator <<(std::ostream& stream, Timestamp ts) { boost::posix_time::time_duration td = dateTime.time_of_day(); return DateTime::toString(uint16_t(ymd.year), ymd.month.as_number(), - ymd.day.as_number(), td.hours(), td.minutes(), td.seconds(), + static_cast(ymd.day.as_number()), static_cast(td.hours()), static_cast(td.minutes()), static_cast(td.seconds()), ts.getNanos(), stream); } catch (const std::exception&) { throw GET_SQLEXCEPTION2(SQLStateMessage::LANG_DATE_RANGE_EXCEPTION_MSG1,