Skip to content

Commit

Permalink
Merge pull request #256 from project-tsurugi/wip/i_1113
Browse files Browse the repository at this point in the history
add ipc_lob_test
  • Loading branch information
t-horikawa authored Jan 27, 2025
2 parents 76e4457 + 2d3a04c commit ec5f99a
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 30 deletions.
10 changes: 5 additions & 5 deletions src/tateyama/endpoint/common/endpoint_proto_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ inline bool append_response_header(std::stringstream& ss, std::string_view body,
hdr.set_payload_type(type);
if(input.blobs_ && type == ::tateyama::proto::framework::response::Header::SERVICE_RESULT) {
if (!(input.blobs_)->empty()) {
auto* blobs = hdr.mutable_blobs();
auto* mutable_blobs = hdr.mutable_blobs();
for(auto&& e: *input.blobs_) {
auto* blob = blobs->add_blobs();
auto* mutable_blob = mutable_blobs->add_blobs();
auto cn = e->channel_name();
blob->set_channel_name(cn.data(), cn.length());
blob->set_path((e->path()).string());
blob->set_temporary(e->is_temporary());
mutable_blob->set_channel_name(cn.data(), cn.length());
mutable_blob->set_path((e->path()).string());
mutable_blob->set_temporary(e->is_temporary());
}
}
}
Expand Down
25 changes: 22 additions & 3 deletions test/tateyama/endpoint/header_utils.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2025 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,24 +14,43 @@
* limitations under the License.
*/
#pragma once

#include <sstream>
#include <set>
#include <string_view>
#include <string>

#include <tateyama/status.h>
#include <tateyama/api/server/request.h>
#include <tateyama/api/server/response.h>
#include <tateyama/endpoint/common/endpoint_proto_utils.h>

// #include <gtest/gtest.h>
#include <tateyama/proto/framework/common.pb.h>
#include <tateyama/proto/framework/request.pb.h>

namespace tateyama::endpoint {

struct request_header_content {
std::size_t session_id_{};
std::size_t service_id_{};
std::set<std::tuple<std::string, std::string, bool>>* blobs_{};
};

inline bool append_request_header(std::stringstream& ss, std::string_view body, request_header_content input) {
::tateyama::proto::framework::request::Header hdr{};
hdr.set_session_id(input.session_id_);
hdr.set_service_id(input.service_id_);
if (input.blobs_) {
if (!(input.blobs_)->empty()) {
::tateyama::proto::framework::common::RepeatedBlobInfo blobs{};
auto* mutable_blobs = hdr.mutable_blobs();
for (auto&& e: *input.blobs_) {
auto* mutable_blob = mutable_blobs->add_blobs();
mutable_blob->set_channel_name(std::get<0>(e));
mutable_blob->set_path(std::get<1>(e));
mutable_blob->set_temporary(std::get<2>(e));
}
}
}
if(auto res = utils::SerializeDelimitedToOstream(hdr, std::addressof(ss)); ! res) {
return false;
}
Expand Down
37 changes: 17 additions & 20 deletions test/tateyama/endpoint/ipc/ipc_client.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2024 Project Tsurugi.
* Copyright 2018-2025 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,25 +55,22 @@ void ipc_client::send(const std::size_t tag, const std::string &message, std::si
ipc_test_index + index_offset);
}

/*
* see parse_header() in tateyama/endpoint/common/endpoint_proto_utils.h
*/
struct parse_response_result {
std::size_t session_id_ { };
tateyama::proto::framework::response::Header::PayloadType payload_type_{};
std::string_view payload_ { };
};

bool parse_response_header(std::string_view input, parse_response_result &result) {
result = { };
::tateyama::proto::framework::response::Header hdr { };
void ipc_client::send(const std::size_t tag, const std::string &message, std::set<std::tuple<std::string, std::string, bool>>& blobs, std::size_t index_offset) {
request_header_content hdr { session_id_, tag, &blobs };
std::stringstream ss { };
append_request_header(ss, message, hdr);
auto request_message = ss.str();
request_wire_->write(reinterpret_cast<const signed char*>(request_message.data()),
request_message.length(),
ipc_test_index + index_offset);
}

bool parse_response_header(std::string_view input, tateyama::proto::framework::response::Header& hdr, std::string_view &payload) {
google::protobuf::io::ArrayInputStream in { input.data(), static_cast<int>(input.size()) };
if (auto res = utils::ParseDelimitedFromZeroCopyStream(std::addressof(hdr), std::addressof(in), nullptr); !res) {
return false;
}
result.session_id_ = hdr.session_id();
result.payload_type_ = hdr.payload_type();
return utils::GetDelimitedBodyFromZeroCopyStream(std::addressof(in), nullptr, result.payload_);
return utils::GetDelimitedBodyFromZeroCopyStream(std::addressof(in), nullptr, payload);
}

void ipc_client::receive(std::string &message) {
Expand Down Expand Up @@ -110,12 +107,12 @@ void ipc_client::receive(std::string &message, tateyama::proto::framework::respo
r_msg.resize(header.get_length());
response_wire_->read(reinterpret_cast<signed char*>(r_msg.data()));
//
parse_response_result result;
parse_response_header(r_msg, result);
std::string_view payload{};
parse_response_header(r_msg, hdr_, payload);
// ASSERT_TRUE(parse_response_header(r_msg, result));
// EXPECT_EQ(session_id_, result.session_id_);
message = result.payload_;
type = result.payload_type_;
message = payload;
type = hdr_.payload_type();
}

resultset_wires_container* ipc_client::create_resultset_wires() {
Expand Down
10 changes: 8 additions & 2 deletions test/tateyama/endpoint/ipc/ipc_client.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2025 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,8 @@
*/
#pragma once

#include <set>

#include "ipc_test_utils.h"

// FIXME: temporary header handling - start
Expand All @@ -41,6 +43,7 @@ class ipc_client {
~ipc_client() { disconnect(); }

void send(const std::size_t tag, const std::string &message, std::size_t index_offset = 0);
void send(const std::size_t tag, const std::string &message, std::set<std::tuple<std::string, std::string, bool>>& blobs, std::size_t index_offset = 0);
void receive(std::string &message);
void receive(std::string &message, tateyama::proto::framework::response::Header::PayloadType& type);
void disconnect() {
Expand All @@ -51,14 +54,16 @@ class ipc_client {
}
resultset_wires_container* create_resultset_wires();
void dispose_resultset_wires(resultset_wires_container *rwc);

std::size_t session_id() const noexcept {
return session_id_;
}

std::string session_name() const noexcept {
return session_name_;
}
tateyama::proto::framework::response::Header& framework_response_header() {
return hdr_;
}

// tateyama/src/tateyama/endpoint/ipc/bootstrap/server_wires_impl.h
// - private server_wire_container_impl::resultset_buffer_size = 64KB
Expand All @@ -79,6 +84,7 @@ class ipc_client {
tsubakuro::common::wire::session_wire_container::wire_container *request_wire_ { };
tsubakuro::common::wire::session_wire_container::response_wire_container *response_wire_ { };
tateyama::proto::endpoint::request::Handshake default_endpoint_handshake_{ };
tateyama::proto::framework::response::Header hdr_ { };
bool disconnected_{ };

void handshake();
Expand Down
212 changes: 212 additions & 0 deletions test/tateyama/endpoint/ipc/ipc_lob_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* Copyright 2018-2025 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "tateyama/endpoint/ipc/bootstrap/ipc_worker.h"
#include "tateyama/endpoint/header_utils.h"
#include <tateyama/proto/endpoint/request.pb.h>
#include "tateyama/status/resource/database_info_impl.h"
#include "ipc_client.h"

#include <gtest/gtest.h>

namespace tateyama::server {
class ipc_listener_for_test {
public:
static void run(tateyama::endpoint::ipc::bootstrap::ipc_worker& worker) {
worker.invoke([&]{
worker.run();
worker.delete_hook();
});
}
static void wait(tateyama::endpoint::ipc::bootstrap::ipc_worker& worker) {
while (!worker.is_terminated());
}
};
} // namespace tateyama::server

namespace tateyama::endpoint::ipc {

class blob_info_for_test : public tateyama::api::server::blob_info {
public:
blob_info_for_test(std::string_view channel_name, std::filesystem::path path, bool temporary)
: channel_name_(channel_name), path_(std::move(path)), temporary_(temporary) {
}
[[nodiscard]] std::string_view channel_name() const noexcept override {
return channel_name_;
}
[[nodiscard]] std::filesystem::path path() const noexcept override {
return path_;
}
[[nodiscard]] bool is_temporary() const noexcept override {
return temporary_;
}
void dispose() override {
}
private:
const std::string channel_name_{};
const std::filesystem::path path_{};
const bool temporary_{};
};

static constexpr std::size_t my_session_id = 123;

static constexpr std::string_view database_name = "ipc_lob_test";
static constexpr std::string_view label = "label_fot_test";
static constexpr std::string_view application_name = "application_name_fot_test";
static constexpr std::size_t datachannel_buffer_size = 64 * 1024;
static constexpr tateyama::common::wire::message_header::index_type index_ = 1;
static constexpr std::string_view response_test_message = "opqrstuvwxyz";
static constexpr std::string_view request_test_message = "abcdefgh";
static constexpr std::size_t service_id_of_lob_service = 102;

class lob_service : public tateyama::framework::routing_service {
public:
bool setup(tateyama::framework::environment&) { return true; }
bool start(tateyama::framework::environment&) { return true; }
bool shutdown(tateyama::framework::environment&) { return true; }
std::string_view label() const noexcept { return __func__; }

id_type id() const noexcept { return service_id_of_lob_service; }
bool operator ()(std::shared_ptr<tateyama::api::server::request> req,
std::shared_ptr<tateyama::api::server::response> res) override {
req_ = req;
for (auto&& e: blobs_) {
res->add_blob(std::make_unique<blob_info_for_test>(std::get<0>(e), std::get<1>(e), std::get<2>(e)));
}
res->body(response_test_message);
return true;
}

tateyama::api::server::request* request() {
return req_.get();
}

void push_blob(const std::string& name, const std::string& path, const bool temporary) {
blobs_.emplace(name, std::filesystem::path(path), temporary);
}

private:
std::shared_ptr<tateyama::api::server::request> req_{};
std::set<std::tuple<std::string, std::filesystem::path, bool>> blobs_{};
};

class ipc_lob_test : public ::testing::Test {
virtual void SetUp() {
auto rv = system("if [ -f /dev/shm/ipc_lob_test ]; then rm -f /dev/shm/ipc_lob_test; fi");
std::this_thread::sleep_for(std::chrono::milliseconds(500));

// server part
std::string session_name{database_name};
session_name += "-";
session_name += std::to_string(my_session_id);
wire_ = std::make_shared<bootstrap::server_wire_container_impl>(session_name, "dummy_mutex_file_name", datachannel_buffer_size, 16);
const tateyama::endpoint::common::worker_common::configuration conf(tateyama::endpoint::common::worker_common::connection_type::ipc);
worker_ = std::make_unique<tateyama::endpoint::ipc::bootstrap::ipc_worker>(service_, conf, my_session_id, wire_, database_info_);
tateyama::server::ipc_listener_for_test::run(*worker_);

// client part
tateyama::proto::endpoint::request::ClientInformation cci{};
cci.set_connection_label(std::string(label));
cci.set_application_name(std::string(application_name));
tateyama::proto::endpoint::request::Credential cred{};
// FIXME handle userName when a credential specification is fixed.
cci.set_allocated_credential(&cred);
tateyama::proto::endpoint::request::Handshake hs{};
hs.set_allocated_client_information(&cci);
client_ = std::make_unique<ipc_client>(database_name, my_session_id, hs);
cci.release_credential();
hs.release_client_information();
}

virtual void TearDown() {
worker_->terminate(tateyama::session::shutdown_request_type::forceful);
tateyama::server::ipc_listener_for_test::wait(*worker_);

auto rv = system("if [ -f /dev/shm/ipc_lob_test ]; then rm -f /dev/shm/ipc_lob_test; fi");
blobs_.clear();
}

public:
tateyama::status_info::resource::database_info_impl database_info_{database_name};
std::set<std::tuple<std::string, std::string, bool>> blobs_{};

protected:
std::shared_ptr<bootstrap::server_wire_container_impl> wire_{};
std::unique_ptr<tateyama::endpoint::ipc::bootstrap::ipc_worker> worker_{};
std::unique_ptr<ipc_client> client_{};
lob_service service_{};
};


TEST_F(ipc_lob_test, receive) {
blobs_.emplace("BlobChannel-123-456", "/tmp/BlobFile", false);
blobs_.emplace("ClobChannel-123-789", "/tmp/ClobFile", true);

// we do not care service_id nor request message here
client_->send(service_id_of_lob_service, std::string(request_test_message), blobs_);
std::string res{};
client_->receive(res);

// server part
auto* request = service_.request();

// test for blob_info
// blob
EXPECT_TRUE(request->has_blob("BlobChannel-123-456"));
auto& blob = request->get_blob("BlobChannel-123-456");
EXPECT_EQ(blob.channel_name(), "BlobChannel-123-456");
EXPECT_EQ(blob.path().string(), "/tmp/BlobFile");
EXPECT_EQ(blob.is_temporary(), false);

// clob
EXPECT_TRUE(request->has_blob("ClobChannel-123-789"));
auto& clob = request->get_blob("ClobChannel-123-789");
EXPECT_EQ(clob.channel_name(), "ClobChannel-123-789");
EXPECT_EQ(clob.path().string(), "/tmp/ClobFile");
EXPECT_EQ(clob.is_temporary(),true);

// blob, clob that does not exist
EXPECT_FALSE(request->has_blob("BlobChannel-987-654"));
EXPECT_THROW(auto& blob_not_find = request->get_blob("BlobChannel-987-654"), std::runtime_error);
EXPECT_FALSE(request->has_blob("ClobChannel-654-321"));
EXPECT_THROW(auto& clob_not_find = request->get_blob("ClobChannel-654-321"), std::runtime_error);
}

TEST_F(ipc_lob_test, send) {
service_.push_blob("BlobChannel-123-456", "/tmp/BlobFile", false);
service_.push_blob("ClobChannel-123-789", "/tmp/ClobFile", true);

// we do not care service_id nor request message here
client_->send(service_id_of_lob_service, std::string(request_test_message), blobs_);
std::string res{};
client_->receive(res);

::tateyama::proto::framework::response::Header& header = client_->framework_response_header();
ASSERT_TRUE(header.has_blobs());
for (auto&& e: header.blobs().blobs()) {
if (e.channel_name().compare("BlobChannel-123-456") == 0) {
EXPECT_EQ(e.channel_name(), "BlobChannel-123-456");
EXPECT_EQ(e.path(), "/tmp/BlobFile");
EXPECT_EQ(e.temporary(), false);
} else if (e.channel_name().compare("ClobChannel-123-789") == 0) {
EXPECT_EQ(e.channel_name(), "ClobChannel-123-789");
EXPECT_EQ(e.path(), "/tmp/ClobFile");
EXPECT_EQ(e.temporary(), true);
}
}
}

} // namespace tateyama::endpoint::ipc

0 comments on commit ec5f99a

Please sign in to comment.