diff --git a/cpp/src/arrow/flight/sql/client.cc b/cpp/src/arrow/flight/sql/client.cc index a914a0587cc5c..0e9768b9887ab 100644 --- a/cpp/src/arrow/flight/sql/client.cc +++ b/cpp/src/arrow/flight/sql/client.cc @@ -213,22 +213,19 @@ arrow::Result FlightSqlClient::ExecuteUpdate(const FlightCallOptions& o ARROW_ASSIGN_OR_RAISE(FlightDescriptor descriptor, GetFlightDescriptorForCommand(command)); - std::unique_ptr writer; - std::unique_ptr reader; - - ARROW_RETURN_NOT_OK(DoPut(options, descriptor, arrow::schema({}), &writer, &reader)); + ARROW_ASSIGN_OR_RAISE(auto result, DoPut(options, descriptor, arrow::schema({}))) std::shared_ptr metadata; - ARROW_RETURN_NOT_OK(reader->ReadMetadata(&metadata)); - ARROW_RETURN_NOT_OK(writer->Close()); + ARROW_RETURN_NOT_OK(result.reader->ReadMetadata(&metadata)); + ARROW_RETURN_NOT_OK(result.writer->Close()); if (!metadata) return Status::IOError("Server did not send a response"); - flight_sql_pb::DoPutUpdateResult result; - if (!result.ParseFromArray(metadata->data(), static_cast(metadata->size()))) { + flight_sql_pb::DoPutUpdateResult update_result; + if (!update_result.ParseFromArray(metadata->data(), static_cast(metadata->size()))) { return Status::Invalid("Unable to parse DoPutUpdateResult"); } - return result.record_count(); + return update_result.record_count(); } arrow::Result FlightSqlClient::ExecuteSubstraitUpdate( @@ -243,21 +240,18 @@ arrow::Result FlightSqlClient::ExecuteSubstraitUpdate( ARROW_ASSIGN_OR_RAISE(FlightDescriptor descriptor, GetFlightDescriptorForCommand(command)); - std::unique_ptr writer; - std::unique_ptr reader; - - ARROW_RETURN_NOT_OK(DoPut(options, descriptor, arrow::schema({}), &writer, &reader)); + ARROW_ASSIGN_OR_RAISE(auto result, DoPut(options, descriptor, arrow::schema({}))); std::shared_ptr metadata; - ARROW_RETURN_NOT_OK(reader->ReadMetadata(&metadata)); - ARROW_RETURN_NOT_OK(writer->Close()); + ARROW_RETURN_NOT_OK(result.reader->ReadMetadata(&metadata)); + ARROW_RETURN_NOT_OK(result.writer->Close()); - flight_sql_pb::DoPutUpdateResult result; - if (!result.ParseFromArray(metadata->data(), static_cast(metadata->size()))) { + flight_sql_pb::DoPutUpdateResult update_result; + if (!update_result.ParseFromArray(metadata->data(), static_cast(metadata->size()))) { return Status::Invalid("Unable to parse DoPutUpdateResult"); } - return result.record_count(); + return update_result.record_count(); } arrow::Result> FlightSqlClient::GetCatalogs( @@ -478,10 +472,7 @@ arrow::Result> FlightSqlClient::GetSqlInfoSchema( arrow::Result> FlightSqlClient::DoGet( const FlightCallOptions& options, const Ticket& ticket) { - std::unique_ptr stream; - ARROW_RETURN_NOT_OK(DoGet(options, ticket, &stream)); - - return std::move(stream); + return impl_->DoGet(options, ticket); } arrow::Result> FlightSqlClient::Prepare( @@ -493,9 +484,8 @@ arrow::Result> FlightSqlClient::Prepare( request.set_transaction_id(transaction.transaction_id()); } - std::unique_ptr results; ARROW_ASSIGN_OR_RAISE(auto action, PackAction("CreatePreparedStatement", request)); - ARROW_RETURN_NOT_OK(DoAction(options, action, &results)); + ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action)) return PreparedStatement::ParseResponse(this, std::move(results)); } @@ -509,9 +499,8 @@ arrow::Result> FlightSqlClient::PrepareSubstr request.set_transaction_id(transaction.transaction_id()); } - std::unique_ptr results; ARROW_ASSIGN_OR_RAISE(auto action, PackAction("CreatePreparedSubstraitPlan", request)); - ARROW_RETURN_NOT_OK(DoAction(options, action, &results)); + ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action)) return PreparedStatement::ParseResponse(this, std::move(results)); } @@ -657,9 +646,8 @@ Status PreparedStatement::Close(const FlightCallOptions& options) { flight_sql_pb::ActionClosePreparedStatementRequest request; request.set_prepared_statement_handle(handle_); - std::unique_ptr results; ARROW_ASSIGN_OR_RAISE(auto action, PackAction("ClosePreparedStatement", request)); - ARROW_RETURN_NOT_OK(client_->DoAction(options, action, &results)); + ARROW_ASSIGN_OR_RAISE(auto results, client_->DoAction(options, action)); ARROW_RETURN_NOT_OK(results->Drain()); is_closed_ = true; @@ -670,9 +658,8 @@ ::arrow::Result FlightSqlClient::BeginTransaction( const FlightCallOptions& options) { flight_sql_pb::ActionBeginTransactionRequest request; - std::unique_ptr results; ARROW_ASSIGN_OR_RAISE(auto action, PackAction("BeginTransaction", request)); - ARROW_RETURN_NOT_OK(DoAction(options, action, &results)); + ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action)); flight_sql_pb::ActionBeginTransactionResult transaction; ARROW_RETURN_NOT_OK(ReadResult(results.get(), &transaction)); @@ -695,9 +682,8 @@ ::arrow::Result FlightSqlClient::BeginSavepoint( request.set_transaction_id(transaction.transaction_id()); request.set_name(name); - std::unique_ptr results; ARROW_ASSIGN_OR_RAISE(auto action, PackAction("BeginSavepoint", request)); - ARROW_RETURN_NOT_OK(DoAction(options, action, &results)); + ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action)); flight_sql_pb::ActionBeginSavepointResult savepoint; ARROW_RETURN_NOT_OK(ReadResult(results.get(), &savepoint)); @@ -719,9 +705,8 @@ Status FlightSqlClient::Commit(const FlightCallOptions& options, request.set_transaction_id(transaction.transaction_id()); request.set_action(flight_sql_pb::ActionEndTransactionRequest::END_TRANSACTION_COMMIT); - std::unique_ptr results; ARROW_ASSIGN_OR_RAISE(auto action, PackAction("EndTransaction", request)); - ARROW_RETURN_NOT_OK(DoAction(options, action, &results)); + ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action)); ARROW_RETURN_NOT_OK(results->Drain()); return Status::OK(); @@ -737,9 +722,8 @@ Status FlightSqlClient::Release(const FlightCallOptions& options, request.set_savepoint_id(savepoint.savepoint_id()); request.set_action(flight_sql_pb::ActionEndSavepointRequest::END_SAVEPOINT_RELEASE); - std::unique_ptr results; ARROW_ASSIGN_OR_RAISE(auto action, PackAction("EndSavepoint", request)); - ARROW_RETURN_NOT_OK(DoAction(options, action, &results)); + ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action)); ARROW_RETURN_NOT_OK(results->Drain()); return Status::OK(); @@ -756,9 +740,8 @@ Status FlightSqlClient::Rollback(const FlightCallOptions& options, request.set_action( flight_sql_pb::ActionEndTransactionRequest::END_TRANSACTION_ROLLBACK); - std::unique_ptr results; ARROW_ASSIGN_OR_RAISE(auto action, PackAction("EndTransaction", request)); - ARROW_RETURN_NOT_OK(DoAction(options, action, &results)); + ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action)); ARROW_RETURN_NOT_OK(results->Drain()); return Status::OK(); @@ -774,9 +757,8 @@ Status FlightSqlClient::Rollback(const FlightCallOptions& options, request.set_savepoint_id(savepoint.savepoint_id()); request.set_action(flight_sql_pb::ActionEndSavepointRequest::END_SAVEPOINT_ROLLBACK); - std::unique_ptr results; ARROW_ASSIGN_OR_RAISE(auto action, PackAction("EndSavepoint", request)); - ARROW_RETURN_NOT_OK(DoAction(options, action, &results)); + ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action)); ARROW_RETURN_NOT_OK(results->Drain()); return Status::OK(); @@ -788,9 +770,8 @@ ::arrow::Result FlightSqlClient::CancelQuery( ARROW_ASSIGN_OR_RAISE(auto serialized_info, info.SerializeToString()); request.set_info(std::move(serialized_info)); - std::unique_ptr results; ARROW_ASSIGN_OR_RAISE(auto action, PackAction("CancelQuery", request)); - ARROW_RETURN_NOT_OK(DoAction(options, action, &results)); + ARROW_ASSIGN_OR_RAISE(auto results, DoAction(options, action)) flight_sql_pb::ActionCancelQueryResult result; ARROW_RETURN_NOT_OK(ReadResult(results.get(), &result)); diff --git a/cpp/src/arrow/flight/sql/client.h b/cpp/src/arrow/flight/sql/client.h index 812b5ffb58333..7409f40d561f5 100644 --- a/cpp/src/arrow/flight/sql/client.h +++ b/cpp/src/arrow/flight/sql/client.h @@ -131,7 +131,7 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient { /// \param[in] options Per-RPC options /// \param[in] ticket The flight ticket to use /// \return The returned RecordBatchReader - arrow::Result> DoGet( + virtual arrow::Result> DoGet( const FlightCallOptions& options, const Ticket& ticket); /// \brief Request a list of tables. @@ -364,25 +364,14 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient { Status Close(); protected: - virtual Status DoPut(const FlightCallOptions& options, + virtual ::arrow::Result DoPut(const FlightCallOptions& options, const FlightDescriptor& descriptor, - const std::shared_ptr& schema, - std::unique_ptr* writer, - std::unique_ptr* reader) { - ARROW_ASSIGN_OR_RAISE(auto result, impl_->DoPut(options, descriptor, schema)); - *writer = std::move(result.writer); - *reader = std::move(result.reader); - return Status::OK(); + const std::shared_ptr& schema) { + return impl_->DoPut(options, descriptor, schema); } - virtual Status DoGet(const FlightCallOptions& options, const Ticket& ticket, - std::unique_ptr* stream) { - return impl_->DoGet(options, ticket).Value(stream); - } - - virtual Status DoAction(const FlightCallOptions& options, const Action& action, - std::unique_ptr* results) { - return impl_->DoAction(options, action).Value(results); + virtual ::arrow::Result> DoAction(const FlightCallOptions& options, const Action& action) { + return impl_->DoAction(options, action); } };