Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-15974: [C++] Migrate flight/types.h header definitions to use Result<> #12669

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
220814e
convert flight.h APIs to use Result
zagto Mar 18, 2022
64e6562
fix PyFlightResultStream::Next
zagto Mar 18, 2022
1604877
simplify MetadataRecordBatchReader::Next
zagto Mar 18, 2022
16373c2
automated format fixes
zagto Mar 21, 2022
2e632e4
actually fix PyFlightResultStream::Next to behave as before
zagto Mar 21, 2022
8a99ba6
remove leftover doxygen parameter description
zagto Mar 21, 2022
72e9caf
make deprecation message more accurate
zagto Mar 21, 2022
8b46743
update C++ uses of result.h defintions
zagto Mar 22, 2022
bf52fc3
avoid using Result<> variables
zagto Mar 23, 2022
f53bbd4
more elegant handling of new Result<> returns
zagto Mar 23, 2022
cf6d0d2
update remaining uses of Parse
zagto Mar 24, 2022
a677cfe
update remaining uses of ForGrpcTcp
zagto Mar 24, 2022
5b70162
update remaining uses of ForGrpcTls
zagto Mar 24, 2022
c9bdcf7
update remaining uses of ForGrpcUnix
zagto Mar 24, 2022
62f349b
add versions of LocationUnknownScheme and RoundTripTypes tests to cover
zagto Mar 24, 2022
6bd5dc3
update examples
zagto Mar 24, 2022
d4c7b0d
update flight GetSchema methods and uses
zagto Mar 24, 2022
60bbb1e
update flight Next() functions and uses
zagto Mar 24, 2022
0199aef
fix python GetSchema uses
zagto Mar 24, 2022
5332a79
C++ formating
zagto Mar 24, 2022
18af3af
Python formatting
zagto Mar 24, 2022
70c4dd3
formatting of examples
zagto Mar 24, 2022
828de36
update one more GLib binding
zagto Mar 25, 2022
0c90c25
add missing method implementation
zagto Mar 28, 2022
106c356
apply PR feedback
zagto Mar 28, 2022
fb846a6
restore PyFlightServer::GetSchema
zagto Mar 28, 2022
8302c39
restore FlightTestServer::GetSchema
zagto Mar 28, 2022
96b82bd
fix some deprecated method uses somehow not detected before
zagto Mar 28, 2022
bb9d15b
update more ReadAll uses the compiler cannot find
zagto Mar 28, 2022
c5e61d8
apply PR feedback
zagto Mar 28, 2022
ddd532e
better testing of Location construction
zagto Mar 28, 2022
4717dbe
formatting
zagto Mar 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 37 additions & 25 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,21 @@ std::shared_ptr<FlightWriteSizeStatusDetail> FlightWriteSizeStatusDetail::Unwrap

FlightClientOptions FlightClientOptions::Defaults() { return FlightClientOptions(); }

arrow::Result<std::shared_ptr<Table>> FlightStreamReader::ToTable(
const StopToken& stop_token) {
ARROW_ASSIGN_OR_RAISE(auto batches, ToRecordBatches(stop_token));
ARROW_ASSIGN_OR_RAISE(auto schema, GetSchema());
return Table::FromRecordBatches(schema, std::move(batches));
}

Status FlightStreamReader::ReadAll(std::vector<std::shared_ptr<RecordBatch>>* batches,
const StopToken& stop_token) {
return ToRecordBatches(stop_token).Value(batches);
}

Status FlightStreamReader::ReadAll(std::shared_ptr<Table>* table,
const StopToken& stop_token) {
std::vector<std::shared_ptr<RecordBatch>> batches;
RETURN_NOT_OK(ReadAll(&batches, stop_token));
ARROW_ASSIGN_OR_RAISE(auto schema, GetSchema());
return Table::FromRecordBatches(schema, std::move(batches)).Value(table);
return ToTable(stop_token).Value(table);
}

