Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-15974: [C++] Migrate flight/types.h header definitions to use Result<> #12669

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
220814e
convert flight.h APIs to use Result
zagto Mar 18, 2022
64e6562
fix PyFlightResultStream::Next
zagto Mar 18, 2022
1604877
simplify MetadataRecordBatchReader::Next
zagto Mar 18, 2022
16373c2
automated format fixes
zagto Mar 21, 2022
2e632e4
actually fix PyFlightResultStream::Next to behave as before
zagto Mar 21, 2022
8a99ba6
remove leftover doxygen parameter description
zagto Mar 21, 2022
72e9caf
make deprecation message more accurate
zagto Mar 21, 2022
8b46743
update C++ uses of result.h defintions
zagto Mar 22, 2022
bf52fc3
avoid using Result<> variables
zagto Mar 23, 2022
f53bbd4
more elegant handling of new Result<> returns
zagto Mar 23, 2022
cf6d0d2
update remaining uses of Parse
zagto Mar 24, 2022
a677cfe
update remaining uses of ForGrpcTcp
zagto Mar 24, 2022
5b70162
update remaining uses of ForGrpcTls
zagto Mar 24, 2022
c9bdcf7
update remaining uses of ForGrpcUnix
zagto Mar 24, 2022
62f349b
add versions of LocationUnknownScheme and RoundTripTypes tests to cover
zagto Mar 24, 2022
6bd5dc3
update examples
zagto Mar 24, 2022
d4c7b0d
update flight GetSchema methods and uses
zagto Mar 24, 2022
60bbb1e
update flight Next() functions and uses
zagto Mar 24, 2022
0199aef
fix python GetSchema uses
zagto Mar 24, 2022
5332a79
C++ formating
zagto Mar 24, 2022
18af3af
Python formatting
zagto Mar 24, 2022
70c4dd3
formatting of examples
zagto Mar 24, 2022
828de36
update one more GLib binding
zagto Mar 25, 2022
0c90c25
add missing method implementation
zagto Mar 28, 2022
106c356
apply PR feedback
zagto Mar 28, 2022
fb846a6
restore PyFlightServer::GetSchema
zagto Mar 28, 2022
8302c39
restore FlightTestServer::GetSchema
zagto Mar 28, 2022
96b82bd
fix some deprecated method uses somehow not detected before
zagto Mar 28, 2022
bb9d15b
update more ReadAll uses the compiler cannot find
zagto Mar 28, 2022
c5e61d8
apply PR feedback
zagto Mar 28, 2022
ddd532e
better testing of Location construction
zagto Mar 28, 2022
4717dbe
formatting
zagto Mar 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 37 additions & 25 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,21 @@ std::shared_ptr<FlightWriteSizeStatusDetail> FlightWriteSizeStatusDetail::Unwrap

FlightClientOptions FlightClientOptions::Defaults() { return FlightClientOptions(); }

arrow::Result<std::shared_ptr<Table>> FlightStreamReader::ToTable(
const StopToken& stop_token) {
ARROW_ASSIGN_OR_RAISE(auto batches, ToRecordBatches(stop_token));
ARROW_ASSIGN_OR_RAISE(auto schema, GetSchema());
return Table::FromRecordBatches(schema, std::move(batches));
}

Status FlightStreamReader::ReadAll(std::vector<std::shared_ptr<RecordBatch>>* batches,
const StopToken& stop_token) {
return ToRecordBatches(stop_token).Value(batches);
}

Status FlightStreamReader::ReadAll(std::shared_ptr<Table>* table,
const StopToken& stop_token) {
std::vector<std::shared_ptr<RecordBatch>> batches;
RETURN_NOT_OK(ReadAll(&batches, stop_token));
ARROW_ASSIGN_OR_RAISE(auto schema, GetSchema());
return Table::FromRecordBatches(schema, std::move(batches)).Value(table);
return ToTable(stop_token).Value(table);
}

