Skip to content

Commit

Permalink
Add a C++ client concurrent test/example (#5490)
Browse files Browse the repository at this point in the history
* Replicate failing python unit test in C++.

* Removed unused code from server.cc

* undo change.

* Tweak

* shared_ptr

* Review comments.

* Followup review comments.
  • Loading branch information
jcferretti authored May 15, 2024
1 parent a1371c0 commit 3b97aaf
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 35 deletions.
35 changes: 0 additions & 35 deletions cpp-client/deephaven/dhclient/src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include <grpc/support/log.h>
#include <arrow/flight/client_auth.h>
#include <arrow/flight/client.h>
#include <arrow/flight/client_middleware.h>
#include <arrow/flight/types.h>
#include <arrow/array.h>
#include <arrow/array/array_primitive.h>
Expand All @@ -20,46 +19,12 @@
#include "deephaven/third_party/fmt/format.h"

using arrow::flight::FlightClient;
using deephaven::client::impl::MoveVectorData;
using deephaven::dhcore::utility::Bit_cast;
using deephaven::dhcore::utility::GetWhat;
using io::deephaven::proto::backplane::grpc::AddTableRequest;
using io::deephaven::proto::backplane::grpc::AddTableResponse;
using io::deephaven::proto::backplane::grpc::AjRajTablesRequest;
using io::deephaven::proto::backplane::grpc::AuthenticationConstantsRequest;
using io::deephaven::proto::backplane::grpc::ConfigurationConstantsRequest;
using io::deephaven::proto::backplane::grpc::ConfigurationConstantsResponse;
using io::deephaven::proto::backplane::grpc::ConfigService;
using io::deephaven::proto::backplane::grpc::CreateInputTableRequest;
using io::deephaven::proto::backplane::grpc::CrossJoinTablesRequest;
using io::deephaven::proto::backplane::grpc::DeleteTableRequest;
using io::deephaven::proto::backplane::grpc::DeleteTableResponse;
using io::deephaven::proto::backplane::grpc::DropColumnsRequest;
using io::deephaven::proto::backplane::grpc::EmptyTableRequest;
using io::deephaven::proto::backplane::grpc::ExactJoinTablesRequest;
using io::deephaven::proto::backplane::grpc::ExportedTableCreationResponse;
using io::deephaven::proto::backplane::grpc::FetchTableRequest;
using io::deephaven::proto::backplane::grpc::HandshakeRequest;
using io::deephaven::proto::backplane::grpc::HeadOrTailRequest;
using io::deephaven::proto::backplane::grpc::HeadOrTailByRequest;
using io::deephaven::proto::backplane::grpc::LeftJoinTablesRequest;
using io::deephaven::proto::backplane::grpc::MergeTablesRequest;
using io::deephaven::proto::backplane::grpc::NaturalJoinTablesRequest;
using io::deephaven::proto::backplane::grpc::ReleaseRequest;
using io::deephaven::proto::backplane::grpc::ReleaseResponse;
using io::deephaven::proto::backplane::grpc::SelectDistinctRequest;
using io::deephaven::proto::backplane::grpc::SelectOrUpdateRequest;
using io::deephaven::proto::backplane::grpc::SortTableRequest;
using io::deephaven::proto::backplane::grpc::Ticket;
using io::deephaven::proto::backplane::grpc::TimeTableRequest;
using io::deephaven::proto::backplane::grpc::WhereInRequest;
using io::deephaven::proto::backplane::grpc::UpdateByRequest;
using io::deephaven::proto::backplane::grpc::UnstructuredFilterTableRequest;
using io::deephaven::proto::backplane::grpc::UngroupRequest;
using io::deephaven::proto::backplane::script::grpc::BindTableToVariableRequest;
using io::deephaven::proto::backplane::script::grpc::ExecuteCommandRequest;
using io::deephaven::proto::backplane::script::grpc::ExecuteCommandResponse;
using io::deephaven::proto::backplane::script::grpc::StartConsoleRequest;

using UpdateByOperation = io::deephaven::proto::backplane::grpc::UpdateByRequest::UpdateByOperation;

Expand Down
1 change: 1 addition & 0 deletions cpp-client/deephaven/examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
add_subdirectory(table_cleanup)
add_subdirectory(concurrent_client)
add_subdirectory(create_table_with_arrow_flight)
add_subdirectory(create_table_with_table_maker)
add_subdirectory(demos)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
project(hello_world)

set(CMAKE_CXX_STANDARD 17)

add_executable(concurrent_session main.cc)

target_link_libraries(concurrent_session deephaven::client)
93 changes: 93 additions & 0 deletions cpp-client/deephaven/examples/concurrent_client/main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
*/
#include <exception>
#include <iostream>
#include <iomanip>
#include <memory>
#include <cstdlib>
#include <thread>
#include "deephaven/client/client.h"
#include "deephaven/third_party/fmt/format.h"
#include "deephaven/third_party/fmt/ostream.h"

using deephaven::client::Client;
using deephaven::client::TableHandleManager;

void Usage(const char *const argv0, const int exit_status) {
std::cerr << "Usage: " << argv0 << "[numthreads [iterations [host:port]]]" << '\n';
std::exit(exit_status);
}

void ThreadFun(std::shared_ptr<Client> clientp, const std::size_t ti) {
auto manager = clientp->GetManager();
using namespace std::chrono_literals;
std::cout << "THREAD START " << ti << std::endl << std::flush;
const std::string t1_name = std::string("import deephaven; t1_") + std::to_string(ti);
while (true) {
manager.RunScript(
t1_name + "= deephaven.time_table(\"PT1S\")");
std::this_thread::sleep_for(2000ms);
auto t1 = manager.FetchTable(std::string("t1_") + std::to_string(ti));
}
std::cout << "THREAD END " << ti << std::endl << std::flush;
}

void Run(std::shared_ptr<Client> clientp, const std::size_t nthreads) {
std::vector<std::thread> threads;
for (std::size_t i = 0; i < nthreads; ++i) {
threads.emplace_back(&ThreadFun, clientp, i);
}

for (std::size_t i = 0; i < nthreads; ++i) {
threads[i].join();
}
}

std::size_t get_size_arg(
const char *const description,
const std::size_t defaultval,
const int argc,
const char *const argv[],
int &c) {
if (argc <= c) {
return defaultval;
}

++c;
try {
const std::size_t val = static_cast<std::size_t>(std::stoi(argv[c]));
if (val <= 0) {
throw std::invalid_argument("<= 0");
}
return val;
} catch (const std::invalid_argument &e) {
fmt::println(std::cerr, "{}: can't convert {} argument #{} '{}' to a positive intenger [{}], aborting.",
argv[0], description, c, argv[c], e.what());
std::exit(1);
}
}

int main(int argc, char *argv[]) {
int c = 1;
if (argc > 1 && std::strcmp("-h", argv[1]) == 0) {
Usage(argv[0], 0);
}
std::size_t nthreads = get_size_arg("number of threads", 200, argc, argv, c);
std::size_t iterations = get_size_arg("number of iterations", 600, argc, argv, c);
const char *endpoint = "localhost:10000";
if (argc > c) {
endpoint = argv[c];
}
++c;
if (argc > c) {
Usage(argv[0], 1);
}
try {
auto client = Client::Connect(endpoint);
Run(std::make_shared<Client>(std::move(client)), nthreads);
} catch (const std::exception &e) {
std::cerr << "Caught exception: " << e.what() << '\n';
}
return 0;
}

0 comments on commit 3b97aaf

Please sign in to comment.