/// \brief An ipc::MessageReader adapting the Flight ClientDataStream interface.
Expand Down Expand Up @@ -169,57 +178,60 @@ class ClientStreamReader : public FlightStreamReader {
RETURN_NOT_OK(EnsureDataStarted());
return batch_reader_->schema();
}
Status Next(FlightStreamChunk* out) override {
arrow::Result<FlightStreamChunk> Next() override {
FlightStreamChunk out;
internal::FlightData* data;
peekable_reader_->Peek(&data);
if (!data) {
out->app_metadata = nullptr;
out->data = nullptr;
return stream_->Finish(Status::OK());
out.app_metadata = nullptr;
out.data = nullptr;
RETURN_NOT_OK(stream_->Finish(Status::OK()));
return out;
}

if (!data->metadata) {
// Metadata-only (data->metadata is the IPC header)
out->app_metadata = data->app_metadata;
out->data = nullptr;
out.app_metadata = data->app_metadata;
out.data = nullptr;
peekable_reader_->Next(&data);
return Status::OK();
return out;
}

if (!batch_reader_) {
RETURN_NOT_OK(EnsureDataStarted());
// Re-peek here since EnsureDataStarted() advances the stream
return Next(out);
return Next();
}
auto status = batch_reader_->ReadNext(&out->data);
auto status = batch_reader_->ReadNext(&out.data);
if (ARROW_PREDICT_FALSE(!status.ok())) {
return stream_->Finish(std::move(status));
}
out->app_metadata = std::move(app_metadata_);
return Status::OK();
out.app_metadata = std::move(app_metadata_);
return out;
}
Status ReadAll(std::vector<std::shared_ptr<RecordBatch>>* batches) override {
return ReadAll(batches, stop_token_);
arrow::Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches() override {
return ToRecordBatches(stop_token_);
}
Status ReadAll(std::vector<std::shared_ptr<RecordBatch>>* batches,
const StopToken& stop_token) override {
arrow::Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
const StopToken& stop_token) override {
std::vector<std::shared_ptr<RecordBatch>> batches;
FlightStreamChunk chunk;

while (true) {
if (stop_token.IsStopRequested()) {
Cancel();
return stop_token.Poll();
}
RETURN_NOT_OK(Next(&chunk));
ARROW_ASSIGN_OR_RAISE(chunk, Next());
if (!chunk.data) break;
batches->emplace_back(std::move(chunk.data));
batches.emplace_back(std::move(chunk.data));
}
return Status::OK();
return batches;
}
Status ReadAll(std::shared_ptr<Table>* table) override {
return ReadAll(table, stop_token_);
arrow::Result<std::shared_ptr<Table>> ToTable() override {
return ToTable(stop_token_);
}
using FlightStreamReader::ReadAll;
using FlightStreamReader::ToTable;
void Cancel() override { stream_->TryCancel(); }

private:
Expand Down
17 changes: 14 additions & 3 deletions cpp/src/arrow/flight/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,22 @@ class ARROW_FLIGHT_EXPORT FlightStreamReader : public MetadataRecordBatchReader
public:
/// \brief Try to cancel the call.
virtual void Cancel() = 0;
using MetadataRecordBatchReader::ReadAll;

using MetadataRecordBatchReader::ToRecordBatches;
/// \brief Consume entire stream as a vector of record batches
virtual Status ReadAll(std::vector<std::shared_ptr<RecordBatch>>* batches,
const StopToken& stop_token) = 0;
virtual arrow::Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
const StopToken& stop_token) = 0;

using MetadataRecordBatchReader::ReadAll;
ARROW_DEPRECATED("Deprecated in 8.0.0. Use ToRecordBatches instead.")
Status ReadAll(std::vector<std::shared_ptr<RecordBatch>>* batches,
const StopToken& stop_token);

using MetadataRecordBatchReader::ToTable;
/// \brief Consume entire stream as a Table
arrow::Result<std::shared_ptr<Table>> ToTable(const StopToken& stop_token);

ARROW_DEPRECATED("Deprecated in 8.0.0. Use ToTable instead.")
Status ReadAll(std::shared_ptr<Table>* table, const StopToken& stop_token);
};

Expand Down
13 changes: 6 additions & 7 deletions cpp/src/arrow/flight/flight_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1445,19 +1445,18 @@ TEST_F(TestBasicHeaderAuthMiddleware, ValidCredentials) { RunValidClientAuth();
TEST_F(TestBasicHeaderAuthMiddleware, InvalidCredentials) { RunInvalidClientAuth(); }

class ForeverFlightListing : public FlightListing {
Status Next(std::unique_ptr<FlightInfo>* info) override {
arrow::Result<std::unique_ptr<FlightInfo>> Next() override {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
*info = arrow::internal::make_unique<FlightInfo>(ExampleFlightInfo()[0]);
return Status::OK();
return arrow::internal::make_unique<FlightInfo>(ExampleFlightInfo()[0]);
}
};

class ForeverResultStream : public ResultStream {
Status Next(std::unique_ptr<Result>* result) override {
arrow::Result<std::unique_ptr<Result>> Next() override {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
*result = arrow::internal::make_unique<Result>();
(*result)->body = Buffer::FromString("foo");
return Status::OK();
auto result = arrow::internal::make_unique<Result>();
result->body = Buffer::FromString("foo");
return result;
}
};

Expand Down
23 changes: 12 additions & 11 deletions cpp/src/arrow/flight/transport_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,31 +109,32 @@ class TransportMessageReader final : public FlightMessageReader {
return batch_reader_->schema();
}

Status Next(FlightStreamChunk* out) override {
arrow::Result<FlightStreamChunk> Next() override {
FlightStreamChunk out;
internal::FlightData* data;
peekable_reader_->Peek(&data);
if (!data) {
out->app_metadata = nullptr;
out->data = nullptr;
return Status::OK();
out.app_metadata = nullptr;
out.data = nullptr;
return out;
}

if (!data->metadata) {
// Metadata-only (data->metadata is the IPC header)
out->app_metadata = data->app_metadata;
out->data = nullptr;
out.app_metadata = data->app_metadata;
out.data = nullptr;
peekable_reader_->Next(&data);
return Status::OK();
return out;
}

if (!batch_reader_) {
RETURN_NOT_OK(EnsureDataStarted());
// re-peek here since EnsureDataStarted() advances the stream
return Next(out);
return Next();
}
RETURN_NOT_OK(batch_reader_->ReadNext(&out->data));
out->app_metadata = std::move(app_metadata_);
return Status::OK();
RETURN_NOT_OK(batch_reader_->ReadNext(&out.data));
out.app_metadata = std::move(app_metadata_);
return out;
}

private:
Expand Down
113 changes: 75 additions & 38 deletions cpp/src/arrow/flight/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,15 @@ Status FlightPayload::Validate() const {
return Status::OK();
}

arrow::Result<std::shared_ptr<Schema>> SchemaResult::GetSchema(
ipc::DictionaryMemo* dictionary_memo) const {
io::BufferReader schema_reader(raw_schema_);
return ipc::ReadSchema(&schema_reader, dictionary_memo);
}

Status SchemaResult::GetSchema(ipc::DictionaryMemo* dictionary_memo,
std::shared_ptr<Schema>* out) const {
io::BufferReader schema_reader(raw_schema_);
return ipc::ReadSchema(&schema_reader, dictionary_memo).Value(out);
return GetSchema(dictionary_memo).Value(out);
}

arrow::Result<std::string> FlightDescriptor::SerializeToString() const {
Expand Down Expand Up @@ -233,17 +238,20 @@ arrow::Result<FlightInfo> FlightInfo::Make(const Schema& schema,
return FlightInfo(data);
}

Status FlightInfo::GetSchema(ipc::DictionaryMemo* dictionary_memo,
std::shared_ptr<Schema>* out) const {
arrow::Result<std::shared_ptr<Schema>> FlightInfo::GetSchema(
ipc::DictionaryMemo* dictionary_memo) const {
if (reconstructed_schema_) {
*out = schema_;
return Status::OK();
return schema_;
}
io::BufferReader schema_reader(data_.schema);
RETURN_NOT_OK(ipc::ReadSchema(&schema_reader, dictionary_memo).Value(&schema_));
reconstructed_schema_ = true;
*out = schema_;
return Status::OK();
return schema_;
}

Status FlightInfo::GetSchema(ipc::DictionaryMemo* dictionary_memo,
std::shared_ptr<Schema>* out) const {
return GetSchema(dictionary_memo).Value(out);
}

arrow::Result<std::string> FlightInfo::SerializeToString() const {
Expand Down Expand Up @@ -284,35 +292,55 @@ Status FlightInfo::Deserialize(const std::string& serialized,

Location::Location() { uri_ = std::make_shared<arrow::internal::Uri>(); }

Status FlightListing::Next(std::unique_ptr<FlightInfo>* info) {
return Next().Value(info);
}

arrow::Result<Location> Location::Parse(const std::string& uri_string) {
Location location;
RETURN_NOT_OK(location.uri_->Parse(uri_string));
return location;
}

Status Location::Parse(const std::string& uri_string, Location* location) {
return location->uri_->Parse(uri_string);
return Parse(uri_string).Value(location);
}

Status Location::ForGrpcTcp(const std::string& host, const int port, Location* location) {
arrow::Result<Location> Location::ForGrpcTcp(const std::string& host, const int port) {
std::stringstream uri_string;
uri_string << "grpc+tcp://" << host << ':' << port;
return Location::Parse(uri_string.str(), location);
return Location::Parse(uri_string.str());
}

Status Location::ForGrpcTls(const std::string& host, const int port, Location* location) {
Status Location::ForGrpcTcp(const std::string& host, const int port, Location* location) {
return ForGrpcTcp(host, port).Value(location);
}

arrow::Result<Location> Location::ForGrpcTls(const std::string& host, const int port) {
std::stringstream uri_string;
uri_string << "grpc+tls://" << host << ':' << port;
return Location::Parse(uri_string.str(), location);
return Location::Parse(uri_string.str());
}

Status Location::ForGrpcUnix(const std::string& path, Location* location) {
Status Location::ForGrpcTls(const std::string& host, const int port, Location* location) {
return ForGrpcTls(host, port).Value(location);
}

arrow::Result<Location> Location::ForGrpcUnix(const std::string& path) {
std::stringstream uri_string;
uri_string << "grpc+unix://" << path;
return Location::Parse(uri_string.str(), location);
return Location::Parse(uri_string.str());
}

Status Location::ForGrpcUnix(const std::string& path, Location* location) {
return ForGrpcUnix(path).Value(location);
}

arrow::Result<Location> Location::ForScheme(const std::string& scheme,
const std::string& host, const int port) {
Location location;
std::stringstream uri_string;
uri_string << scheme << "://" << host << ':' << port;
RETURN_NOT_OK(Location::Parse(uri_string.str(), &location));
return location;
return Location::Parse(uri_string.str());
}

std::string Location::ToString() const { return uri_->ToString(); }
Expand All @@ -337,23 +365,36 @@ bool ActionType::Equals(const ActionType& other) const {
return type == other.type && description == other.description;
}

Status MetadataRecordBatchReader::ReadAll(
std::vector<std::shared_ptr<RecordBatch>>* batches) {
FlightStreamChunk chunk;
Status ResultStream::Next(std::unique_ptr<Result>* info) { return Next().Value(info); }

Status MetadataRecordBatchReader::Next(FlightStreamChunk* next) {
return Next().Value(next);
}

arrow::Result<std::vector<std::shared_ptr<RecordBatch>>>
MetadataRecordBatchReader::ToRecordBatches() {
std::vector<std::shared_ptr<RecordBatch>> batches;
while (true) {
RETURN_NOT_OK(Next(&chunk));
ARROW_ASSIGN_OR_RAISE(FlightStreamChunk chunk, Next());
if (!chunk.data) break;
batches->emplace_back(std::move(chunk.data));
batches.emplace_back(std::move(chunk.data));
}
return Status::OK();
return batches;
}

Status MetadataRecordBatchReader::ReadAll(std::shared_ptr<Table>* table) {
std::vector<std::shared_ptr<RecordBatch>> batches;
RETURN_NOT_OK(ReadAll(&batches));
Status MetadataRecordBatchReader::ReadAll(
std::vector<std::shared_ptr<RecordBatch>>* batches) {
return ToRecordBatches().Value(batches);
}

arrow::Result<std::shared_ptr<Table>> MetadataRecordBatchReader::ToTable() {
ARROW_ASSIGN_OR_RAISE(auto batches, ToRecordBatches());
ARROW_ASSIGN_OR_RAISE(auto schema, GetSchema());
return Table::FromRecordBatches(schema, std::move(batches)).Value(table);
return Table::FromRecordBatches(schema, std::move(batches));
}

Status MetadataRecordBatchReader::ReadAll(std::shared_ptr<Table>* table) {
return ToTable().Value(table);
}

Status MetadataRecordBatchWriter::Begin(const std::shared_ptr<Schema>& schema) {
Expand Down Expand Up @@ -402,25 +443,21 @@ SimpleFlightListing::SimpleFlightListing(const std::vector<FlightInfo>& flights)
SimpleFlightListing::SimpleFlightListing(std::vector<FlightInfo>&& flights)
: position_(0), flights_(std::move(flights)) {}

Status SimpleFlightListing::Next(std::unique_ptr<FlightInfo>* info) {
arrow::Result<std::unique_ptr<FlightInfo>> SimpleFlightListing::Next() {
if (position_ >= static_cast<int>(flights_.size())) {
*info = nullptr;
return Status::OK();
return nullptr;
}
*info = std::unique_ptr<FlightInfo>(new FlightInfo(std::move(flights_[position_++])));
return Status::OK();
return std::unique_ptr<FlightInfo>(new FlightInfo(std::move(flights_[position_++])));
}

SimpleResultStream::SimpleResultStream(std::vector<Result>&& results)
: results_(std::move(results)), position_(0) {}

Status SimpleResultStream::Next(std::unique_ptr<Result>* result) {
arrow::Result<std::unique_ptr<Result>> SimpleResultStream::Next() {
if (position_ >= results_.size()) {
*result = nullptr;
return Status::OK();
return nullptr;
}
*result = std::unique_ptr<Result>(new Result(std::move(results_[position_++])));
return Status::OK();
return std::unique_ptr<Result>(new Result(std::move(results_[position_++])));
}

arrow::Result<BasicAuth> BasicAuth::Deserialize(arrow::util::string_view serialized) {
Expand Down
Loading