From 9e08c503f97fe3848039c80ec07e324db7dfbadf Mon Sep 17 00:00:00 2001 From: Tobias Zagorni Date: Tue, 5 Apr 2022 10:38:57 -0400 Subject: [PATCH] ARROW-16032: [C++] Migrate FlightClient API to Result<> Closes #12719 from zagto/flight-api-result-client Authored-by: Tobias Zagorni Signed-off-by: David Li --- c_glib/arrow-flight-glib/client.cpp | 19 +- cpp/examples/arrow/flight_sql_example.cc | 2 +- cpp/src/arrow/flight/client.cc | 145 ++++++++--- cpp/src/arrow/flight/client.h | 130 +++++++--- cpp/src/arrow/flight/flight_benchmark.cc | 24 +- cpp/src/arrow/flight/flight_test.cc | 242 +++++++----------- .../integration_tests/test_integration.cc | 10 +- .../test_integration_client.cc | 17 +- cpp/src/arrow/flight/sql/client.h | 16 +- cpp/src/arrow/flight/sql/server_test.cc | 3 +- cpp/src/arrow/flight/sql/test_app_cli.cc | 3 +- cpp/src/arrow/flight/test_definitions.cc | 242 +++++++++--------- cpp/src/arrow/flight/test_util.h | 2 +- python/pyarrow/_flight.pyx | 41 ++- python/pyarrow/includes/libarrow_flight.pxd | 44 ++-- 15 files changed, 496 insertions(+), 444 deletions(-) diff --git a/c_glib/arrow-flight-glib/client.cpp b/c_glib/arrow-flight-glib/client.cpp index b4de6468c6514..0d1961e6c6222 100644 --- a/c_glib/arrow-flight-glib/client.cpp +++ b/c_glib/arrow-flight-glib/client.cpp @@ -251,12 +251,11 @@ gaflight_client_new(GAFlightLocation *location, arrow::Status status; if (options) { const auto flight_options = gaflight_client_options_get_raw(options); - status = arrow::flight::FlightClient::Connect(*flight_location, - *flight_options, - &flight_client); + auto result = arrow::flight::FlightClient::Connect(*flight_location, *flight_options); + status = std::move(result).Value(&flight_client); } else { - status = arrow::flight::FlightClient::Connect(*flight_location, - &flight_client); + auto result = arrow::flight::FlightClient::Connect(*flight_location); + status = std::move(result).Value(&flight_client); } if (garrow::check(error, status, "[flight-client][new]")) { return gaflight_client_new_raw(flight_client.release()); @@ -315,9 +314,8 @@ gaflight_client_list_flights(GAFlightClient *client, flight_options = gaflight_call_options_get_raw(options); } std::unique_ptr flight_listing; - auto status = flight_client->ListFlights(*flight_options, - *flight_criteria, - &flight_listing); + auto result = flight_client->ListFlights(*flight_options, *flight_criteria); + auto status = std::move(result).Value(&flight_listing); if (!garrow::check(error, status, "[flight-client][list-flights]")) { @@ -369,9 +367,8 @@ gaflight_client_do_get(GAFlightClient *client, flight_options = gaflight_call_options_get_raw(options); } std::unique_ptr flight_reader; - auto status = flight_client->DoGet(*flight_options, - *flight_ticket, - &flight_reader); + auto result = flight_client->DoGet(*flight_options, *flight_ticket); + auto status = std::move(result).Value(&flight_reader); if (garrow::check(error, status, "[flight-client][do-get]")) { diff --git a/cpp/examples/arrow/flight_sql_example.cc b/cpp/examples/arrow/flight_sql_example.cc index 5dfd97dbf1c81..1201d78c5eb74 100644 --- a/cpp/examples/arrow/flight_sql_example.cc +++ b/cpp/examples/arrow/flight_sql_example.cc @@ -44,7 +44,7 @@ arrow::Status Main() { // Set up the Flight SQL client std::unique_ptr flight_client; - ARROW_RETURN_NOT_OK(flight::FlightClient::Connect(location, &flight_client)); + ARROW_ASSIGN_OR_RAISE(flight_client, flight::FlightClient::Connect(location)); std::unique_ptr client( new flightsql::FlightSqlClient(std::move(flight_client))); diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc index 160387b1663a8..daec8e3a9bb8e 100644 --- a/cpp/src/arrow/flight/client.cc +++ b/cpp/src/arrow/flight/client.cc @@ -33,6 +33,7 @@ #include "arrow/status.h" #include "arrow/table.h" #include "arrow/util/logging.h" +#include "arrow/util/make_unique.h" #include "arrow/flight/client_auth.h" #include "arrow/flight/serialization_internal.h" @@ -489,21 +490,32 @@ FlightClient::~FlightClient() { } } +arrow::Result> FlightClient::Connect( + const Location& location) { + return Connect(location, FlightClientOptions::Defaults()); +} + Status FlightClient::Connect(const Location& location, std::unique_ptr* client) { - return Connect(location, FlightClientOptions::Defaults(), client); + return Connect(location, FlightClientOptions::Defaults()).Value(client); } -Status FlightClient::Connect(const Location& location, const FlightClientOptions& options, - std::unique_ptr* client) { +arrow::Result> FlightClient::Connect( + const Location& location, const FlightClientOptions& options) { flight::transport::grpc::InitializeFlightGrpcClient(); - client->reset(new FlightClient); - (*client)->write_size_limit_bytes_ = options.write_size_limit_bytes; + std::unique_ptr client(new FlightClient()); + client->write_size_limit_bytes_ = options.write_size_limit_bytes; const auto scheme = location.scheme(); - ARROW_ASSIGN_OR_RAISE((*client)->transport_, + ARROW_ASSIGN_OR_RAISE(client->transport_, internal::GetDefaultTransportRegistry()->MakeClient(scheme)); - return (*client)->transport_->Init(options, location, *location.uri_); + RETURN_NOT_OK(client->transport_->Init(options, location, *location.uri_)); + return client; +} + +Status FlightClient::Connect(const Location& location, const FlightClientOptions& options, + std::unique_ptr* client) { + return Connect(location, options).Value(client); } Status FlightClient::Authenticate(const FlightCallOptions& options, @@ -519,23 +531,44 @@ arrow::Result> FlightClient::AuthenticateBas return transport_->AuthenticateBasicToken(options, username, password); } +arrow::Result> FlightClient::DoAction( + const FlightCallOptions& options, const Action& action) { + std::unique_ptr results; + RETURN_NOT_OK(CheckOpen()); + RETURN_NOT_OK(transport_->DoAction(options, action, &results)); + return results; +} + Status FlightClient::DoAction(const FlightCallOptions& options, const Action& action, std::unique_ptr* results) { + return DoAction(options, action).Value(results); +} + +arrow::Result> FlightClient::ListActions( + const FlightCallOptions& options) { + std::vector actions; RETURN_NOT_OK(CheckOpen()); - return transport_->DoAction(options, action, results); + RETURN_NOT_OK(transport_->ListActions(options, &actions)); + return actions; } Status FlightClient::ListActions(const FlightCallOptions& options, std::vector* actions) { + return ListActions(options).Value(actions); +} + +arrow::Result> FlightClient::GetFlightInfo( + const FlightCallOptions& options, const FlightDescriptor& descriptor) { + std::unique_ptr info; RETURN_NOT_OK(CheckOpen()); - return transport_->ListActions(options, actions); + RETURN_NOT_OK(transport_->GetFlightInfo(options, descriptor, &info)); + return info; } Status FlightClient::GetFlightInfo(const FlightCallOptions& options, const FlightDescriptor& descriptor, std::unique_ptr* info) { - RETURN_NOT_OK(CheckOpen()); - return transport_->GetFlightInfo(options, descriptor, info); + return GetFlightInfo(options, descriptor).Value(info); } arrow::Result> FlightClient::GetSchema( @@ -550,63 +583,97 @@ Status FlightClient::GetSchema(const FlightCallOptions& options, return GetSchema(options, descriptor).Value(schema_result); } +arrow::Result> FlightClient::ListFlights() { + return ListFlights({}, {}); +} + Status FlightClient::ListFlights(std::unique_ptr* listing) { + return ListFlights({}, {}).Value(listing); +} + +arrow::Result> FlightClient::ListFlights( + const FlightCallOptions& options, const Criteria& criteria) { + std::unique_ptr listing; RETURN_NOT_OK(CheckOpen()); - return ListFlights({}, {}, listing); + RETURN_NOT_OK(transport_->ListFlights(options, criteria, &listing)); + return listing; } Status FlightClient::ListFlights(const FlightCallOptions& options, const Criteria& criteria, std::unique_ptr* listing) { + return ListFlights(options, criteria).Value(listing); +} + +arrow::Result> FlightClient::DoGet( + const FlightCallOptions& options, const Ticket& ticket) { RETURN_NOT_OK(CheckOpen()); - return transport_->ListFlights(options, criteria, listing); + std::unique_ptr remote_stream; + RETURN_NOT_OK(transport_->DoGet(options, ticket, &remote_stream)); + auto stream_reader = arrow::internal::make_unique( + std::move(remote_stream), options.read_options, options.stop_token, + options.memory_manager); + // Eagerly read the schema + RETURN_NOT_OK(stream_reader->EnsureDataStarted()); + return stream_reader; } Status FlightClient::DoGet(const FlightCallOptions& options, const Ticket& ticket, std::unique_ptr* stream) { + return DoGet(options, ticket).Value(stream); +} + +arrow::Result FlightClient::DoPut( + const FlightCallOptions& options, const FlightDescriptor& descriptor, + const std::shared_ptr& schema) { RETURN_NOT_OK(CheckOpen()); std::unique_ptr remote_stream; - RETURN_NOT_OK(transport_->DoGet(options, ticket, &remote_stream)); - *stream = std::unique_ptr( - new ClientStreamReader(std::move(remote_stream), options.read_options, - options.stop_token, options.memory_manager)); - // Eagerly read the schema - return static_cast(stream->get())->EnsureDataStarted(); + RETURN_NOT_OK(transport_->DoPut(options, &remote_stream)); + std::shared_ptr shared_stream = std::move(remote_stream); + DoPutResult result; + result.reader = arrow::internal::make_unique(shared_stream); + result.writer = arrow::internal::make_unique( + std::move(shared_stream), options.write_options, write_size_limit_bytes_, + descriptor); + RETURN_NOT_OK(result.writer->Begin(schema, options.write_options)); + return result; } Status FlightClient::DoPut(const FlightCallOptions& options, const FlightDescriptor& descriptor, const std::shared_ptr& schema, - std::unique_ptr* stream, + std::unique_ptr* writer, std::unique_ptr* reader) { + ARROW_ASSIGN_OR_RAISE(auto result, DoPut(options, descriptor, schema)); + *writer = std::move(result.writer); + *reader = std::move(result.reader); + return Status::OK(); +} + +arrow::Result FlightClient::DoExchange( + const FlightCallOptions& options, const FlightDescriptor& descriptor) { RETURN_NOT_OK(CheckOpen()); std::unique_ptr remote_stream; - RETURN_NOT_OK(transport_->DoPut(options, &remote_stream)); + RETURN_NOT_OK(transport_->DoExchange(options, &remote_stream)); std::shared_ptr shared_stream = std::move(remote_stream); - *reader = - std::unique_ptr(new ClientMetadataReader(shared_stream)); - *stream = std::unique_ptr( - new ClientStreamWriter(std::move(shared_stream), options.write_options, - write_size_limit_bytes_, descriptor)); - RETURN_NOT_OK((*stream)->Begin(schema, options.write_options)); - return Status::OK(); + DoExchangeResult result; + result.reader = arrow::internal::make_unique( + shared_stream, options.read_options, options.stop_token, options.memory_manager); + auto stream_writer = arrow::internal::make_unique( + std::move(shared_stream), options.write_options, write_size_limit_bytes_, + descriptor); + RETURN_NOT_OK(stream_writer->Begin()); + result.writer = std::move(stream_writer); + return result; } Status FlightClient::DoExchange(const FlightCallOptions& options, const FlightDescriptor& descriptor, std::unique_ptr* writer, std::unique_ptr* reader) { - RETURN_NOT_OK(CheckOpen()); - std::unique_ptr remote_stream; - RETURN_NOT_OK(transport_->DoExchange(options, &remote_stream)); - std::shared_ptr shared_stream = std::move(remote_stream); - *reader = std::unique_ptr(new ClientStreamReader( - shared_stream, options.read_options, options.stop_token, options.memory_manager)); - auto stream_writer = std::unique_ptr( - new ClientStreamWriter(std::move(shared_stream), options.write_options, - write_size_limit_bytes_, descriptor)); - RETURN_NOT_OK(stream_writer->Begin()); - *writer = std::move(stream_writer); + ARROW_ASSIGN_OR_RAISE(auto result, DoExchange(options, descriptor)); + *writer = std::move(result.writer); + *reader = std::move(result.reader); return Status::OK(); } diff --git a/cpp/src/arrow/flight/client.h b/cpp/src/arrow/flight/client.h index 06d87bb9aebb3..0298abe366d0e 100644 --- a/cpp/src/arrow/flight/client.h +++ b/cpp/src/arrow/flight/client.h @@ -192,17 +192,22 @@ class ARROW_FLIGHT_EXPORT FlightClient { /// \brief Connect to an unauthenticated flight service /// \param[in] location the URI - /// \param[out] client the created FlightClient - /// \return Status OK status may not indicate that the connection was - /// successful + /// \return Arrow result with the created FlightClient, OK status may not indicate that + /// the connection was successful + static arrow::Result> Connect(const Location& location); + + ARROW_DEPRECATED("Deprecated in 8.0.0. Use Result-returning overload instead.") static Status Connect(const Location& location, std::unique_ptr* client); /// \brief Connect to an unauthenticated flight service /// \param[in] location the URI /// \param[in] options Other options for setting up the client - /// \param[out] client the created FlightClient - /// \return Status OK status may not indicate that the connection was - /// successful + /// \return Arrow result with the created FlightClient, OK status may not indicate that + /// the connection was successful + static arrow::Result> Connect( + const Location& location, const FlightClientOptions& options); + + ARROW_DEPRECATED("Deprecated in 8.0.0. Use Result-returning overload instead.") static Status Connect(const Location& location, const FlightClientOptions& options, std::unique_ptr* client); @@ -227,21 +232,34 @@ class ARROW_FLIGHT_EXPORT FlightClient { /// of results, if any /// \param[in] options Per-RPC options /// \param[in] action the action to be performed - /// \param[out] results an iterator object for reading the returned results - /// \return Status + /// \return Arrow result with an iterator object for reading the returned results + arrow::Result> DoAction(const FlightCallOptions& options, + const Action& action); + arrow::Result> DoAction(const Action& action) { + return DoAction({}, action); + } + + ARROW_DEPRECATED("Deprecated in 8.0.0. Use Result-returning overload instead.") Status DoAction(const FlightCallOptions& options, const Action& action, std::unique_ptr* results); + ARROW_DEPRECATED("Deprecated in 8.0.0. Use Result-returning overload instead.") Status DoAction(const Action& action, std::unique_ptr* results) { - return DoAction({}, action, results); + return DoAction({}, action).Value(results); } /// \brief Retrieve a list of available Action types /// \param[in] options Per-RPC options - /// \param[out] actions the available actions - /// \return Status + /// \return Arrow result with the available actions + arrow::Result> ListActions(const FlightCallOptions& options); + arrow::Result> ListActions() { + return ListActions(FlightCallOptions()); + } + + ARROW_DEPRECATED("Deprecated in 8.0.0. Use Result-returning overload instead.") Status ListActions(const FlightCallOptions& options, std::vector* actions); + ARROW_DEPRECATED("Deprecated in 8.0.0. Use Result-returning overload instead.") Status ListActions(std::vector* actions) { - return ListActions({}, actions); + return ListActions().Value(actions); } /// \brief Request access plan for a single flight, which may be an existing @@ -249,14 +267,22 @@ class ARROW_FLIGHT_EXPORT FlightClient { /// \param[in] options Per-RPC options /// \param[in] descriptor the dataset request, whether a named dataset or /// command - /// \param[out] info the FlightInfo describing where to access the dataset - /// \return Status + /// \return Arrow result with the FlightInfo describing where to access the dataset + arrow::Result> GetFlightInfo( + const FlightCallOptions& options, const FlightDescriptor& descriptor); + arrow::Result> GetFlightInfo( + const FlightDescriptor& descriptor) { + return GetFlightInfo({}, descriptor); + } + + ARROW_DEPRECATED("Deprecated in 8.0.0. Use Result-returning overload instead.") Status GetFlightInfo(const FlightCallOptions& options, const FlightDescriptor& descriptor, std::unique_ptr* info); + ARROW_DEPRECATED("Deprecated in 8.0.0. Use Result-returning overload instead.") Status GetFlightInfo(const FlightDescriptor& descriptor, std::unique_ptr* info) { - return GetFlightInfo({}, descriptor, info); + return GetFlightInfo({}, descriptor).Value(info); } /// \brief Request schema for a single flight, which may be an existing @@ -283,15 +309,20 @@ class ARROW_FLIGHT_EXPORT FlightClient { } /// \brief List all available flights known to the server - /// \param[out] listing an iterator that returns a FlightInfo for each flight - /// \return Status + /// \return Arrow result with an iterator that returns a FlightInfo for each flight + arrow::Result> ListFlights(); + + ARROW_DEPRECATED("Deprecated in 8.0.0. Use Result-returning overload instead.") Status ListFlights(std::unique_ptr* listing); /// \brief List available flights given indicated filter criteria /// \param[in] options Per-RPC options /// \param[in] criteria the filter criteria (opaque) - /// \param[out] listing an iterator that returns a FlightInfo for each flight - /// \return Status + /// \return Arrow result with an iterator that returns a FlightInfo for each flight + arrow::Result> ListFlights( + const FlightCallOptions& options, const Criteria& criteria); + + ARROW_DEPRECATED("Deprecated in 8.0.0. Use Result-returning overload instead.") Status ListFlights(const FlightCallOptions& options, const Criteria& criteria, std::unique_ptr* listing); @@ -299,14 +330,28 @@ class ARROW_FLIGHT_EXPORT FlightClient { /// stream. Returns record batch stream reader /// \param[in] options Per-RPC options /// \param[in] ticket The flight ticket to use - /// \param[out] stream the returned RecordBatchReader - /// \return Status + /// \return Arrow result with the returned RecordBatchReader + arrow::Result> DoGet( + const FlightCallOptions& options, const Ticket& ticket); + arrow::Result> DoGet(const Ticket& ticket) { + return DoGet({}, ticket); + } + + ARROW_DEPRECATED("Deprecated in 8.0.0. Use Result-returning overload instead.") Status DoGet(const FlightCallOptions& options, const Ticket& ticket, std::unique_ptr* stream); + ARROW_DEPRECATED("Deprecated in 8.0.0. Use Result-returning overload instead.") Status DoGet(const Ticket& ticket, std::unique_ptr* stream) { - return DoGet({}, ticket, stream); + return DoGet({}, ticket).Value(stream); } + /// \brief DoPut return value + struct DoPutResult { + /// \brief a writer to write record batches to + std::unique_ptr writer; + /// \brief a reader for application metadata from the server + std::unique_ptr reader; + }; /// \brief Upload data to a Flight described by the given /// descriptor. The caller must call Close() on the returned stream /// once they are done writing. @@ -318,26 +363,53 @@ class ARROW_FLIGHT_EXPORT FlightClient { /// \param[in] options Per-RPC options /// \param[in] descriptor the descriptor of the stream /// \param[in] schema the schema for the data to upload - /// \param[out] stream a writer to write record batches to - /// \param[out] reader a reader for application metadata from the server - /// \return Status + /// \return Arrow result with a DoPutResult struct holding a reader and a writer + arrow::Result DoPut(const FlightCallOptions& options, + const FlightDescriptor& descriptor, + const std::shared_ptr& schema); + + arrow::Result DoPut(const FlightDescriptor& descriptor, + const std::shared_ptr& schema) { + return DoPut({}, descriptor, schema); + } + + ARROW_DEPRECATED("Deprecated in 8.0.0. Use Result-returning overload instead.") Status DoPut(const FlightCallOptions& options, const FlightDescriptor& descriptor, const std::shared_ptr& schema, - std::unique_ptr* stream, + std::unique_ptr* writer, std::unique_ptr* reader); + ARROW_DEPRECATED("Deprecated in 8.0.0. Use Result-returning overload instead.") Status DoPut(const FlightDescriptor& descriptor, const std::shared_ptr& schema, - std::unique_ptr* stream, + std::unique_ptr* writer, std::unique_ptr* reader) { - return DoPut({}, descriptor, schema, stream, reader); + ARROW_ASSIGN_OR_RAISE(auto output, DoPut({}, descriptor, schema)); + *writer = std::move(output.writer); + *reader = std::move(output.reader); + return Status::OK(); } + struct DoExchangeResult { + std::unique_ptr writer; + std::unique_ptr reader; + }; + arrow::Result DoExchange(const FlightCallOptions& options, + const FlightDescriptor& descriptor); + arrow::Result DoExchange(const FlightDescriptor& descriptor) { + return DoExchange({}, descriptor); + } + + ARROW_DEPRECATED("Deprecated in 8.0.0. Use Result-returning overload instead.") Status DoExchange(const FlightCallOptions& options, const FlightDescriptor& descriptor, std::unique_ptr* writer, std::unique_ptr* reader); + ARROW_DEPRECATED("Deprecated in 8.0.0. Use Result-returning overload instead.") Status DoExchange(const FlightDescriptor& descriptor, std::unique_ptr* writer, std::unique_ptr* reader) { - return DoExchange({}, descriptor, writer, reader); + ARROW_ASSIGN_OR_RAISE(auto output, DoExchange({}, descriptor)); + *writer = std::move(output.writer); + *reader = std::move(output.reader); + return Status::OK(); } /// \brief Explicitly shut down and clean up the client. diff --git a/cpp/src/arrow/flight/flight_benchmark.cc b/cpp/src/arrow/flight/flight_benchmark.cc index aeb20407d75ea..872c67c80b786 100644 --- a/cpp/src/arrow/flight/flight_benchmark.cc +++ b/cpp/src/arrow/flight/flight_benchmark.cc @@ -123,8 +123,7 @@ struct PerformanceStats { Status WaitForReady(FlightClient* client, const FlightCallOptions& call_options) { Action action{"ping", nullptr}; for (int attempt = 0; attempt < 10; attempt++) { - std::unique_ptr stream; - if (client->DoAction(call_options, action, &stream).ok()) { + if (client->DoAction(call_options, action).ok()) { return Status::OK(); } std::this_thread::sleep_for(std::chrono::milliseconds(1000)); @@ -138,7 +137,7 @@ arrow::Result RunDoGetTest(FlightClient* client, const FlightEndpoint& endpoint, PerformanceStats* stats) { std::unique_ptr reader; - RETURN_NOT_OK(client->DoGet(call_options, endpoint.ticket, &reader)); + ARROW_ASSIGN_OR_RAISE(reader, client->DoGet(call_options, endpoint.ticket)); FlightStreamChunk batch; @@ -246,10 +245,10 @@ arrow::Result RunDoPutTest(FlightClient* client, StopWatch timer; int64_t num_records = 0; int64_t num_bytes = 0; - std::unique_ptr writer; - std::unique_ptr reader; - RETURN_NOT_OK(client->DoPut(call_options, FlightDescriptor{}, - batches[0].batch->schema(), &writer, &reader)); + ARROW_ASSIGN_OR_RAISE( + auto do_put_result, + client->DoPut(call_options, FlightDescriptor{}, batches[0].batch->schema())); + std::unique_ptr writer = std::move(do_put_result.writer); for (size_t i = 0; i < batches.size(); i++) { auto batch = batches[i]; auto is_last = i == (batches.size() - 1); @@ -283,8 +282,7 @@ Status DoSinglePerfRun(FlightClient* client, const FlightClientOptions client_op descriptor.type = FlightDescriptor::CMD; perf.SerializeToString(&descriptor.cmd); - std::unique_ptr plan; - RETURN_NOT_OK(client->GetFlightInfo(call_options, descriptor, &plan)); + ARROW_ASSIGN_OR_RAISE(auto plan, client->GetFlightInfo(call_options, descriptor)); // Read the streams in parallel ipc::DictionaryMemo dict_memo; @@ -300,8 +298,9 @@ Status DoSinglePerfRun(FlightClient* client, const FlightClientOptions client_op if (endpoint.locations.empty()) { data_client = client; } else { - RETURN_NOT_OK(FlightClient::Connect(endpoint.locations.front(), client_options, - &local_client)); + ARROW_ASSIGN_OR_RAISE( + local_client, + FlightClient::Connect(endpoint.locations.front(), client_options)); data_client = local_client.get(); } @@ -521,8 +520,7 @@ int main(int argc, char** argv) { #endif } - std::unique_ptr client; - ABORT_NOT_OK(arrow::flight::FlightClient::Connect(location, options, &client)); + auto client = arrow::flight::FlightClient::Connect(location, options).ValueOrDie(); ABORT_NOT_OK(arrow::flight::WaitForReady(client.get(), call_options)); arrow::Status s = arrow::flight::RunPerformanceTest(client.get(), options, call_options, diff --git a/cpp/src/arrow/flight/flight_test.cc b/cpp/src/arrow/flight/flight_test.cc index b30a91268ebf7..3f0ed7114fa16 100644 --- a/cpp/src/arrow/flight/flight_test.cc +++ b/cpp/src/arrow/flight/flight_test.cc @@ -124,9 +124,9 @@ TEST(TestFlight, ConnectUri) { std::unique_ptr client; ASSERT_OK_AND_ASSIGN(auto location1, Location::Parse(uri)); ASSERT_OK_AND_ASSIGN(auto location2, Location::Parse(uri)); - ASSERT_OK(FlightClient::Connect(location1, &client)); + ASSERT_OK_AND_ASSIGN(client, FlightClient::Connect(location1)); ASSERT_OK(client->Close()); - ASSERT_OK(FlightClient::Connect(location2, &client)); + ASSERT_OK_AND_ASSIGN(client, FlightClient::Connect(location2)); ASSERT_OK(client->Close()); } @@ -143,9 +143,9 @@ TEST(TestFlight, ConnectUriUnix) { std::unique_ptr client; ASSERT_OK_AND_ASSIGN(auto location1, Location::Parse(uri)); ASSERT_OK_AND_ASSIGN(auto location2, Location::Parse(uri)); - ASSERT_OK(FlightClient::Connect(location1, &client)); + ASSERT_OK_AND_ASSIGN(client, FlightClient::Connect(location1)); ASSERT_OK(client->Close()); - ASSERT_OK(FlightClient::Connect(location2, &client)); + ASSERT_OK_AND_ASSIGN(client, FlightClient::Connect(location2)); ASSERT_OK(client->Close()); } #endif @@ -161,9 +161,8 @@ TEST(TestFlight, DISABLED_IpV6Port) { ASSERT_OK_AND_ASSIGN(auto location2, Location::ForGrpcTcp("[::1]", server->port())); std::unique_ptr client; - ASSERT_OK(FlightClient::Connect(location2, &client)); - std::unique_ptr listing; - ASSERT_OK(client->ListFlights(&listing)); + ASSERT_OK_AND_ASSIGN(client, FlightClient::Connect(location2)); + ASSERT_OK(client->ListFlights()); } // ---------------------------------------------------------------------- @@ -189,7 +188,7 @@ class TestFlightClient : public ::testing::Test { Status ConnectClient() { ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp("localhost", server_->port())); - return FlightClient::Connect(location, &client_); + return FlightClient::Connect(location).Value(&client_); } template @@ -198,8 +197,7 @@ class TestFlightClient : public ::testing::Test { EndpointCheckFunc&& check_endpoints) { auto expected_schema = expected_batches[0]->schema(); - std::unique_ptr info; - ASSERT_OK(client_->GetFlightInfo(descr, &info)); + ASSERT_OK_AND_ASSIGN(auto info, client_->GetFlightInfo(descr)); check_endpoints(info->endpoints()); ipc::DictionaryMemo dict_memo; @@ -215,11 +213,8 @@ class TestFlightClient : public ::testing::Test { auto num_batches = static_cast(expected_batches.size()); ASSERT_GE(num_batches, 2); - std::unique_ptr stream; - ASSERT_OK(client_->DoGet(ticket, &stream)); - - std::unique_ptr stream2; - ASSERT_OK(client_->DoGet(ticket, &stream2)); + ASSERT_OK_AND_ASSIGN(auto stream, client_->DoGet(ticket)); + ASSERT_OK_AND_ASSIGN(auto stream2, client_->DoGet(ticket)); ASSERT_OK_AND_ASSIGN(auto reader, MakeRecordBatchReader(std::move(stream2))); FlightStreamChunk chunk; @@ -367,7 +362,7 @@ class TestTls : public ::testing::Test { CertKeyPair root_cert; RETURN_NOT_OK(ExampleTlsCertificateRoot(&root_cert)); options.tls_root_certs = root_cert.pem_cert; - return FlightClient::Connect(location_, options, &client_); + return FlightClient::Connect(location_, options).Value(&client_); } protected: @@ -650,7 +645,7 @@ class PropagatingTestServer : public FlightServerBase { current_span_id = ((const TracingServerMiddleware*)middleware)->span_id; } - return client_->DoAction(action, result); + return client_->DoAction(action).Value(result); } private: @@ -820,7 +815,7 @@ class TestBasicHeaderAuthMiddleware : public ::testing::Test { std::unique_ptr listing; FlightCallOptions call_options; call_options.headers.push_back(bearer_result.ValueOrDie()); - ASSERT_OK(client_->ListFlights(call_options, {}, &listing)); + ASSERT_OK_AND_ASSIGN(listing, client_->ListFlights(call_options, {})); ASSERT_TRUE(bearer_middleware_->GetIsValid()); } @@ -846,13 +841,12 @@ class TestBasicHeaderAuthMiddleware : public ::testing::Test { TEST_F(TestErrorMiddleware, TestMetadata) { Action action; - std::unique_ptr stream; // Run action1 action.type = "action1"; action.body = Buffer::FromString("action1-content"); - Status s = client_->DoAction(action, &stream); + Status s = client_->DoAction(action).status(); ASSERT_FALSE(s.ok()); std::shared_ptr flightStatusDetail = FlightStatusDetail::UnwrapStatus(s); @@ -861,8 +855,7 @@ TEST_F(TestErrorMiddleware, TestMetadata) { } TEST_F(TestFlightClient, ListFlights) { - std::unique_ptr listing; - ASSERT_OK(client_->ListFlights(&listing)); + ASSERT_OK_AND_ASSIGN(auto listing, client_->ListFlights()); ASSERT_TRUE(listing != nullptr); std::vector flights = ExampleFlightInfo(); @@ -880,8 +873,7 @@ TEST_F(TestFlightClient, ListFlights) { } TEST_F(TestFlightClient, ListFlightsWithCriteria) { - std::unique_ptr listing; - ASSERT_OK(client_->ListFlights(FlightCallOptions(), {"foo"}, &listing)); + ASSERT_OK_AND_ASSIGN(auto listing, client_->ListFlights(FlightCallOptions(), {"foo"})); std::unique_ptr info; ASSERT_OK_AND_ASSIGN(info, listing->Next()); ASSERT_TRUE(info == nullptr); @@ -889,9 +881,7 @@ TEST_F(TestFlightClient, ListFlightsWithCriteria) { TEST_F(TestFlightClient, GetFlightInfo) { auto descr = FlightDescriptor::Path({"examples", "ints"}); - std::unique_ptr info; - - ASSERT_OK(client_->GetFlightInfo(descr, &info)); + ASSERT_OK_AND_ASSIGN(auto info, client_->GetFlightInfo(descr)); ASSERT_NE(info, nullptr); std::vector flights = ExampleFlightInfo(); @@ -909,17 +899,15 @@ TEST_F(TestFlightClient, GetSchema) { TEST_F(TestFlightClient, GetFlightInfoNotFound) { auto descr = FlightDescriptor::Path({"examples", "things"}); - std::unique_ptr info; // XXX Ideally should be Invalid (or KeyError), but gRPC doesn't support // multiple error codes. - auto st = client_->GetFlightInfo(descr, &info); + auto st = client_->GetFlightInfo(descr).status(); ASSERT_RAISES(Invalid, st); ASSERT_NE(st.message().find("Flight not found"), std::string::npos); } TEST_F(TestFlightClient, ListActions) { - std::vector actions; - ASSERT_OK(client_->ListActions(&actions)); + ASSERT_OK_AND_ASSIGN(std::vector actions, client_->ListActions()); std::vector expected = ExampleActionTypes(); EXPECT_THAT(actions, ::testing::ContainerEq(expected)); @@ -927,7 +915,6 @@ TEST_F(TestFlightClient, ListActions) { TEST_F(TestFlightClient, DoAction) { Action action; - std::unique_ptr stream; std::unique_ptr result; // Run action1 @@ -935,7 +922,7 @@ TEST_F(TestFlightClient, DoAction) { const std::string action1_value = "action1-content"; action.body = Buffer::FromString(action1_value); - ASSERT_OK(client_->DoAction(action, &stream)); + ASSERT_OK_AND_ASSIGN(auto stream, client_->DoAction(action)); for (int i = 0; i < 3; ++i) { ASSERT_OK_AND_ASSIGN(result, stream->Next()); @@ -949,7 +936,7 @@ TEST_F(TestFlightClient, DoAction) { // Run action2, no results action.type = "action2"; - ASSERT_OK(client_->DoAction(action, &stream)); + ASSERT_OK_AND_ASSIGN(stream, client_->DoAction(action)); ASSERT_OK_AND_ASSIGN(result, stream->Next()); ASSERT_EQ(nullptr, result); @@ -957,20 +944,18 @@ TEST_F(TestFlightClient, DoAction) { TEST_F(TestFlightClient, RoundTripStatus) { const auto descr = FlightDescriptor::Command("status-outofmemory"); - std::unique_ptr info; - const auto status = client_->GetFlightInfo(descr, &info); + const auto status = client_->GetFlightInfo(descr).status(); ASSERT_RAISES(OutOfMemory, status); } // Test setting generic transport options by configuring gRPC to fail // all calls. TEST_F(TestFlightClient, GenericOptions) { - std::unique_ptr client; auto options = FlightClientOptions::Defaults(); // Set a very low limit at the gRPC layer to fail all calls options.generic_options.emplace_back(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, 4); ASSERT_OK_AND_ASSIGN(auto location, Location::ForGrpcTcp("localhost", server_->port())); - ASSERT_OK(FlightClient::Connect(location, options, &client)); + ASSERT_OK_AND_ASSIGN(auto client, FlightClient::Connect(location, options)); auto descr = FlightDescriptor::Path({"examples", "ints"}); std::shared_ptr schema; ipc::DictionaryMemo dict_memo; @@ -981,14 +966,12 @@ TEST_F(TestFlightClient, GenericOptions) { TEST_F(TestFlightClient, TimeoutFires) { // Server does not exist on this port, so call should fail - std::unique_ptr client; ASSERT_OK_AND_ASSIGN(auto location, Location::ForGrpcTcp("localhost", 30001)); - ASSERT_OK(FlightClient::Connect(location, &client)); + ASSERT_OK_AND_ASSIGN(auto client, FlightClient::Connect(location)); FlightCallOptions options; options.timeout = TimeoutDuration{0.2}; - std::unique_ptr info; auto start = std::chrono::system_clock::now(); - Status status = client->GetFlightInfo(options, FlightDescriptor{}, &info); + Status status = client->GetFlightInfo(options, FlightDescriptor{}).status(); auto end = std::chrono::system_clock::now(); #ifdef ARROW_WITH_TIMING_TESTS EXPECT_LE(end - start, std::chrono::milliseconds{400}); @@ -1005,7 +988,7 @@ TEST_F(TestFlightClient, NoTimeout) { std::unique_ptr info; auto start = std::chrono::system_clock::now(); auto descriptor = FlightDescriptor::Path({"examples", "ints"}); - Status status = client_->GetFlightInfo(options, descriptor, &info); + Status status = client_->GetFlightInfo(options, descriptor).Value(&info); auto end = std::chrono::system_clock::now(); #ifdef ARROW_WITH_TIMING_TESTS EXPECT_LE(end - start, std::chrono::milliseconds{600}); @@ -1022,9 +1005,8 @@ TEST_F(TestFlightClient, Close) { // Idempotent ASSERT_OK(client_->Close()); - std::unique_ptr listing; EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("FlightClient is closed"), - client_->ListFlights(&listing)); + client_->ListFlights()); } TEST_F(TestAuthHandler, PassAuthenticatedCalls) { @@ -1033,65 +1015,52 @@ TEST_F(TestAuthHandler, PassAuthenticatedCalls) { std::unique_ptr(new TestClientAuthHandler("user", "p4ssw0rd")))); Status status; - std::unique_ptr listing; - status = client_->ListFlights(&listing); + status = client_->ListFlights().status(); ASSERT_RAISES(NotImplemented, status); std::unique_ptr results; Action action; action.type = ""; action.body = Buffer::FromString(""); - status = client_->DoAction(action, &results); - ASSERT_OK(status); + ASSERT_OK_AND_ASSIGN(results, client_->DoAction(action)); - std::vector actions; - status = client_->ListActions(&actions); + status = client_->ListActions().status(); ASSERT_RAISES(NotImplemented, status); - std::unique_ptr info; - status = client_->GetFlightInfo(FlightDescriptor{}, &info); + status = client_->GetFlightInfo(FlightDescriptor{}).status(); ASSERT_RAISES(NotImplemented, status); - std::unique_ptr stream; - status = client_->DoGet(Ticket{}, &stream); + status = client_->DoGet(Ticket{}).status(); ASSERT_RAISES(NotImplemented, status); - std::unique_ptr writer; - std::unique_ptr reader; std::shared_ptr schema = arrow::schema({}); - status = client_->DoPut(FlightDescriptor{}, schema, &writer, &reader); - ASSERT_OK(status); - status = writer->Close(); + ASSERT_OK_AND_ASSIGN(auto do_put_result, client_->DoPut(FlightDescriptor{}, schema)); + status = do_put_result.writer->Close(); ASSERT_RAISES(NotImplemented, status); } TEST_F(TestAuthHandler, FailUnauthenticatedCalls) { Status status; - std::unique_ptr listing; - status = client_->ListFlights(&listing); + status = client_->ListFlights().status(); ASSERT_RAISES(IOError, status); ASSERT_THAT(status.message(), ::testing::HasSubstr("Invalid token")); - std::unique_ptr results; Action action; action.type = ""; action.body = Buffer::FromString(""); - status = client_->DoAction(action, &results); + status = client_->DoAction(action).status(); ASSERT_RAISES(IOError, status); ASSERT_THAT(status.message(), ::testing::HasSubstr("Invalid token")); - std::vector actions; - status = client_->ListActions(&actions); + status = client_->ListActions().status(); ASSERT_RAISES(IOError, status); ASSERT_THAT(status.message(), ::testing::HasSubstr("Invalid token")); - std::unique_ptr info; - status = client_->GetFlightInfo(FlightDescriptor{}, &info); + status = client_->GetFlightInfo(FlightDescriptor{}).status(); ASSERT_RAISES(IOError, status); ASSERT_THAT(status.message(), ::testing::HasSubstr("Invalid token")); - std::unique_ptr stream; - status = client_->DoGet(Ticket{}, &stream); + status = client_->DoGet(Ticket{}).status(); ASSERT_RAISES(IOError, status); ASSERT_THAT(status.message(), ::testing::HasSubstr("Invalid token")); @@ -1099,9 +1068,10 @@ TEST_F(TestAuthHandler, FailUnauthenticatedCalls) { std::unique_ptr reader; std::shared_ptr schema( (new arrow::Schema(std::vector>()))); - status = client_->DoPut(FlightDescriptor{}, schema, &writer, &reader); + FlightClient::DoPutResult do_put_result; + status = client_->DoPut(FlightDescriptor{}, schema).Value(&do_put_result); // ARROW-16053: gRPC may or may not fail the call immediately - if (status.ok()) status = writer->Close(); + if (status.ok()) status = do_put_result.writer->Close(); ASSERT_RAISES(IOError, status); // ARROW-7583: don't check the error message here. // Because gRPC reports errors in some paths with booleans, instead @@ -1119,7 +1089,7 @@ TEST_F(TestAuthHandler, CheckPeerIdentity) { action.type = "who-am-i"; action.body = Buffer::FromString(""); std::unique_ptr results; - ASSERT_OK(client_->DoAction(action, &results)); + ASSERT_OK_AND_ASSIGN(results, client_->DoAction(action)); ASSERT_NE(results, nullptr); std::unique_ptr result; @@ -1144,76 +1114,62 @@ TEST_F(TestBasicAuthHandler, PassAuthenticatedCalls) { new TestClientBasicAuthHandler("user", "p4ssw0rd")))); Status status; - std::unique_ptr listing; - status = client_->ListFlights(&listing); + status = client_->ListFlights().status(); ASSERT_RAISES(NotImplemented, status); - std::unique_ptr results; Action action; action.type = ""; action.body = Buffer::FromString(""); - status = client_->DoAction(action, &results); + status = client_->DoAction(action).status(); ASSERT_OK(status); - std::vector actions; - status = client_->ListActions(&actions); + status = client_->ListActions().status(); ASSERT_RAISES(NotImplemented, status); - std::unique_ptr info; - status = client_->GetFlightInfo(FlightDescriptor{}, &info); + status = client_->GetFlightInfo(FlightDescriptor{}).status(); ASSERT_RAISES(NotImplemented, status); - std::unique_ptr stream; - status = client_->DoGet(Ticket{}, &stream); + status = client_->DoGet(Ticket{}).status(); ASSERT_RAISES(NotImplemented, status); - std::unique_ptr writer; - std::unique_ptr reader; std::shared_ptr schema = arrow::schema({}); - status = client_->DoPut(FlightDescriptor{}, schema, &writer, &reader); - ASSERT_OK(status); - status = writer->Close(); + ASSERT_OK_AND_ASSIGN(auto do_put_result, client_->DoPut(FlightDescriptor{}, schema)); + status = do_put_result.writer->Close(); ASSERT_RAISES(NotImplemented, status); } TEST_F(TestBasicAuthHandler, FailUnauthenticatedCalls) { Status status; - std::unique_ptr listing; - status = client_->ListFlights(&listing); + status = client_->ListFlights().status(); ASSERT_RAISES(IOError, status); ASSERT_THAT(status.message(), ::testing::HasSubstr("Invalid token")); - std::unique_ptr results; Action action; action.type = ""; action.body = Buffer::FromString(""); - status = client_->DoAction(action, &results); + status = client_->DoAction(action).status(); ASSERT_RAISES(IOError, status); ASSERT_THAT(status.message(), ::testing::HasSubstr("Invalid token")); - std::vector actions; - status = client_->ListActions(&actions); + status = client_->ListActions().status(); ASSERT_RAISES(IOError, status); ASSERT_THAT(status.message(), ::testing::HasSubstr("Invalid token")); - std::unique_ptr info; - status = client_->GetFlightInfo(FlightDescriptor{}, &info); + status = client_->GetFlightInfo(FlightDescriptor{}).status(); ASSERT_RAISES(IOError, status); ASSERT_THAT(status.message(), ::testing::HasSubstr("Invalid token")); - std::unique_ptr stream; - status = client_->DoGet(Ticket{}, &stream); + status = client_->DoGet(Ticket{}).status(); ASSERT_RAISES(IOError, status); ASSERT_THAT(status.message(), ::testing::HasSubstr("Invalid token")); - std::unique_ptr writer; - std::unique_ptr reader; std::shared_ptr schema( (new arrow::Schema(std::vector>()))); - status = client_->DoPut(FlightDescriptor{}, schema, &writer, &reader); + FlightClient::DoPutResult do_put_result; + status = client_->DoPut(FlightDescriptor{}, schema).Value(&do_put_result); // May or may not succeed depending on if the transport buffers the write ARROW_UNUSED(status); - status = writer->Close(); + status = do_put_result.writer->Close(); // But this should definitely fail ASSERT_RAISES(IOError, status); ASSERT_THAT(status.message(), ::testing::HasSubstr("Invalid token")); @@ -1227,8 +1183,7 @@ TEST_F(TestBasicAuthHandler, CheckPeerIdentity) { Action action; action.type = "who-am-i"; action.body = Buffer::FromString(""); - std::unique_ptr results; - ASSERT_OK(client_->DoAction(action, &results)); + ASSERT_OK_AND_ASSIGN(auto results, client_->DoAction(action)); ASSERT_NE(results, nullptr); std::unique_ptr result; @@ -1244,8 +1199,7 @@ TEST_F(TestTls, DoAction) { Action action; action.type = "test"; action.body = Buffer::FromString(""); - std::unique_ptr results; - ASSERT_OK(client_->DoAction(options, action, &results)); + ASSERT_OK_AND_ASSIGN(auto results, client_->DoAction(options, action)); ASSERT_NE(results, nullptr); std::unique_ptr result; @@ -1256,21 +1210,19 @@ TEST_F(TestTls, DoAction) { #if defined(GRPC_NAMESPACE_FOR_TLS_CREDENTIALS_OPTIONS) TEST_F(TestTls, DisableServerVerification) { - std::unique_ptr client; auto client_options = FlightClientOptions::Defaults(); // For security reasons, if encryption is being used, // the client should be configured to verify the server by default. ASSERT_EQ(client_options.disable_server_verification, false); client_options.disable_server_verification = true; - ASSERT_OK(FlightClient::Connect(location_, client_options, &client)); + ASSERT_OK_AND_ASSIGN(auto client, FlightClient::Connect(location_, client_options)); FlightCallOptions options; options.timeout = TimeoutDuration{5.0}; Action action; action.type = "test"; action.body = Buffer::FromString(""); - std::unique_ptr results; - ASSERT_OK(client->DoAction(options, action, &results)); + ASSERT_OK_AND_ASSIGN(auto results, client->DoAction(options, action)); ASSERT_NE(results, nullptr); std::unique_ptr result; @@ -1281,60 +1233,53 @@ TEST_F(TestTls, DisableServerVerification) { #endif TEST_F(TestTls, OverrideHostname) { - std::unique_ptr client; auto client_options = FlightClientOptions::Defaults(); client_options.override_hostname = "fakehostname"; CertKeyPair root_cert; ASSERT_OK(ExampleTlsCertificateRoot(&root_cert)); client_options.tls_root_certs = root_cert.pem_cert; - ASSERT_OK(FlightClient::Connect(location_, client_options, &client)); + ASSERT_OK_AND_ASSIGN(auto client, FlightClient::Connect(location_, client_options)); FlightCallOptions options; options.timeout = TimeoutDuration{5.0}; Action action; action.type = "test"; action.body = Buffer::FromString(""); - std::unique_ptr results; - ASSERT_RAISES(IOError, client->DoAction(options, action, &results)); + ASSERT_RAISES(IOError, client->DoAction(options, action)); } // Test the facility for setting generic transport options. TEST_F(TestTls, OverrideHostnameGeneric) { - std::unique_ptr client; auto client_options = FlightClientOptions::Defaults(); client_options.generic_options.emplace_back(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, "fakehostname"); CertKeyPair root_cert; ASSERT_OK(ExampleTlsCertificateRoot(&root_cert)); client_options.tls_root_certs = root_cert.pem_cert; - ASSERT_OK(FlightClient::Connect(location_, client_options, &client)); + ASSERT_OK_AND_ASSIGN(auto client, FlightClient::Connect(location_, client_options)); FlightCallOptions options; options.timeout = TimeoutDuration{5.0}; Action action; action.type = "test"; action.body = Buffer::FromString(""); - std::unique_ptr results; - ASSERT_RAISES(IOError, client->DoAction(options, action, &results)); + ASSERT_RAISES(IOError, client->DoAction(options, action)); // Could check error message for the gRPC error message but it isn't // necessarily stable } TEST_F(TestRejectServerMiddleware, Rejected) { - std::unique_ptr info; - const auto& status = client_->GetFlightInfo(FlightDescriptor{}, &info); + const Status status = client_->GetFlightInfo(FlightDescriptor{}).status(); ASSERT_RAISES(IOError, status); ASSERT_THAT(status.message(), ::testing::HasSubstr("All calls are rejected")); } TEST_F(TestCountingServerMiddleware, Count) { - std::unique_ptr info; - const auto& status = client_->GetFlightInfo(FlightDescriptor{}, &info); + const Status status = client_->GetFlightInfo(FlightDescriptor{}).status(); ASSERT_RAISES(NotImplemented, status); Ticket ticket{""}; - std::unique_ptr stream; - ASSERT_OK(client_->DoGet(ticket, &stream)); + ASSERT_OK_AND_ASSIGN(auto stream, client_->DoGet(ticket)); ASSERT_EQ(1, request_counter_->failed_); @@ -1351,7 +1296,6 @@ TEST_F(TestCountingServerMiddleware, Count) { TEST_F(TestPropagatingMiddleware, Propagate) { Action action; - std::unique_ptr stream; std::unique_ptr result; current_span_id = "trace-id"; @@ -1359,7 +1303,7 @@ TEST_F(TestPropagatingMiddleware, Propagate) { action.type = "action1"; action.body = Buffer::FromString("action1-content"); - ASSERT_OK(client_->DoAction(action, &stream)); + ASSERT_OK_AND_ASSIGN(auto stream, client_->DoAction(action)); ASSERT_OK_AND_ASSIGN(result, stream->Next()); ASSERT_EQ("trace-id", result->body->ToString()); @@ -1371,8 +1315,7 @@ TEST_F(TestPropagatingMiddleware, Propagate) { // passed to the interceptor TEST_F(TestPropagatingMiddleware, ListFlights) { client_middleware_->Reset(); - std::unique_ptr listing; - const Status status = client_->ListFlights(&listing); + const Status status = client_->ListFlights().status(); ASSERT_RAISES(NotImplemented, status); ValidateStatus(status, FlightMethod::ListFlights); } @@ -1380,8 +1323,7 @@ TEST_F(TestPropagatingMiddleware, ListFlights) { TEST_F(TestPropagatingMiddleware, GetFlightInfo) { client_middleware_->Reset(); auto descr = FlightDescriptor::Path({"examples", "ints"}); - std::unique_ptr info; - const Status status = client_->GetFlightInfo(descr, &info); + const Status status = client_->GetFlightInfo(descr).status(); ASSERT_RAISES(NotImplemented, status); ValidateStatus(status, FlightMethod::GetFlightInfo); } @@ -1397,7 +1339,7 @@ TEST_F(TestPropagatingMiddleware, GetSchema) { TEST_F(TestPropagatingMiddleware, ListActions) { client_middleware_->Reset(); std::vector actions; - const Status status = client_->ListActions(&actions); + const Status status = client_->ListActions().status(); ASSERT_RAISES(NotImplemented, status); ValidateStatus(status, FlightMethod::ListActions); } @@ -1406,7 +1348,7 @@ TEST_F(TestPropagatingMiddleware, DoGet) { client_middleware_->Reset(); Ticket ticket1{"ARROW-5095-fail"}; std::unique_ptr stream; - Status status = client_->DoGet(ticket1, &stream); + Status status = client_->DoGet(ticket1).status(); ASSERT_RAISES(NotImplemented, status); ValidateStatus(status, FlightMethod::DoGet); } @@ -1417,10 +1359,8 @@ TEST_F(TestPropagatingMiddleware, DoPut) { auto a1 = ArrayFromJSON(int32(), "[4, 5, 6, null]"); auto schema = arrow::schema({field("f1", a1->type())}); - std::unique_ptr stream; - std::unique_ptr reader; - ASSERT_OK(client_->DoPut(descr, schema, &stream, &reader)); - const Status status = stream->Close(); + ASSERT_OK_AND_ASSIGN(auto do_put_result, client_->DoPut(descr, schema)); + const Status status = do_put_result.writer->Close(); ASSERT_RAISES(NotImplemented, status); ValidateStatus(status, FlightMethod::DoPut); } @@ -1515,20 +1455,18 @@ TEST_F(TestCancel, ListFlights) { StopSource stop_source; FlightCallOptions options; options.stop_token = stop_source.token(); - std::unique_ptr listing; stop_source.RequestStop(Status::Cancelled("StopSource")); EXPECT_RAISES_WITH_MESSAGE_THAT(Cancelled, ::testing::HasSubstr("StopSource"), - client_->ListFlights(options, {}, &listing)); + client_->ListFlights(options, {})); } TEST_F(TestCancel, DoAction) { StopSource stop_source; FlightCallOptions options; options.stop_token = stop_source.token(); - std::unique_ptr results; stop_source.RequestStop(Status::Cancelled("StopSource")); EXPECT_RAISES_WITH_MESSAGE_THAT(Cancelled, ::testing::HasSubstr("StopSource"), - client_->DoAction(options, {}, &results)); + client_->DoAction(options, {})); } TEST_F(TestCancel, ListActions) { @@ -1538,7 +1476,7 @@ TEST_F(TestCancel, ListActions) { std::vector results; stop_source.RequestStop(Status::Cancelled("StopSource")); EXPECT_RAISES_WITH_MESSAGE_THAT(Cancelled, ::testing::HasSubstr("StopSource"), - client_->ListActions(options, &results)); + client_->ListActions(options)); } TEST_F(TestCancel, DoGet) { @@ -1547,12 +1485,11 @@ TEST_F(TestCancel, DoGet) { options.stop_token = stop_source.token(); std::unique_ptr results; stop_source.RequestStop(Status::Cancelled("StopSource")); - std::unique_ptr stream; - ASSERT_OK(client_->DoGet(options, {}, &stream)); + ASSERT_OK_AND_ASSIGN(auto stream, client_->DoGet(options, {})); EXPECT_RAISES_WITH_MESSAGE_THAT(Cancelled, ::testing::HasSubstr("StopSource"), stream->ToTable()); - ASSERT_OK(client_->DoGet({}, &stream)); + ASSERT_OK_AND_ASSIGN(stream, client_->DoGet({})); EXPECT_RAISES_WITH_MESSAGE_THAT(Cancelled, ::testing::HasSubstr("StopSource"), stream->ToTable(options.stop_token)); } @@ -1563,18 +1500,17 @@ TEST_F(TestCancel, DoExchange) { options.stop_token = stop_source.token(); std::unique_ptr results; stop_source.RequestStop(Status::Cancelled("StopSource")); - std::unique_ptr writer; - std::unique_ptr stream; - ASSERT_OK( - client_->DoExchange(options, FlightDescriptor::Command(""), &writer, &stream)); + ASSERT_OK_AND_ASSIGN(auto do_exchange_result, + client_->DoExchange(options, FlightDescriptor::Command(""))); EXPECT_RAISES_WITH_MESSAGE_THAT(Cancelled, ::testing::HasSubstr("StopSource"), - stream->ToTable()); - ARROW_UNUSED(writer->Close()); + do_exchange_result.reader->ToTable()); + ARROW_UNUSED(do_exchange_result.writer->Close()); - ASSERT_OK(client_->DoExchange(FlightDescriptor::Command(""), &writer, &stream)); + ASSERT_OK_AND_ASSIGN(do_exchange_result, + client_->DoExchange(FlightDescriptor::Command(""))); EXPECT_RAISES_WITH_MESSAGE_THAT(Cancelled, ::testing::HasSubstr("StopSource"), - stream->ToTable(options.stop_token)); - ARROW_UNUSED(writer->Close()); + do_exchange_result.reader->ToTable(options.stop_token)); + ARROW_UNUSED(do_exchange_result.writer->Close()); } } // namespace flight diff --git a/cpp/src/arrow/flight/integration_tests/test_integration.cc b/cpp/src/arrow/flight/integration_tests/test_integration.cc index 89dc191c651a1..e1d79f1a3e688 100644 --- a/cpp/src/arrow/flight/integration_tests/test_integration.cc +++ b/cpp/src/arrow/flight/integration_tests/test_integration.cc @@ -51,7 +51,7 @@ class AuthBasicProtoServer : public FlightServerBase { Status CheckActionResults(FlightClient* client, const Action& action, std::vector results) { std::unique_ptr stream; - RETURN_NOT_OK(client->DoAction(action, &stream)); + ARROW_ASSIGN_OR_RAISE(stream, client->DoAction(action)); std::unique_ptr result; for (const std::string& expected : results) { ARROW_ASSIGN_OR_RAISE(result, stream->Next()); @@ -91,7 +91,7 @@ class AuthBasicProtoScenario : public Scenario { Action action; std::unique_ptr stream; std::shared_ptr detail; - const auto& status = client->DoAction(action, &stream); + const auto& status = client->DoAction(action).Value(&stream); detail = FlightStatusDetail::UnwrapStatus(status); // This client is unauthenticated and should fail. if (detail == nullptr) { @@ -231,12 +231,11 @@ class MiddlewareScenario : public Scenario { } Status RunClient(std::unique_ptr client) override { - std::unique_ptr info; // This call is expected to fail. In gRPC/Java, this causes the // server to combine headers and HTTP/2 trailers, so to read the // expected header, Flight must check for both headers and // trailers. - if (client->GetFlightInfo(FlightDescriptor::Command(""), &info).ok()) { + if (client->GetFlightInfo(FlightDescriptor::Command("")).status().ok()) { return Status::Invalid("Expected call to fail"); } if (client_middleware_->received_header_ != "expected value") { @@ -248,7 +247,8 @@ class MiddlewareScenario : public Scenario { // This call should succeed client_middleware_->received_header_ = ""; - RETURN_NOT_OK(client->GetFlightInfo(FlightDescriptor::Command("success"), &info)); + ARROW_ASSIGN_OR_RAISE(auto info, + client->GetFlightInfo(FlightDescriptor::Command("success"))); if (client_middleware_->received_header_ != "expected value") { return Status::Invalid( "Expected to receive header 'x-middleware: expected value', but instead got '", diff --git a/cpp/src/arrow/flight/integration_tests/test_integration_client.cc b/cpp/src/arrow/flight/integration_tests/test_integration_client.cc index 08f80e9923a29..9c5c985c06efd 100644 --- a/cpp/src/arrow/flight/integration_tests/test_integration_client.cc +++ b/cpp/src/arrow/flight/integration_tests/test_integration_client.cc @@ -93,8 +93,7 @@ Status UploadBatchesToFlight(const std::vector>& ch Status ConsumeFlightLocation( FlightClient* read_client, const Ticket& ticket, const std::vector>& retrieved_data) { - std::unique_ptr stream; - RETURN_NOT_OK(read_client->DoGet(ticket, &stream)); + ARROW_ASSIGN_OR_RAISE(auto stream, read_client->DoGet(ticket)); int counter = 0; const int expected = static_cast(retrieved_data.size()); @@ -161,14 +160,14 @@ class IntegrationTestScenario : public Scenario { std::vector> original_data; ABORT_NOT_OK(ReadBatches(reader, &original_data)); - std::unique_ptr write_stream; - std::unique_ptr metadata_reader; - ABORT_NOT_OK(client->DoPut(descr, original_schema, &write_stream, &metadata_reader)); + auto do_put_result = client->DoPut(descr, original_schema).ValueOrDie(); + std::unique_ptr write_stream = std::move(do_put_result.writer); + std::unique_ptr metadata_reader = + std::move(do_put_result.reader); ABORT_NOT_OK(UploadBatchesToFlight(original_data, *write_stream, *metadata_reader)); // 2. Get the ticket for the data. - std::unique_ptr info; - ABORT_NOT_OK(client->GetFlightInfo(descr, &info)); + std::unique_ptr info = client->GetFlightInfo(descr).ValueOrDie(); std::shared_ptr schema; ipc::DictionaryMemo dict_memo; @@ -189,7 +188,7 @@ class IntegrationTestScenario : public Scenario { for (const auto& location : endpoint.locations) { std::cout << "Verifying location " << location.ToString() << std::endl; std::unique_ptr read_client; - RETURN_NOT_OK(FlightClient::Connect(location, &read_client)); + ARROW_ASSIGN_OR_RAISE(read_client, FlightClient::Connect(location)); RETURN_NOT_OK(ConsumeFlightLocation(read_client.get(), ticket, original_data)); RETURN_NOT_OK(read_client->Close()); } @@ -212,7 +211,7 @@ arrow::Status RunScenario(arrow::flight::integration_tests::Scenario* scenario) RETURN_NOT_OK(scenario->MakeClient(&options)); ARROW_ASSIGN_OR_RAISE(auto location, arrow::flight::Location::ForGrpcTcp(FLAGS_host, FLAGS_port)); - RETURN_NOT_OK(arrow::flight::FlightClient::Connect(location, options, &client)); + ARROW_ASSIGN_OR_RAISE(client, arrow::flight::FlightClient::Connect(location, options)); return scenario->RunClient(std::move(client)); } diff --git a/cpp/src/arrow/flight/sql/client.h b/cpp/src/arrow/flight/sql/client.h index 454f98a351c94..212f25c2faee5 100644 --- a/cpp/src/arrow/flight/sql/client.h +++ b/cpp/src/arrow/flight/sql/client.h @@ -170,10 +170,7 @@ class ARROW_EXPORT FlightSqlClient { // function GetFlightInfoForCommand. virtual arrow::Result> GetFlightInfo( const FlightCallOptions& options, const FlightDescriptor& descriptor) { - std::unique_ptr info; - ARROW_RETURN_NOT_OK(impl_->GetFlightInfo(options, descriptor, &info)); - - return info; + return impl_->GetFlightInfo(options, descriptor); } /// \brief Explicitly shut down and clean up the client. @@ -183,19 +180,22 @@ class ARROW_EXPORT FlightSqlClient { virtual Status DoPut(const FlightCallOptions& options, const FlightDescriptor& descriptor, const std::shared_ptr& schema, - std::unique_ptr* stream, + std::unique_ptr* writer, std::unique_ptr* reader) { - return impl_->DoPut(options, descriptor, schema, stream, reader); + ARROW_ASSIGN_OR_RAISE(auto result, impl_->DoPut(options, descriptor, schema)); + *writer = std::move(result.writer); + *reader = std::move(result.reader); + return Status::OK(); } virtual Status DoGet(const FlightCallOptions& options, const Ticket& ticket, std::unique_ptr* stream) { - return impl_->DoGet(options, ticket, stream); + return impl_->DoGet(options, ticket).Value(stream); } virtual Status DoAction(const FlightCallOptions& options, const Action& action, std::unique_ptr* results) { - return impl_->DoAction(options, action, results); + return impl_->DoAction(options, action).Value(results); } }; diff --git a/cpp/src/arrow/flight/sql/server_test.cc b/cpp/src/arrow/flight/sql/server_test.cc index d2b41df8f95ab..746c91c102b3e 100644 --- a/cpp/src/arrow/flight/sql/server_test.cc +++ b/cpp/src/arrow/flight/sql/server_test.cc @@ -162,9 +162,8 @@ class TestFlightSqlServer : public ::testing::Test { ss << "grpc://localhost:" << port; std::string uri = ss.str(); - std::unique_ptr client; ASSERT_OK_AND_ASSIGN(auto location, Location::Parse(uri)); - ASSERT_OK(FlightClient::Connect(location, &client)); + ASSERT_OK_AND_ASSIGN(auto client, FlightClient::Connect(location)); sql_client.reset(new FlightSqlClient(std::move(client))); } diff --git a/cpp/src/arrow/flight/sql/test_app_cli.cc b/cpp/src/arrow/flight/sql/test_app_cli.cc index 63924cc1c91d4..7989210dd098b 100644 --- a/cpp/src/arrow/flight/sql/test_app_cli.cc +++ b/cpp/src/arrow/flight/sql/test_app_cli.cc @@ -101,9 +101,8 @@ Status PrintResults(FlightSqlClient& client, const FlightCallOptions& call_optio } Status RunMain() { - std::unique_ptr client; ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp(FLAGS_host, FLAGS_port)); - ARROW_RETURN_NOT_OK(FlightClient::Connect(location, &client)); + ARROW_ASSIGN_OR_RAISE(auto client, FlightClient::Connect(location)); FlightCallOptions call_options; diff --git a/cpp/src/arrow/flight/test_definitions.cc b/cpp/src/arrow/flight/test_definitions.cc index 5ead99f94ba16..2cfac64144632 100644 --- a/cpp/src/arrow/flight/test_definitions.cc +++ b/cpp/src/arrow/flight/test_definitions.cc @@ -103,13 +103,12 @@ void ConnectivityTest::TestBrokenConnection() { std::unique_ptr client; ASSERT_OK_AND_ASSIGN(location, Location::ForScheme(transport(), "localhost", server->port())); - ASSERT_OK(FlightClient::Connect(location, &client)); + ASSERT_OK_AND_ASSIGN(client, FlightClient::Connect(location)); ASSERT_OK(server->Shutdown()); ASSERT_OK(server->Wait()); - std::unique_ptr info; - ASSERT_RAISES(IOError, client->GetFlightInfo(FlightDescriptor::Command(""), &info)); + ASSERT_RAISES(IOError, client->GetFlightInfo(FlightDescriptor::Command(""))); } //------------------------------------------------------------ @@ -131,15 +130,15 @@ void DataTest::TearDown() { Status DataTest::ConnectClient() { ARROW_ASSIGN_OR_RAISE(auto location, Location::ForScheme(transport(), "localhost", server_->port())); - return FlightClient::Connect(location, &client_); + ARROW_ASSIGN_OR_RAISE(client_, FlightClient::Connect(location)); + return Status::OK(); } void DataTest::CheckDoGet( const FlightDescriptor& descr, const RecordBatchVector& expected_batches, std::function&)> check_endpoints) { auto expected_schema = expected_batches[0]->schema(); - std::unique_ptr info; - ASSERT_OK(client_->GetFlightInfo(descr, &info)); + ASSERT_OK_AND_ASSIGN(auto info, client_->GetFlightInfo(descr)); check_endpoints(info->endpoints()); ipc::DictionaryMemo dict_memo; @@ -155,11 +154,9 @@ void DataTest::CheckDoGet(const Ticket& ticket, auto num_batches = static_cast(expected_batches.size()); ASSERT_GE(num_batches, 2); - std::unique_ptr stream; - ASSERT_OK(client_->DoGet(ticket, &stream)); + ASSERT_OK_AND_ASSIGN(auto stream, client_->DoGet(ticket)); - std::unique_ptr stream2; - ASSERT_OK(client_->DoGet(ticket, &stream2)); + ASSERT_OK_AND_ASSIGN(auto stream2, client_->DoGet(ticket)); ASSERT_OK_AND_ASSIGN(auto reader, MakeRecordBatchReader(std::move(stream2))); std::shared_ptr batch; @@ -247,7 +244,7 @@ void DataTest::TestOverflowServerBatch() { // DoGet: check for overflow on large batch Ticket ticket{"ARROW-13253-DoGet-Batch"}; std::unique_ptr stream; - ASSERT_OK(client_->DoGet(ticket, &stream)); + ASSERT_OK_AND_ASSIGN(stream, client_->DoGet(ticket)); FlightStreamChunk chunk; EXPECT_RAISES_WITH_MESSAGE_THAT( Invalid, ::testing::HasSubstr("Cannot send record batches exceeding 2GiB yet"), @@ -256,35 +253,30 @@ void DataTest::TestOverflowServerBatch() { { // DoExchange: check for overflow on large batch from server auto descr = FlightDescriptor::Command("large_batch"); - std::unique_ptr reader; - std::unique_ptr writer; - ASSERT_OK(client_->DoExchange(descr, &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto do_exchange_result, client_->DoExchange(descr)); RecordBatchVector batches; EXPECT_RAISES_WITH_MESSAGE_THAT( Invalid, ::testing::HasSubstr("Cannot send record batches exceeding 2GiB yet"), - reader->ToRecordBatches().Value(&batches)); - ARROW_UNUSED(writer->Close()); + do_exchange_result.reader->ToRecordBatches().Value(&batches)); + ARROW_UNUSED(do_exchange_result.writer->Close()); } } void DataTest::TestOverflowClientBatch() { ASSERT_OK_AND_ASSIGN(auto batch, VeryLargeBatch()); { // DoPut: check for overflow on large batch - std::unique_ptr stream; - std::unique_ptr reader; auto descr = FlightDescriptor::Path({""}); - ASSERT_OK(client_->DoPut(descr, batch->schema(), &stream, &reader)); + ASSERT_OK_AND_ASSIGN(auto do_put_result, client_->DoPut(descr, batch->schema())); EXPECT_RAISES_WITH_MESSAGE_THAT( Invalid, ::testing::HasSubstr("Cannot send record batches exceeding 2GiB yet"), - stream->WriteRecordBatch(*batch)); - ASSERT_OK(stream->Close()); + do_put_result.writer->WriteRecordBatch(*batch)); + ASSERT_OK(do_put_result.writer->Close()); } { // DoExchange: check for overflow on large batch from client auto descr = FlightDescriptor::Command("counter"); - std::unique_ptr reader; - std::unique_ptr writer; - ASSERT_OK(client_->DoExchange(descr, &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto exchange, client_->DoExchange(descr)); + auto writer = std::move(exchange.writer); ASSERT_OK(writer->Begin(batch->schema())); EXPECT_RAISES_WITH_MESSAGE_THAT( Invalid, ::testing::HasSubstr("Cannot send record batches exceeding 2GiB yet"), @@ -298,9 +290,9 @@ void DataTest::TestDoExchange() { auto a1 = ArrayFromJSON(int32(), "[4, 5, 6, null]"); auto schema = arrow::schema({field("f1", a1->type())}); batches.push_back(RecordBatch::Make(schema, a1->length(), {a1})); - std::unique_ptr reader; - std::unique_ptr writer; - ASSERT_OK(client_->DoExchange(descr, &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto exchange, client_->DoExchange(descr)); + std::unique_ptr reader = std::move(exchange.reader); + std::unique_ptr writer = std::move(exchange.writer); ASSERT_OK(writer->Begin(schema)); for (const auto& batch : batches) { ASSERT_OK(writer->WriteRecordBatch(*batch)); @@ -322,9 +314,9 @@ void DataTest::TestDoExchange() { // schema messages void DataTest::TestDoExchangeNoData() { auto descr = FlightDescriptor::Command("counter"); - std::unique_ptr reader; - std::unique_ptr writer; - ASSERT_OK(client_->DoExchange(descr, &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto exchange, client_->DoExchange(descr)); + std::unique_ptr reader = std::move(exchange.reader); + std::unique_ptr writer = std::move(exchange.writer); ASSERT_OK(writer->DoneWriting()); ASSERT_OK_AND_ASSIGN(auto chunk, reader->Next()); ASSERT_EQ(nullptr, chunk.data); @@ -336,9 +328,9 @@ void DataTest::TestDoExchangeNoData() { // in the client-side writer. void DataTest::TestDoExchangeWriteOnlySchema() { auto descr = FlightDescriptor::Command("counter"); - std::unique_ptr reader; - std::unique_ptr writer; - ASSERT_OK(client_->DoExchange(descr, &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto exchange, client_->DoExchange(descr)); + std::unique_ptr reader = std::move(exchange.reader); + std::unique_ptr writer = std::move(exchange.writer); auto schema = arrow::schema({field("f1", arrow::int32())}); ASSERT_OK(writer->Begin(schema)); ASSERT_OK(writer->WriteMetadata(Buffer::FromString("foo"))); @@ -352,9 +344,9 @@ void DataTest::TestDoExchangeWriteOnlySchema() { // Emulate DoGet void DataTest::TestDoExchangeGet() { auto descr = FlightDescriptor::Command("get"); - std::unique_ptr reader; - std::unique_ptr writer; - ASSERT_OK(client_->DoExchange(descr, &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto exchange, client_->DoExchange(descr)); + std::unique_ptr reader = std::move(exchange.reader); + std::unique_ptr writer = std::move(exchange.writer); ASSERT_OK_AND_ASSIGN(auto server_schema, reader->GetSchema()); AssertSchemaEqual(*ExampleIntSchema(), *server_schema); RecordBatchVector batches; @@ -372,9 +364,9 @@ void DataTest::TestDoExchangeGet() { // Emulate DoPut void DataTest::TestDoExchangePut() { auto descr = FlightDescriptor::Command("put"); - std::unique_ptr reader; - std::unique_ptr writer; - ASSERT_OK(client_->DoExchange(descr, &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto exchange, client_->DoExchange(descr)); + std::unique_ptr reader = std::move(exchange.reader); + std::unique_ptr writer = std::move(exchange.writer); ASSERT_OK(writer->Begin(ExampleIntSchema())); RecordBatchVector batches; ASSERT_OK(ExampleIntBatches(&batches)); @@ -393,9 +385,9 @@ void DataTest::TestDoExchangePut() { // Test the echo server void DataTest::TestDoExchangeEcho() { auto descr = FlightDescriptor::Command("echo"); - std::unique_ptr reader; - std::unique_ptr writer; - ASSERT_OK(client_->DoExchange(descr, &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto exchange, client_->DoExchange(descr)); + std::unique_ptr reader = std::move(exchange.reader); + std::unique_ptr writer = std::move(exchange.writer); ASSERT_OK(writer->Begin(ExampleIntSchema())); RecordBatchVector batches; ASSERT_OK(ExampleIntBatches(&batches)); @@ -445,7 +437,9 @@ void DataTest::TestDoExchangeTotal() { // here. EXPECT_RAISES_WITH_MESSAGE_THAT( Invalid, ::testing::HasSubstr("Field is not INT64: f1"), ([&]() { - RETURN_NOT_OK(client_->DoExchange(descr, &writer, &reader)); + ARROW_ASSIGN_OR_RAISE(auto exchange, client_->DoExchange(descr)); + reader = std::move(exchange.reader); + writer = std::move(exchange.writer); RETURN_NOT_OK(writer->Begin(schema)); auto batch = RecordBatch::Make(schema, /* num_rows */ 4, {a1}); RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); @@ -456,7 +450,9 @@ void DataTest::TestDoExchangeTotal() { auto a1 = ArrayFromJSON(arrow::int64(), "[1, 2, null, 3]"); auto a2 = ArrayFromJSON(arrow::int64(), "[null, 4, 5, 6]"); auto schema = arrow::schema({field("f1", a1->type()), field("f2", a2->type())}); - ASSERT_OK(client_->DoExchange(descr, &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto exchange, client_->DoExchange(descr)); + reader = std::move(exchange.reader); + writer = std::move(exchange.writer); ASSERT_OK(writer->Begin(schema)); auto batch = RecordBatch::Make(schema, /* num_rows */ 4, {a1, a2}); ASSERT_OK(writer->WriteRecordBatch(*batch)); @@ -484,22 +480,28 @@ void DataTest::TestDoExchangeTotal() { // Ensure server errors get propagated no matter what we try void DataTest::TestDoExchangeError() { auto descr = FlightDescriptor::Command("error"); - std::unique_ptr reader; std::unique_ptr writer; + std::unique_ptr reader; { - ASSERT_OK(client_->DoExchange(descr, &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto exchange, client_->DoExchange(descr)); + writer = std::move(exchange.writer); + reader = std::move(exchange.reader); auto status = writer->Close(); EXPECT_RAISES_WITH_MESSAGE_THAT( NotImplemented, ::testing::HasSubstr("Expected error"), writer->Close()); } { - ASSERT_OK(client_->DoExchange(descr, &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto exchange, client_->DoExchange(descr)); + writer = std::move(exchange.writer); + reader = std::move(exchange.reader); EXPECT_RAISES_WITH_MESSAGE_THAT( NotImplemented, ::testing::HasSubstr("Expected error"), reader->Next()); ARROW_UNUSED(writer->Close()); } { - ASSERT_OK(client_->DoExchange(descr, &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto exchange, client_->DoExchange(descr)); + writer = std::move(exchange.writer); + reader = std::move(exchange.reader); EXPECT_RAISES_WITH_MESSAGE_THAT( NotImplemented, ::testing::HasSubstr("Expected error"), reader->GetSchema()); ARROW_UNUSED(writer->Close()); @@ -513,9 +515,9 @@ void DataTest::TestDoExchangeError() { void DataTest::TestDoExchangeConcurrency() { // Ensure that we can do reads/writes on separate threads auto descr = FlightDescriptor::Command("echo"); - std::unique_ptr reader; - std::unique_ptr writer; - ASSERT_OK(client_->DoExchange(descr, &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto exchange, client_->DoExchange(descr)); + std::unique_ptr reader = std::move(exchange.reader); + std::unique_ptr writer = std::move(exchange.writer); RecordBatchVector batches; ASSERT_OK(ExampleIntBatches(&batches)); @@ -546,9 +548,9 @@ void DataTest::TestDoExchangeUndrained() { auto descr = FlightDescriptor::Command("TestUndrained"); auto schema = arrow::schema({arrow::field("ints", int64())}); - std::unique_ptr reader; - std::unique_ptr writer; - ASSERT_OK(client_->DoExchange(descr, &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto exchange, client_->DoExchange(descr)); + std::unique_ptr reader = std::move(exchange.reader); + std::unique_ptr writer = std::move(exchange.writer); auto batch = RecordBatchFromJSON(schema, "[[1], [2], [3], [4]]"); ASSERT_OK(writer->Begin(schema)); @@ -567,13 +569,12 @@ void DataTest::TestIssue5095() { // Make sure the server-side error message is reflected to the // client Ticket ticket1{"ARROW-5095-fail"}; - std::unique_ptr stream; - Status status = client_->DoGet(ticket1, &stream); + Status status = client_->DoGet(ticket1).status(); ASSERT_RAISES(UnknownError, status); ASSERT_THAT(status.message(), ::testing::HasSubstr("Server-side error")); Ticket ticket2{"ARROW-5095-success"}; - status = client_->DoGet(ticket2, &stream); + status = client_->DoGet(ticket2).status(); ASSERT_RAISES(KeyError, status); ASSERT_THAT(status.message(), ::testing::HasSubstr("No data")); } @@ -660,9 +661,9 @@ void DoPutTest::CheckBatches(const FlightDescriptor& expected_descriptor, void DoPutTest::CheckDoPut(const FlightDescriptor& descr, const std::shared_ptr& schema, const RecordBatchVector& batches) { - std::unique_ptr stream; - std::unique_ptr reader; - ASSERT_OK(client_->DoPut(descr, schema, &stream, &reader)); + ASSERT_OK_AND_ASSIGN(auto do_put_result, client_->DoPut(descr, schema)); + std::unique_ptr writer = std::move(do_put_result.writer); + std::unique_ptr reader = std::move(do_put_result.reader); // Ensure that the reader can be used independently of the writer std::thread reader_thread([&reader, &batches]() { @@ -675,18 +676,18 @@ void DoPutTest::CheckDoPut(const FlightDescriptor& descr, int64_t counter = 0; for (const auto& batch : batches) { if (counter % 2 == 0) { - ASSERT_OK(stream->WriteRecordBatch(*batch)); + ASSERT_OK(writer->WriteRecordBatch(*batch)); } else { auto buffer = Buffer::FromString(std::to_string(counter)); - ASSERT_OK(stream->WriteWithMetadata(*batch, std::move(buffer))); + ASSERT_OK(writer->WriteWithMetadata(*batch, std::move(buffer))); } counter++; } // Write a metadata-only message - ASSERT_OK(stream->WriteMetadata(Buffer::FromString(kExpectedMetadata))); - ASSERT_OK(stream->DoneWriting()); + ASSERT_OK(writer->WriteMetadata(Buffer::FromString(kExpectedMetadata))); + ASSERT_OK(writer->DoneWriting()); reader_thread.join(); - ASSERT_OK(stream->Close()); + ASSERT_OK(writer->Close()); CheckBatches(descr, batches); } @@ -768,8 +769,7 @@ void DoPutTest::TestSizeLimit() { Location::ForScheme(transport(), "localhost", server_->port())); auto client_options = FlightClientOptions::Defaults(); client_options.write_size_limit_bytes = size_limit; - std::unique_ptr client; - ASSERT_OK(FlightClient::Connect(location, client_options, &client)); + ASSERT_OK_AND_ASSIGN(auto client, FlightClient::Connect(location, client_options)); auto descr = FlightDescriptor::Command("simple"); // Batch is too large to fit in one message @@ -778,12 +778,12 @@ void DoPutTest::TestSizeLimit() { auto batch1 = batch->Slice(0, 384); auto batch2 = batch->Slice(384); - std::unique_ptr stream; - std::unique_ptr reader; - ASSERT_OK(client->DoPut(descr, schema, &stream, &reader)); + ASSERT_OK_AND_ASSIGN(auto do_put_result, client->DoPut(descr, schema)); + std::unique_ptr writer = std::move(do_put_result.writer); + std::unique_ptr reader = std::move(do_put_result.reader); // Large batch will exceed the limit - const auto status = stream->WriteRecordBatch(*batch); + const auto status = writer->WriteRecordBatch(*batch); EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, ::testing::HasSubstr("exceeded soft limit"), status); auto detail = FlightWriteSizeStatusDetail::UnwrapStatus(status); @@ -792,14 +792,14 @@ void DoPutTest::TestSizeLimit() { ASSERT_GT(detail->actual(), size_limit); // But we can retry with smaller batches - ASSERT_OK(stream->WriteRecordBatch(*batch1)); - ASSERT_OK(stream->WriteWithMetadata(*batch2, Buffer::FromString("1"))); + ASSERT_OK(writer->WriteRecordBatch(*batch1)); + ASSERT_OK(writer->WriteWithMetadata(*batch2, Buffer::FromString("1"))); // Write a metadata-only message - ASSERT_OK(stream->WriteMetadata(Buffer::FromString(kExpectedMetadata))); + ASSERT_OK(writer->WriteMetadata(Buffer::FromString(kExpectedMetadata))); - ASSERT_OK(stream->DoneWriting()); - ASSERT_OK(stream->Close()); + ASSERT_OK(writer->DoneWriting()); + ASSERT_OK(writer->Close()); CheckBatches(descr, {batch1, batch2}); } void DoPutTest::TestUndrained() { @@ -808,17 +808,17 @@ void DoPutTest::TestUndrained() { auto descr = FlightDescriptor::Command("TestUndrained"); auto schema = arrow::schema({arrow::field("ints", int64())}); - std::unique_ptr stream; - std::unique_ptr reader; - ASSERT_OK(client_->DoPut(descr, schema, &stream, &reader)); + ASSERT_OK_AND_ASSIGN(auto do_put_result, client_->DoPut(descr, schema)); + std::unique_ptr writer = std::move(do_put_result.writer); + std::unique_ptr reader = std::move(do_put_result.reader); auto batch = RecordBatchFromJSON(schema, "[[1], [2], [3], [4]]"); // These calls may or may not fail depending on how quickly the // transport reacts, whether it batches, writes, etc. - ARROW_UNUSED(stream->WriteRecordBatch(*batch)); - ARROW_UNUSED(stream->WriteRecordBatch(*batch)); - ARROW_UNUSED(stream->WriteRecordBatch(*batch)); - ARROW_UNUSED(stream->WriteRecordBatch(*batch)); - ASSERT_OK(stream->Close()); + ARROW_UNUSED(writer->WriteRecordBatch(*batch)); + ARROW_UNUSED(writer->WriteRecordBatch(*batch)); + ARROW_UNUSED(writer->WriteRecordBatch(*batch)); + ARROW_UNUSED(writer->WriteRecordBatch(*batch)); + ASSERT_OK(writer->Close()); // We should be able to make another call CheckDoPut(FlightDescriptor::Command("foo"), schema, {batch, batch}); @@ -878,8 +878,8 @@ void AppMetadataTest::TearDown() { } void AppMetadataTest::TestDoGet() { Ticket ticket{""}; - std::unique_ptr stream; - ASSERT_OK(client_->DoGet(ticket, &stream)); + ASSERT_OK_AND_ASSIGN(std::unique_ptr stream, + client_->DoGet(ticket)); RecordBatchVector expected_batches; ASSERT_OK(ExampleIntBatches(&expected_batches)); @@ -901,8 +901,8 @@ void AppMetadataTest::TestDoGet() { // from the record batch, and not one of the dictionary batches. void AppMetadataTest::TestDoGetDictionaries() { Ticket ticket{"dicts"}; - std::unique_ptr stream; - ASSERT_OK(client_->DoGet(ticket, &stream)); + ASSERT_OK_AND_ASSIGN(std::unique_ptr stream, + client_->DoGet(ticket)); RecordBatchVector expected_batches; ASSERT_OK(ExampleDictBatches(&expected_batches)); @@ -919,10 +919,9 @@ void AppMetadataTest::TestDoGetDictionaries() { ASSERT_EQ(nullptr, chunk.data); } void AppMetadataTest::TestDoPut() { - std::unique_ptr writer; - std::unique_ptr reader; std::shared_ptr schema = ExampleIntSchema(); - ASSERT_OK(client_->DoPut(FlightDescriptor{}, schema, &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto do_put_result, client_->DoPut(FlightDescriptor{}, schema)); + std::unique_ptr writer = std::move(do_put_result.writer); RecordBatchVector expected_batches; ASSERT_OK(ExampleIntBatches(&expected_batches)); @@ -943,8 +942,6 @@ void AppMetadataTest::TestDoPut() { // Test DoPut() with dictionaries. This tests a corner case in the // server-side reader; see DoGetDictionaries above. void AppMetadataTest::TestDoPutDictionaries() { - std::unique_ptr writer; - std::unique_ptr reader; RecordBatchVector expected_batches; ASSERT_OK(ExampleDictBatches(&expected_batches)); // ARROW-8749: don't get the schema via ExampleDictSchema because @@ -953,8 +950,10 @@ void AppMetadataTest::TestDoPutDictionaries() { // (identity-wise) than the schema of the first batch we write, // we'll end up generating a duplicate set of dictionaries that // confuses the reader. - ASSERT_OK(client_->DoPut(FlightDescriptor{}, expected_batches[0]->schema(), &writer, - &reader)); + ASSERT_OK_AND_ASSIGN(auto do_put_result, + client_->DoPut(FlightDescriptor{}, expected_batches[0]->schema())); + std::unique_ptr writer = std::move(do_put_result.writer); + std::shared_ptr chunk; std::shared_ptr metadata; auto num_batches = static_cast(expected_batches.size()); @@ -965,10 +964,10 @@ void AppMetadataTest::TestDoPutDictionaries() { ASSERT_OK(writer->Close()); } void AppMetadataTest::TestDoPutReadMetadata() { - std::unique_ptr writer; - std::unique_ptr reader; std::shared_ptr schema = ExampleIntSchema(); - ASSERT_OK(client_->DoPut(FlightDescriptor{}, schema, &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto do_put_result, client_->DoPut(FlightDescriptor{}, schema)); + std::unique_ptr writer = std::move(do_put_result.writer); + std::unique_ptr reader = std::move(do_put_result.reader); RecordBatchVector expected_batches; ASSERT_OK(ExampleIntBatches(&expected_batches)); @@ -1062,7 +1061,7 @@ void IpcOptionsTest::TestDoGetReadOptions() { auto options = FlightCallOptions(); options.read_options.max_recursion_depth = 1; std::unique_ptr stream; - ASSERT_OK(client_->DoGet(options, ticket, &stream)); + ASSERT_OK_AND_ASSIGN(stream, client_->DoGet(options, ticket)); ASSERT_RAISES(Invalid, stream->Next()); } void IpcOptionsTest::TestDoPutWriteOptions() { @@ -1074,10 +1073,10 @@ void IpcOptionsTest::TestDoPutWriteOptions() { auto options = FlightCallOptions(); options.write_options.max_recursion_depth = 1; - ASSERT_OK(client_->DoPut(options, FlightDescriptor{}, expected_batches[0]->schema(), - &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto do_put_result, client_->DoPut(options, FlightDescriptor{}, + expected_batches[0]->schema())); for (const auto& batch : expected_batches) { - ASSERT_RAISES(Invalid, writer->WriteRecordBatch(*batch)); + ASSERT_RAISES(Invalid, do_put_result.writer->WriteRecordBatch(*batch)); } } void IpcOptionsTest::TestDoExchangeClientWriteOptions() { @@ -1086,9 +1085,8 @@ void IpcOptionsTest::TestDoExchangeClientWriteOptions() { auto options = FlightCallOptions(); options.write_options.max_recursion_depth = 1; auto descr = FlightDescriptor::Command(""); - std::unique_ptr reader; - std::unique_ptr writer; - ASSERT_OK(client_->DoExchange(options, descr, &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto do_exchange_result, client_->DoExchange(options, descr)); + std::unique_ptr writer = std::move(do_exchange_result.writer); RecordBatchVector batches; ASSERT_OK(ExampleNestedBatches(&batches)); ASSERT_OK(writer->Begin(batches[0]->schema())); @@ -1103,9 +1101,8 @@ void IpcOptionsTest::TestDoExchangeClientWriteOptionsBegin() { // fail the call. Here the options are set explicitly when we write data and not in the // call options. auto descr = FlightDescriptor::Command(""); - std::unique_ptr reader; - std::unique_ptr writer; - ASSERT_OK(client_->DoExchange(descr, &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto do_exchange_result, client_->DoExchange(descr)); + std::unique_ptr writer = std::move(do_exchange_result.writer); RecordBatchVector batches; ASSERT_OK(ExampleNestedBatches(&batches)); auto options = ipc::IpcWriteOptions::Defaults(); @@ -1121,9 +1118,8 @@ void IpcOptionsTest::TestDoExchangeServerWriteOptions() { // Call DoExchange and write nested data, but with a very low nesting depth set to fail // the call. (The low nesting depth is set on the server side.) auto descr = FlightDescriptor::Command(""); - std::unique_ptr reader; - std::unique_ptr writer; - ASSERT_OK(client_->DoExchange(descr, &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto do_exchange_result, client_->DoExchange(descr)); + std::unique_ptr writer = std::move(do_exchange_result.writer); RecordBatchVector batches; ASSERT_OK(ExampleNestedBatches(&batches)); ASSERT_OK(writer->Begin(batches[0]->schema())); @@ -1269,8 +1265,7 @@ void CudaDataTest::TestDoGet() { checked_cast(server_.get())->batches(); Ticket ticket{""}; - std::unique_ptr stream; - ASSERT_OK(client_->DoGet(options, ticket, &stream)); + ASSERT_OK_AND_ASSIGN(auto stream, client_->DoGet(options, ticket)); size_t idx = 0; while (true) { @@ -1294,10 +1289,9 @@ void CudaDataTest::TestDoPut() { RecordBatchVector batches; ASSERT_OK(ExampleIntBatches(&batches)); - std::unique_ptr writer; - std::unique_ptr reader; auto descriptor = FlightDescriptor::Path({""}); - ASSERT_OK(client_->DoPut(descriptor, batches[0]->schema(), &writer, &reader)); + ASSERT_OK_AND_ASSIGN(auto do_put_result, + client_->DoPut(descriptor, batches[0]->schema())); ipc::DictionaryMemo memo; for (const auto& batch : batches) { @@ -1307,9 +1301,9 @@ void CudaDataTest::TestDoPut() { cuda::ReadRecordBatch(batch->schema(), &memo, buffer)); ASSERT_OK(CheckBuffersOnDevice(*cuda_batch, *impl_->device)); - ASSERT_OK(writer->WriteRecordBatch(*cuda_batch)); + ASSERT_OK(do_put_result.writer->WriteRecordBatch(*cuda_batch)); } - ASSERT_OK(writer->Close()); + ASSERT_OK(do_put_result.writer->Close()); ASSERT_OK(impl_->context->Synchronize()); const RecordBatchVector& written = @@ -1332,11 +1326,9 @@ void CudaDataTest::TestDoExchange() { RecordBatchVector batches; ASSERT_OK(ExampleIntBatches(&batches)); - std::unique_ptr writer; - std::unique_ptr reader; auto descriptor = FlightDescriptor::Path({""}); - ASSERT_OK(client_->DoExchange(options, descriptor, &writer, &reader)); - ASSERT_OK(writer->Begin(batches[0]->schema())); + ASSERT_OK_AND_ASSIGN(auto exchange, client_->DoExchange(options, descriptor)); + ASSERT_OK(exchange.writer->Begin(batches[0]->schema())); ipc::DictionaryMemo write_memo; ipc::DictionaryMemo read_memo; @@ -1347,16 +1339,16 @@ void CudaDataTest::TestDoExchange() { cuda::ReadRecordBatch(batch->schema(), &write_memo, buffer)); ASSERT_OK(CheckBuffersOnDevice(*cuda_batch, *impl_->device)); - ASSERT_OK(writer->WriteRecordBatch(*cuda_batch)); + ASSERT_OK(exchange.writer->WriteRecordBatch(*cuda_batch)); - ASSERT_OK_AND_ASSIGN(auto chunk, reader->Next()); + ASSERT_OK_AND_ASSIGN(auto chunk, exchange.reader->Next()); ASSERT_OK(CheckBuffersOnDevice(*chunk.data, *impl_->device)); // Bounce record batch back to host memory ASSERT_OK_AND_ASSIGN(auto host_batch, CopyBatchToHost(*chunk.data)); AssertBatchesEqual(*batch, *host_batch); } - ASSERT_OK(writer->Close()); + ASSERT_OK(exchange.writer->Close()); } #else diff --git a/cpp/src/arrow/flight/test_util.h b/cpp/src/arrow/flight/test_util.h index e44ca001d2e44..5320b958d5818 100644 --- a/cpp/src/arrow/flight/test_util.h +++ b/cpp/src/arrow/flight/test_util.h @@ -117,7 +117,7 @@ Status MakeServer(const Location& location, std::unique_ptr* s ARROW_ASSIGN_OR_RAISE(auto real_location, Location::Parse(uri)); FlightClientOptions client_options = FlightClientOptions::Defaults(); RETURN_NOT_OK(make_client_options(&client_options)); - return FlightClient::Connect(real_location, client_options, client); + return FlightClient::Connect(real_location, client_options).Value(client); } // Helper to initialize a server and matching client with callbacks to diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx index 59e2e30f279f1..fa6ef29c07200 100644 --- a/python/pyarrow/_flight.pyx +++ b/python/pyarrow/_flight.pyx @@ -1145,8 +1145,8 @@ cdef class FlightClient(_Weakrefable): pair[c_string, CIntStringVariant](tobytes(key), variant)) with nogil: - check_flight_status(CFlightClient.Connect(c_location, c_options, - &self.client)) + check_flight_status(CFlightClient.Connect(c_location, c_options + ).Value(&self.client)) def wait_for_available(self, timeout=5): """Block until the server can be contacted. @@ -1259,7 +1259,7 @@ cdef class FlightClient(_Weakrefable): ( stop_handler.stop_token).stop_token with nogil: check_flight_status( - self.client.get().ListActions(deref(c_options), &results)) + self.client.get().ListActions(deref(c_options)).Value(&results)) result = [] for action_type in results: @@ -1300,7 +1300,7 @@ cdef class FlightClient(_Weakrefable): with nogil: check_flight_status( self.client.get().DoAction( - deref(c_options), c_action, &results)) + deref(c_options), c_action).Value(&results)) def _do_action_response(): cdef: @@ -1332,7 +1332,7 @@ cdef class FlightClient(_Weakrefable): with nogil: check_flight_status( self.client.get().ListFlights(deref(c_options), - c_criteria, &listing)) + c_criteria).Value(&listing)) while True: result = FlightInfo.__new__(FlightInfo) @@ -1353,7 +1353,7 @@ cdef class FlightClient(_Weakrefable): with nogil: check_flight_status(self.client.get().GetFlightInfo( - deref(c_options), c_descriptor, &result.info)) + deref(c_options), c_descriptor).Value(&result.info)) return result @@ -1387,7 +1387,7 @@ cdef class FlightClient(_Weakrefable): with nogil: check_flight_status( self.client.get().DoGet( - deref(c_options), ticket.ticket, &reader)) + deref(c_options), ticket.ticket).Value(&reader)) result = FlightStreamReader() result.reader.reset(reader.release()) return result @@ -1403,23 +1403,21 @@ cdef class FlightClient(_Weakrefable): """ cdef: shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema) - unique_ptr[CFlightStreamWriter] writer - unique_ptr[CFlightMetadataReader] metadata_reader + CDoPutResult c_do_put_result CFlightCallOptions* c_options = FlightCallOptions.unwrap(options) CFlightDescriptor c_descriptor = \ FlightDescriptor.unwrap(descriptor) - FlightMetadataReader reader = FlightMetadataReader() with nogil: check_flight_status(self.client.get().DoPut( deref(c_options), c_descriptor, - c_schema, - &writer, - &reader.reader)) - result = FlightStreamWriter() - result.writer.reset(writer.release()) - return result, reader + c_schema).Value(&c_do_put_result)) + py_writer = FlightStreamWriter() + py_writer.writer.reset(c_do_put_result.writer.release()) + py_reader = FlightMetadataReader() + py_reader.reader.reset(c_do_put_result.reader.release()) + return py_writer, py_reader def do_exchange(self, descriptor: FlightDescriptor, options: FlightCallOptions = None): @@ -1438,8 +1436,7 @@ cdef class FlightClient(_Weakrefable): reader : FlightStreamReader """ cdef: - unique_ptr[CFlightStreamWriter] c_writer - unique_ptr[CFlightStreamReader] c_reader + CDoExchangeResult c_do_exchange_result CFlightCallOptions* c_options = FlightCallOptions.unwrap(options) CFlightDescriptor c_descriptor = \ FlightDescriptor.unwrap(descriptor) @@ -1447,13 +1444,11 @@ cdef class FlightClient(_Weakrefable): with nogil: check_flight_status(self.client.get().DoExchange( deref(c_options), - c_descriptor, - &c_writer, - &c_reader)) + c_descriptor).Value(&c_do_exchange_result)) py_writer = FlightStreamWriter() - py_writer.writer.reset(c_writer.release()) + py_writer.writer.reset(c_do_exchange_result.writer.release()) py_reader = FlightStreamReader() - py_reader.reader.reset(c_reader.release()) + py_reader.reader.reset(c_do_exchange_result.reader.release()) return py_writer, py_reader def close(self): diff --git a/python/pyarrow/includes/libarrow_flight.pxd b/python/pyarrow/includes/libarrow_flight.pxd index 9d4663e52950b..5c70b061127c3 100644 --- a/python/pyarrow/includes/libarrow_flight.pxd +++ b/python/pyarrow/includes/libarrow_flight.pxd @@ -313,11 +313,18 @@ cdef extern from "arrow/flight/api.h" namespace "arrow" nogil: @staticmethod CFlightClientOptions Defaults() + cdef cppclass CDoPutResult" arrow::flight::FlightClient::DoPutResult": + unique_ptr[CFlightStreamWriter] writer + unique_ptr[CFlightMetadataReader] reader + + cdef cppclass CDoExchangeResult" arrow::flight::FlightClient::DoExchangeResult": + unique_ptr[CFlightStreamWriter] writer + unique_ptr[CFlightStreamReader] reader + cdef cppclass CFlightClient" arrow::flight::FlightClient": @staticmethod - CStatus Connect(const CLocation& location, - const CFlightClientOptions& options, - unique_ptr[CFlightClient]* client) + CResult[unique_ptr[CFlightClient]] Connect(const CLocation& location, + const CFlightClientOptions& options) CStatus Authenticate(CFlightCallOptions& options, unique_ptr[CClientAuthHandler] auth_handler) @@ -327,29 +334,20 @@ cdef extern from "arrow/flight/api.h" namespace "arrow" nogil: const c_string& username, const c_string& password) - CStatus DoAction(CFlightCallOptions& options, CAction& action, - unique_ptr[CResultStream]* results) - CStatus ListActions(CFlightCallOptions& options, - vector[CActionType]* actions) + CResult[unique_ptr[CResultStream]] DoAction(CFlightCallOptions& options, CAction& action) + CResult[vector[CActionType]] ListActions(CFlightCallOptions& options) - CStatus ListFlights(CFlightCallOptions& options, CCriteria criteria, - unique_ptr[CFlightListing]* listing) - CStatus GetFlightInfo(CFlightCallOptions& options, - CFlightDescriptor& descriptor, - unique_ptr[CFlightInfo]* info) + CResult[unique_ptr[CFlightListing]] ListFlights(CFlightCallOptions& options, CCriteria criteria) + CResult[unique_ptr[CFlightInfo]] GetFlightInfo(CFlightCallOptions& options, + CFlightDescriptor& descriptor) CResult[unique_ptr[CSchemaResult]] GetSchema(CFlightCallOptions& options, CFlightDescriptor& descriptor) - CStatus DoGet(CFlightCallOptions& options, CTicket& ticket, - unique_ptr[CFlightStreamReader]* stream) - CStatus DoPut(CFlightCallOptions& options, - CFlightDescriptor& descriptor, - shared_ptr[CSchema]& schema, - unique_ptr[CFlightStreamWriter]* stream, - unique_ptr[CFlightMetadataReader]* reader) - CStatus DoExchange(CFlightCallOptions& options, - CFlightDescriptor& descriptor, - unique_ptr[CFlightStreamWriter]* writer, - unique_ptr[CFlightStreamReader]* reader) + CResult[unique_ptr[CFlightStreamReader]] DoGet(CFlightCallOptions& options, CTicket& ticket) + CResult[CDoPutResult] DoPut(CFlightCallOptions& options, + CFlightDescriptor& descriptor, + shared_ptr[CSchema]& schema) + CResult[CDoExchangeResult] DoExchange(CFlightCallOptions& options, + CFlightDescriptor& descriptor) CStatus Close() cdef cppclass CFlightStatusCode" arrow::flight::FlightStatusCode":