/// \brief An ipc::MessageReader adapting the Flight ClientDataStream interface.
Expand Down Expand Up @@ -169,57 +178,60 @@ class ClientStreamReader : public FlightStreamReader {
RETURN_NOT_OK(EnsureDataStarted());
return batch_reader_->schema();
}
Status Next(FlightStreamChunk* out) override {
arrow::Result<FlightStreamChunk> Next() override {
FlightStreamChunk out;
internal::FlightData* data;
peekable_reader_->Peek(&data);
if (!data) {
out->app_metadata = nullptr;
out->data = nullptr;
return stream_->Finish(Status::OK());
out.app_metadata = nullptr;
out.data = nullptr;
RETURN_NOT_OK(stream_->Finish(Status::OK()));
return out;
}

if (!data->metadata) {
// Metadata-only (data->metadata is the IPC header)
out->app_metadata = data->app_metadata;
out->data = nullptr;
out.app_metadata = data->app_metadata;
out.data = nullptr;
peekable_reader_->Next(&data);
return Status::OK();
return out;
}

if (!batch_reader_) {
RETURN_NOT_OK(EnsureDataStarted());
// Re-peek here since EnsureDataStarted() advances the stream
return Next(out);
return Next();
}
auto status = batch_reader_->ReadNext(&out->data);
auto status = batch_reader_->ReadNext(&out.data);
if (ARROW_PREDICT_FALSE(!status.ok())) {
return stream_->Finish(std::move(status));
}
out->app_metadata = std::move(app_metadata_);
return Status::OK();
out.app_metadata = std::move(app_metadata_);
return out;
}
Status ReadAll(std::vector<std::shared_ptr<RecordBatch>>* batches) override {
return ReadAll(batches, stop_token_);
arrow::Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches() override {
return ToRecordBatches(stop_token_);
}
Status ReadAll(std::vector<std::shared_ptr<RecordBatch>>* batches,
const StopToken& stop_token) override {
arrow::Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
const StopToken& stop_token) override {
std::vector<std::shared_ptr<RecordBatch>> batches;
FlightStreamChunk chunk;

while (true) {
if (stop_token.IsStopRequested()) {
Cancel();
return stop_token.Poll();
}
RETURN_NOT_OK(Next(&chunk));
ARROW_ASSIGN_OR_RAISE(chunk, Next());
if (!chunk.data) break;
batches->emplace_back(std::move(chunk.data));
batches.emplace_back(std::move(chunk.data));
}
return Status::OK();
return batches;
}
Status ReadAll(std::shared_ptr<Table>* table) override {
return ReadAll(table, stop_token_);
arrow::Result<std::shared_ptr<Table>> ToTable() override {
return ToTable(stop_token_);
}
using FlightStreamReader::ReadAll;
using FlightStreamReader::ToTable;
void Cancel() override { stream_->TryCancel(); }

private:
Expand Down
17 changes: 14 additions & 3 deletions cpp/src/arrow/flight/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,22 @@ class ARROW_FLIGHT_EXPORT FlightStreamReader : public MetadataRecordBatchReader
public:
/// \brief Try to cancel the call.
virtual void Cancel() = 0;
using MetadataRecordBatchReader::ReadAll;

using MetadataRecordBatchReader::ToRecordBatches;
/// \brief Consume entire stream as a vector of record batches
virtual Status ReadAll(std::vector<std::shared_ptr<RecordBatch>>* batches,
const StopToken& stop_token) = 0;
virtual arrow::Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
const StopToken& stop_token) = 0;

using MetadataRecordBatchReader::ReadAll;
ARROW_DEPRECATED("Deprecated in 8.0.0. Use ToRecordBatches instead.")
Status ReadAll(std::vector<std::shared_ptr<RecordBatch>>* batches,
const StopToken& stop_token);

using MetadataRecordBatchReader::ToTable;
/// \brief Consume entire stream as a Table
arrow::Result<std::shared_ptr<Table>> ToTable(const StopToken& stop_token);

ARROW_DEPRECATED("Deprecated in 8.0.0. Use ToTable instead.")
Status ReadAll(std::shared_ptr<Table>* table, const StopToken& stop_token);
};

Expand Down
12 changes: 5 additions & 7 deletions cpp/src/arrow/flight/flight_internals_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,8 @@ TEST(FlightTypes, FlightDescriptorToFromProto) {
// ARROW-6017: we should be able to construct locations for unknown
// schemes
TEST(FlightTypes, LocationUnknownScheme) {
Location location;
ASSERT_OK(Location::Parse("s3://test", &location));
ASSERT_OK(Location::Parse("https://example.com/foo", &location));
ASSERT_OK(Location::Parse("s3://test"));
ASSERT_OK(Location::Parse("https://example.com/foo"));
}

TEST(FlightTypes, RoundTripTypes) {
Expand All @@ -105,10 +104,9 @@ TEST(FlightTypes, RoundTripTypes) {
std::shared_ptr<Schema> schema =
arrow::schema({field("a", int64()), field("b", int64()), field("c", int64()),
field("d", int64())});
Location location1, location2, location3;
ASSERT_OK(Location::ForGrpcTcp("localhost", 10010, &location1));
ASSERT_OK(Location::ForGrpcTls("localhost", 10010, &location2));
ASSERT_OK(Location::ForGrpcUnix("/tmp/test.sock", &location3));
ASSERT_OK_AND_ASSIGN(auto location1, Location::ForGrpcTcp("localhost", 10010));
ASSERT_OK_AND_ASSIGN(auto location2, Location::ForGrpcTls("localhost", 10010));
ASSERT_OK_AND_ASSIGN(auto location3, Location::ForGrpcUnix("/tmp/test.sock"));
std::vector<FlightEndpoint> endpoints{FlightEndpoint{ticket, {location1, location2}},
FlightEndpoint{ticket, {location3}}};
ASSERT_OK(MakeFlightInfo(*schema, desc, endpoints, -1, -1, &data));
Expand Down
13 changes: 6 additions & 7 deletions cpp/src/arrow/flight/flight_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1445,19 +1445,18 @@ TEST_F(TestBasicHeaderAuthMiddleware, ValidCredentials) { RunValidClientAuth();
TEST_F(TestBasicHeaderAuthMiddleware, InvalidCredentials) { RunInvalidClientAuth(); }

class ForeverFlightListing : public FlightListing {
Status Next(std::unique_ptr<FlightInfo>* info) override {
arrow::Result<std::unique_ptr<FlightInfo>> Next() override {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
*info = arrow::internal::make_unique<FlightInfo>(ExampleFlightInfo()[0]);
return Status::OK();
return arrow::internal::make_unique<FlightInfo>(ExampleFlightInfo()[0]);
}
};

class ForeverResultStream : public ResultStream {
Status Next(std::unique_ptr<Result>* result) override {
arrow::Result<std::unique_ptr<Result>> Next() override {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
*result = arrow::internal::make_unique<Result>();
(*result)->body = Buffer::FromString("foo");
return Status::OK();
auto result = arrow::internal::make_unique<Result>();
result->body = Buffer::FromString("foo");
return result;
}
};

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/serialization_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ Status ToProto(const Criteria& criteria, pb::Criteria* pb_criteria) {
// Location

Status FromProto(const pb::Location& pb_location, Location* location) {
return Location::Parse(pb_location.uri(), location);
return Location::Parse(pb_location.uri()).Value(location);
}

Status ToProto(const Location& location, pb::Location* pb_location) {
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/flight/sql/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,7 @@ arrow::Result<std::shared_ptr<PreparedStatement>> FlightSqlClient::Prepare(

ARROW_RETURN_NOT_OK(DoAction(options, action, &results));

std::unique_ptr<Result> result;
ARROW_RETURN_NOT_OK(results->Next(&result));
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Result> result, results->Next());

google::protobuf::Any prepared_result;

Expand Down
6 changes: 2 additions & 4 deletions cpp/src/arrow/flight/sql/server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ class TestFlightSqlServer : public ::testing::Test {
std::string uri = ss.str();

std::unique_ptr<FlightClient> client;
Location location;
ASSERT_OK(Location::Parse(uri, &location));
ASSERT_OK_AND_ASSIGN(auto location, Location::Parse(uri));
ASSERT_OK(FlightClient::Connect(location, &client));

sql_client.reset(new FlightSqlClient(std::move(client)));
Expand All @@ -184,8 +183,7 @@ class TestFlightSqlServer : public ::testing::Test {
std::mutex server_ready_m;

void RunServer() {
arrow::flight::Location location;
ARROW_CHECK_OK(arrow::flight::Location::ForGrpcTcp("localhost", port, &location));
Location location = *Location::ForGrpcTcp("localhost", port);
zagto marked this conversation as resolved.
Show resolved Hide resolved
arrow::flight::FlightServerOptions options(location);

ARROW_CHECK_OK(example::SQLiteFlightSqlServer::Create().Value(&server));
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/flight/sql/test_app_cli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ Status PrintResults(FlightSqlClient& client, const FlightCallOptions& call_optio

Status RunMain() {
std::unique_ptr<FlightClient> client;
Location location;
ARROW_RETURN_NOT_OK(Location::ForGrpcTcp(FLAGS_host, FLAGS_port, &location));
ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp(FLAGS_host, FLAGS_port));
ARROW_RETURN_NOT_OK(FlightClient::Connect(location, &client));

FlightCallOptions call_options;
Expand Down
5 changes: 2 additions & 3 deletions cpp/src/arrow/flight/sql/test_server_cli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@
DEFINE_int32(port, 31337, "Server port to listen on");

arrow::Status RunMain() {
arrow::flight::Location location;
ARROW_CHECK_OK(arrow::flight::Location::ForGrpcTcp("0.0.0.0", FLAGS_port, &location));
arrow::flight::FlightServerOptions options(location);
auto location = arrow::flight::Location::ForGrpcTcp("0.0.0.0", FLAGS_port);
zagto marked this conversation as resolved.
Show resolved Hide resolved
arrow::flight::FlightServerOptions options(*location);

std::shared_ptr<arrow::flight::sql::example::SQLiteFlightSqlServer> server;
ARROW_ASSIGN_OR_RAISE(server,
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/flight/test_definitions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ void DataTest::CheckDoGet(
ASSERT_OK(client_->GetFlightInfo(descr, &info));
check_endpoints(info->endpoints());

std::shared_ptr<Schema> schema;
ipc::DictionaryMemo dict_memo;
ASSERT_OK(info->GetSchema(&dict_memo, &schema));
AssertSchemaEqual(*expected_schema, *schema);
arrow::Result<std::shared_ptr<Schema>> schema = info->GetSchema(&dict_memo);
zagto marked this conversation as resolved.
Show resolved Hide resolved
ASSERT_OK(schema);
AssertSchemaEqual(*expected_schema, **schema);

// By convention, fetch the first endpoint
Ticket ticket = info->endpoints()[0].ticket;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/flight/test_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ int main(int argc, char** argv) {

arrow::flight::Location location;
if (FLAGS_unix.empty()) {
ARROW_CHECK_OK(arrow::flight::Location::ForGrpcTcp("0.0.0.0", FLAGS_port, &location));
location = *arrow::flight::Location::ForGrpcTcp("0.0.0.0", FLAGS_port);
} else {
ARROW_CHECK_OK(arrow::flight::Location::ForGrpcUnix(FLAGS_unix, &location));
location = *arrow::flight::Location::ForGrpcUnix(FLAGS_unix);
}
arrow::flight::FlightServerOptions options(location);

Expand Down
15 changes: 5 additions & 10 deletions cpp/src/arrow/flight/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -556,16 +556,11 @@ std::shared_ptr<Schema> ExampleLargeSchema() {
}

std::vector<FlightInfo> ExampleFlightInfo() {
Location location1;
Location location2;
Location location3;
Location location4;
Location location5;
ARROW_EXPECT_OK(Location::ForGrpcTcp("foo1.bar.com", 12345, &location1));
ARROW_EXPECT_OK(Location::ForGrpcTcp("foo2.bar.com", 12345, &location2));
ARROW_EXPECT_OK(Location::ForGrpcTcp("foo3.bar.com", 12345, &location3));
ARROW_EXPECT_OK(Location::ForGrpcTcp("foo4.bar.com", 12345, &location4));
ARROW_EXPECT_OK(Location::ForGrpcTcp("foo5.bar.com", 12345, &location5));
Location location1 = *Location::ForGrpcTcp("foo1.bar.com", 12345);
Location location2 = *Location::ForGrpcTcp("foo2.bar.com", 12345);
Location location3 = *Location::ForGrpcTcp("foo3.bar.com", 12345);
Location location4 = *Location::ForGrpcTcp("foo4.bar.com", 12345);
Location location5 = *Location::ForGrpcTcp("foo5.bar.com", 12345);

FlightInfo::Data flight1, flight2, flight3, flight4;

Expand Down
11 changes: 4 additions & 7 deletions cpp/src/arrow/flight/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,10 @@ namespace flight {
// Helpers to compare values for equality

inline void AssertEqual(const FlightInfo& expected, const FlightInfo& actual) {
std::shared_ptr<Schema> ex_schema, actual_schema;
ipc::DictionaryMemo expected_memo;
ipc::DictionaryMemo actual_memo;
ASSERT_OK(expected.GetSchema(&expected_memo, &ex_schema));
ASSERT_OK(actual.GetSchema(&actual_memo, &actual_schema));
ASSERT_OK_AND_ASSIGN(auto ex_schema, expected.GetSchema(&expected_memo));
ASSERT_OK_AND_ASSIGN(auto actual_schema, actual.GetSchema(&actual_memo));

AssertSchemaEqual(*ex_schema, *actual_schema);
ASSERT_EQ(expected.total_records(), actual.total_records());
Expand Down Expand Up @@ -113,10 +112,9 @@ Status MakeServer(const Location& location, std::unique_ptr<FlightServerBase>* s
FlightServerOptions server_options(location);
RETURN_NOT_OK(make_server_options(&server_options));
RETURN_NOT_OK((*server)->Init(server_options));
Location real_location;
std::string uri =
location.scheme() + "://localhost:" + std::to_string((*server)->port());
RETURN_NOT_OK(Location::Parse(uri, &real_location));
ARROW_ASSIGN_OR_RAISE(auto real_location, Location::Parse(uri));
FlightClientOptions client_options = FlightClientOptions::Defaults();
RETURN_NOT_OK(make_client_options(&client_options));
return FlightClient::Connect(real_location, client_options, client);
Expand All @@ -130,8 +128,7 @@ Status MakeServer(std::unique_ptr<FlightServerBase>* server,
std::function<Status(FlightServerOptions*)> make_server_options,
std::function<Status(FlightClientOptions*)> make_client_options,
Args&&... server_args) {
Location location;
RETURN_NOT_OK(Location::ForGrpcTcp("localhost", 0, &location));
ARROW_ASSIGN_OR_RAISE(auto location, Location::ForGrpcTcp("localhost", 0));
return MakeServer<T>(location, server, client, std::move(make_server_options),
std::move(make_client_options),
std::forward<Args>(server_args)...);
Expand Down
22 changes: 12 additions & 10 deletions cpp/src/arrow/flight/transport/grpc/grpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,12 @@ class GrpcServiceHandler final : public FlightService::Service {
// Write flight info to stream until listing is exhausted
while (true) {
ProtoType pb_value;
std::unique_ptr<UserType> value;
GRPC_RETURN_NOT_OK(iterator->Next(&value));
if (!value) {
arrow::Result<std::unique_ptr<UserType>> value = iterator->Next();
GRPC_RETURN_NOT_OK(value.status());
zagto marked this conversation as resolved.
Show resolved Hide resolved
if (!*value) {
break;
}
GRPC_RETURN_NOT_OK(internal::ToProto(*value, &pb_value));
GRPC_RETURN_NOT_OK(internal::ToProto(**value, &pb_value));

// Blocking write
if (!writer->Write(pb_value)) {
Expand Down Expand Up @@ -494,14 +494,16 @@ class GrpcServiceHandler final : public FlightService::Service {
}

while (true) {
std::unique_ptr<Result> result;
SERVICE_RETURN_NOT_OK(flight_context, results->Next(&result));
if (!result) {
auto arrow_result = results->Next();
SERVICE_RETURN_NOT_OK(flight_context, arrow_result.status());
std::unique_ptr<Result> flight_result = std::move(*arrow_result);
if (!flight_result) {
// No more results
break;
}
pb::Result pb_result;
SERVICE_RETURN_NOT_OK(flight_context, internal::ToProto(*result, &pb_result));
SERVICE_RETURN_NOT_OK(flight_context,
internal::ToProto(*flight_result, &pb_result));
if (!writer->Write(pb_result)) {
// Stream may be closed
break;
Expand Down Expand Up @@ -587,9 +589,9 @@ class GrpcServerTransport : public internal::ServerTransport {
}

if (scheme == kSchemeGrpcTls) {
RETURN_NOT_OK(Location::ForGrpcTls(uri.host(), port, &location_));
RETURN_NOT_OK(Location::ForGrpcTls(uri.host(), port).Value(&location_));
zagto marked this conversation as resolved.
Show resolved Hide resolved
} else if (scheme == kSchemeGrpc || scheme == kSchemeGrpcTcp) {
RETURN_NOT_OK(Location::ForGrpcTcp(uri.host(), port, &location_));
RETURN_NOT_OK(Location::ForGrpcTcp(uri.host(), port).Value(&location_));
}
return Status::OK();
}
Expand Down
Loading