Skip to content

Commit

Permalink
feat: add implementation for httpclient
Browse files Browse the repository at this point in the history
Signed-off-by: ik <hello_ik@outlook.com>
  • Loading branch information
hiiiik committed May 28, 2024
1 parent a33cc48 commit ea1acbe
Show file tree
Hide file tree
Showing 14 changed files with 929 additions and 16 deletions.
2 changes: 2 additions & 0 deletions include/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ else()
opengemini/impl/ErrorCode.cpp
opengemini/impl/comm/Context.cpp
opengemini/impl/http/IHttpClient.cpp
opengemini/impl/http/HttpClient.cpp
opengemini/impl/http/HttpsClient.cpp
)
opengemini_target_setting(Client PUBLIC)
endif()
Expand Down
79 changes: 79 additions & 0 deletions include/opengemini/impl/http/ConnectionPool.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//
// Copyright 2024 Huawei Cloud Computing Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#ifndef OPENGEMINI_IMPL_HTTP_CONNECTIONPOOL_HPP
#define OPENGEMINI_IMPL_HTTP_CONNECTIONPOOL_HPP

#include <memory>
#include <mutex>
#include <queue>
#include <unordered_map>

#include <boost/asio/spawn.hpp>
#include <boost/beast.hpp>

#include "opengemini/Endpoint.hpp"
#include "opengemini/impl/comm/TaskSlot.hpp"

namespace opengemini::impl::http {

template<typename STREAM>
struct Connection {
using Stream = STREAM;

Stream stream;
bool used;

bool ShouldRetry(boost::beast::error_code& error,
std::string_view what) const;

#if __cplusplus < 202002L
Connection(Stream _stream, bool _used);
#endif // (__cplusplus < 202002L)
};

template<typename DERIVED, typename STREAM>
class ConnectionPool : public TaskSlot {
public:
using ConnectionPtr = std::unique_ptr<Connection<STREAM>>;
using Stream = STREAM;

public:
ConnectionPool(boost::asio::io_context& ctx,
std::chrono::milliseconds connectTimeout,
std::size_t maxKeepalive = 3);

ConnectionPtr Retrieve(const Endpoint& endpoint,
boost::asio::yield_context yield);

void Push(const Endpoint& endpoint, ConnectionPtr connection);

protected:
const std::chrono::milliseconds connectTimeout_;

private:
std::unordered_map<Endpoint, std::queue<ConnectionPtr>, Endpoint::Hasher>
pool_;
std::mutex mutex_;

const std::size_t maxKeepalive_;
};

} // namespace opengemini::impl::http

#include "opengemini/impl/http/ConnectionPool.tpp"

#endif // !OPENGEMINI_IMPL_HTTP_CONNECTIONPOOL_HPP
79 changes: 79 additions & 0 deletions include/opengemini/impl/http/ConnectionPool.tpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//
// Copyright 2024 Huawei Cloud Computing Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include "opengemini/impl/http/ConnectionPool.hpp"

#include "opengemini/Exception.hpp"

namespace opengemini::impl::http {

#if __cplusplus < 202002L
template<typename STREAM>
Connection<STREAM>::Connection(Stream _stream, bool _used) :
stream(std::move(_stream)),
used(_used)
{ }
#endif // (__cplusplus < 202002L)

template<typename STREAM>
bool Connection<STREAM>::ShouldRetry(boost::beast::error_code& error,
std::string_view what) const
{
if (!error) { return false; }
if (used) { return true; }
throw Exception(std::move(error), std::string(what));
}

template<typename DERIVED, typename STREAM>
ConnectionPool<DERIVED, STREAM>::ConnectionPool(
boost::asio::io_context& ctx,
std::chrono::milliseconds connectTimeout,
std::size_t maxKeepalive) :
TaskSlot(ctx),
connectTimeout_(connectTimeout),
maxKeepalive_(maxKeepalive)
{ }

template<typename DERIVED, typename STREAM>
typename ConnectionPool<DERIVED, STREAM>::ConnectionPtr
ConnectionPool<DERIVED, STREAM>::Retrieve(const Endpoint& endpoint,
boost::asio::yield_context yield)
{
{
std::lock_guard lock(mutex_);
if (auto& connections = pool_[endpoint]; !connections.empty()) {
auto connection = std::move(connections.front());
connections.pop();
return connection;
}
}

return static_cast<DERIVED*>(this)->CreateConnection(endpoint, yield);
}

template<typename DERIVED, typename STREAM>
void ConnectionPool<DERIVED, STREAM>::Push(const Endpoint& endpoint,
ConnectionPtr connection)
{
if (!connection->used) { connection->used = true; }

std::lock_guard lock(mutex_);
auto& connections = pool_[endpoint];
if (connections.size() >= maxKeepalive_) { return; }
pool_[endpoint].push(std::move(connection));
}

} // namespace opengemini::impl::http
106 changes: 106 additions & 0 deletions include/opengemini/impl/http/HttpClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
//
// Copyright 2024 Huawei Cloud Computing Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include "opengemini/impl/http/HttpClient.hpp"

#include "opengemini/Exception.hpp"
#include "opengemini/impl/util/Preprocessor.hpp"

namespace opengemini::impl::http {

OPENGEMINI_INLINE_SPECIFIER
HttpClient::HttpClient(boost::asio::io_context& ctx,
std::chrono::milliseconds connectTimeout,
std::chrono::milliseconds readWriteTimeout) :

IHttpClient(ctx, connectTimeout, readWriteTimeout),
pool_(ctx, connectTimeout)
{ }

OPENGEMINI_INLINE_SPECIFIER
Response HttpClient::SendRequest(const Endpoint& endpoint,
Request request,
boost::asio::yield_context yield)
{
namespace beast = boost::beast;
namespace http = boost::beast::http;

Response response;
beast::flat_buffer buffer;
beast::error_code error;

for (;; error.clear()) {
auto connection = pool_.Retrieve(endpoint, yield);
auto& stream = connection->stream;

stream.expires_after(readWriteTimeout_);
http::async_write(stream, request, yield[error]);
if (connection->ShouldRetry(error, "Write to stream failed.")) {
continue;
}

buffer.clear();
http::async_read(stream, buffer, response, yield[error]);
if (connection->ShouldRetry(error, "Read from stream failed.")) {
continue;
}

if (response.keep_alive()) {
pool_.Push(endpoint, std::move(connection));
break;
}

std::ignore = stream.socket().shutdown(
boost::asio::ip::tcp::socket::shutdown_both,
error);
if (error && error != beast::errc::not_connected) {
throw Exception(error, "Shutdown stream failed.");
}
break;
}

return response;
}

OPENGEMINI_INLINE_SPECIFIER
HttpClient::Pool::Pool(boost::asio::io_context& ctx,
std::chrono::milliseconds connectTimeout) :
ConnectionPool(ctx, connectTimeout)
{ }

OPENGEMINI_INLINE_SPECIFIER
HttpClient::Pool::ConnectionPtr
HttpClient::Pool::CreateConnection(const Endpoint& endpoint,
boost::asio::yield_context yield)
{
boost::asio::ip::tcp::resolver resolver(ctx_);
boost::beast::error_code error;
auto results = resolver.async_resolve(endpoint.host,
std::to_string(endpoint.port),
yield[error]);
if (error) { throw Exception(error, "Resolve hostname failed."); }

auto connection =
std::make_unique<ConnectionPtr::element_type>(Stream{ ctx_ }, false);
auto& stream = connection->stream;
stream.expires_after(connectTimeout_);
stream.async_connect(results, yield[error]);
if (error) { throw Exception(error, "Connect to server failed."); }

return connection;
}

} // namespace opengemini::impl::http
60 changes: 60 additions & 0 deletions include/opengemini/impl/http/HttpClient.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//
// Copyright 2024 Huawei Cloud Computing Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#ifndef OPENGEMINI_IMPL_HTTP_HTTPCLIENT_HPP
#define OPENGEMINI_IMPL_HTTP_HTTPCLIENT_HPP

#include "opengemini/impl/http/ConnectionPool.hpp"
#include "opengemini/impl/http/IHttpClient.hpp"

namespace opengemini::impl::http {

class HttpClient : public IHttpClient {
public:
explicit HttpClient(boost::asio::io_context& ctx,
std::chrono::milliseconds connectTimeout,
std::chrono::milliseconds readWriteTimeout);
~HttpClient() = default;

private:
class Pool : public ConnectionPool<Pool, boost::beast::tcp_stream> {
friend class ConnectionPool<Pool, Stream>;

public:
Pool(boost::asio::io_context& ctx,
std::chrono::milliseconds connectTimeout);

private:
ConnectionPtr CreateConnection(const Endpoint& endpoint,
boost::asio::yield_context yield);
};

private:
Response SendRequest(const Endpoint& endpoint,
Request request,
boost::asio::yield_context yield) override;

private:
Pool pool_;
};

} // namespace opengemini::impl::http

#ifndef OPENGEMINI_SEPARATE_COMPILATION
# include "opengemini/impl/http/HttpClient.cpp"
#endif // !OPENGEMINI_SEPARATE_COMPILATION

#endif // !OPENGEMINI_IMPL_HTTP_HTTPCLIENT_HPP
Loading

0 comments on commit ea1acbe

Please sign in to comment.