GH-36155: [C++][Go][FlightRPC] Add support for long-running queries

In Flight RPC, FlightInfo includes addresses of workers alongside
result partition info. This lets clients fetch data directly from
workers, in parallel or even distributed across multiple machines. But
this also comes with tradeoffs.

Queries generally don't complete instantly (as much as we would like
them to). So where can we put the 'query evaluation time'?

* In `GetFlightInfo`: block and wait for the query to complete.
  * Con: this is a long-running blocking call, which may fail or time
    out. Then when the client retries, the server has to redo all the
    work.
  * Con: parts of the result may be ready before others, but the
    client can't do anything until everything is ready.
* In `DoGet`: return a fixed number of partitions from `GetFlightInfo`
  right away and do the work when the client fetches them.
  * Con: this makes handling worker failures hard. Systems like Trino
    support fault-tolerant execution by replacing workers at
    runtime. But `GetFlightInfo` has already returned by then, so we
    can't notify the client of new workers.
  * Con: we have to know or fix the partitioning up front.

Neither solution is optimal.

We can address this by adding a retryable version of
`GetFlightInfo`: `PollFlightInfo(FlightDescriptor)`.

`PollFlightInfo` returns `RetryInfo`:

```proto
message RetryInfo {
  // The currently available results so far.
  FlightInfo info = 1;
  // The descriptor the client should use on the next try.
  // If unset, the query is complete.
  FlightDescriptor flight_descriptor = 2;
  // Query progress. Must be in [0.0, 1.0] but need not be
  // monotonic or nondecreasing. If unknown, do not set.
  optional double progress = 3;
  // Expiration time for this request. After this passes, the server
  // might not accept the retry descriptor anymore (and the query may
  // be cancelled). This may be updated on a call to PollFlightInfo.
  google.protobuf.Timestamp expiration_time = 4;
}
```

See the documentation changes for details.
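
The polling contract described above can be sketched without Arrow at all. The block below is a minimal illustration, not the Arrow API: `PollResult`, `PollFn`, `PollUntilComplete`, and `FakeServer` are hypothetical names introduced here, with `PollResult` standing in for `RetryInfo`. The client keeps calling with the descriptor from the previous response until no descriptor is returned. Waiting between polls, expiration handling, and fetching the partial results are all left out.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-in for RetryInfo (not the Arrow type).
struct PollResult {
  std::vector<std::string> endpoints;          // results available so far
  std::optional<std::string> next_descriptor;  // unset means the query is complete
  std::optional<double> progress;              // in [0.0, 1.0] when set
};

// Hypothetical callable standing in for the PollFlightInfo RPC.
using PollFn = std::function<PollResult(const std::string&)>;

// Polls until the server stops returning a retry descriptor or until
// max_polls calls have been made. Returns the last response seen.
inline PollResult PollUntilComplete(const PollFn& poll, std::string descriptor,
                                    int max_polls) {
  assert(max_polls > 0);
  PollResult last = poll(descriptor);
  for (int i = 1; i < max_polls && last.next_descriptor.has_value(); ++i) {
    // Always retry with the descriptor from the most recent response.
    descriptor = *last.next_descriptor;
    last = poll(descriptor);
  }
  return last;
}

// Fake server for illustration: the first call hands back a "retry"
// descriptor with 10% progress, and a call with "retry" completes.
inline PollResult FakeServer(const std::string& descriptor) {
  if (descriptor == "retry") {
    return PollResult{{"long-running query"}, std::nullopt, 1.0};
  }
  return PollResult{{"long-running query"}, std::string("retry"), 0.1};
}
```

Calling `PollUntilComplete(FakeServer, "heavy query", 10)` finishes after two calls. The `max_polls` bound is a choice made for this sketch so that a misbehaving server cannot keep the client looping forever.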
kou committed Jul 31, 2023
1 parent bedc1e7 commit c81dad7
Showing 30 changed files with 1,405 additions and 585 deletions.
8 changes: 8 additions & 0 deletions cpp/src/arrow/flight/client.cc
@@ -584,6 +584,14 @@ arrow::Result<std::unique_ptr<FlightInfo>> FlightClient::GetFlightInfo(
return info;
}

arrow::Result<std::unique_ptr<RetryInfo>> FlightClient::PollFlightInfo(
const FlightCallOptions& options, const FlightDescriptor& descriptor) {
std::unique_ptr<RetryInfo> info;
RETURN_NOT_OK(CheckOpen());
RETURN_NOT_OK(transport_->PollFlightInfo(options, descriptor, &info));
return info;
}

arrow::Result<std::unique_ptr<SchemaResult>> FlightClient::GetSchema(
const FlightCallOptions& options, const FlightDescriptor& descriptor) {
RETURN_NOT_OK(CheckOpen());
13 changes: 13 additions & 0 deletions cpp/src/arrow/flight/client.h
@@ -271,6 +271,19 @@ class ARROW_FLIGHT_EXPORT FlightClient {
return GetFlightInfo({}, descriptor);
}

/// \brief Request and poll a long running query
/// \param[in] options Per-RPC options
/// \param[in] descriptor the dataset request or a descriptor returned by a
/// prior PollFlightInfo call
/// \return Arrow result with the RetryInfo describing the status of
/// the requested query
arrow::Result<std::unique_ptr<RetryInfo>> PollFlightInfo(
const FlightCallOptions& options, const FlightDescriptor& descriptor);
arrow::Result<std::unique_ptr<RetryInfo>> PollFlightInfo(
const FlightDescriptor& descriptor) {
return PollFlightInfo({}, descriptor);
}

/// \brief Request schema for a single flight, which may be an existing
/// dataset or a command to be executed
/// \param[in] options Per-RPC options
37 changes: 36 additions & 1 deletion cpp/src/arrow/flight/flight_internals_test.cc
@@ -61,9 +61,12 @@ void TestRoundtrip(const std::vector<FlightType>& values,

ASSERT_OK_AND_ASSIGN(std::string serialized, values[i].SerializeToString());
ASSERT_OK_AND_ASSIGN(auto deserialized, FlightType::Deserialize(serialized));
-if constexpr (std::is_same_v<FlightType, FlightInfo>) {
+if constexpr (std::is_same_v<FlightType, FlightInfo> ||
+std::is_same_v<FlightType, RetryInfo>) {
ARROW_SCOPED_TRACE("Deserialized = ", deserialized->ToString());
EXPECT_EQ(values[i], *deserialized);
} else {
ARROW_SCOPED_TRACE("Deserialized = ", deserialized.ToString());
EXPECT_EQ(values[i], deserialized);
}

@@ -254,6 +257,38 @@ TEST(FlightTypes, FlightInfo) {
ASSERT_NO_FATAL_FAILURE(TestRoundtrip<pb::FlightInfo>(values, reprs));
}

TEST(FlightTypes, RetryInfo) {
ASSERT_OK_AND_ASSIGN(auto location, Location::ForGrpcTcp("localhost", 1234));
Schema schema({field("ints", int64())});
auto desc = FlightDescriptor::Command("foo");
auto endpoint = FlightEndpoint{Ticket{"foo"}, {}, std::nullopt};
auto info = MakeFlightInfo(schema, desc, {endpoint}, -1, 42, true);
// 2023-06-19 03:14:06.004339000
// We must use microsecond resolution here for portability.
// std::chrono::system_clock::time_point may not provide nanosecond
// resolution on some platforms such as Windows.
const auto expiration_time_duration =
std::chrono::seconds{1687144446} + std::chrono::nanoseconds{4339000};
Timestamp expiration_time(
std::chrono::duration_cast<Timestamp::duration>(expiration_time_duration));
std::vector<RetryInfo> values = {
RetryInfo{std::make_unique<FlightInfo>(info), std::nullopt, std::nullopt,
std::nullopt},
RetryInfo{std::make_unique<FlightInfo>(info), FlightDescriptor::Command("retry"),
0.1, expiration_time},
};
std::vector<std::string> reprs = {
"<RetryInfo info=" + info.ToString() +
" descriptor=null "
"progress=null expiration_time=null>",
"<RetryInfo info=" + info.ToString() +
" descriptor=<FlightDescriptor cmd='retry'> "
"progress=0.1 expiration_time=2023-06-19 03:14:06.004339000>",
};

ASSERT_NO_FATAL_FAILURE(TestRoundtrip<pb::RetryInfo>(values, reprs));
}

TEST(FlightTypes, Result) {
std::vector<Result> values = {
{Buffer::FromString("")},
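
The comment in the `RetryInfo` test about microsecond resolution can be illustrated with plain `std::chrono`. This is a sketch under assumptions: `MicroTimePoint` and `RoundTripNanosViaMicros` are hypothetical names, and the microsecond time point only models a platform whose `system_clock` is coarser than nanoseconds. It shows that a value with whole microseconds survives a round trip, while any sub-microsecond part is truncated.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical microsecond-resolution time point, modelling a platform
// whose system_clock does not provide nanosecond resolution.
using MicroTimePoint =
    std::chrono::time_point<std::chrono::system_clock, std::chrono::microseconds>;

// Converts seconds + nanoseconds since the epoch to microsecond resolution
// (duration_cast truncates toward zero) and back to a nanosecond count.
inline std::int64_t RoundTripNanosViaMicros(std::int64_t seconds,
                                            std::int64_t nanos) {
  const auto total =
      std::chrono::seconds{seconds} + std::chrono::nanoseconds{nanos};
  const MicroTimePoint tp(
      std::chrono::duration_cast<std::chrono::microseconds>(total));
  return std::chrono::duration_cast<std::chrono::nanoseconds>(
             tp.time_since_epoch())
      .count();
}
```

With the test's value of 4339000 ns the round trip is lossless, which is why the expected string ends in `.004339000`. A value such as 4339123 ns would come back as 4339000 ns on this modelled platform.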
@@ -71,6 +71,8 @@ TEST(FlightIntegration, ExpirationTimeRenewFlightEndpoint) {
ASSERT_OK(RunScenario("expiration_time:renew_flight_endpoint"));
}

TEST(FlightIntegration, PollFlightInfo) { ASSERT_OK(RunScenario("poll_flight_info")); }

TEST(FlightIntegration, FlightSql) { ASSERT_OK(RunScenario("flight_sql")); }

TEST(FlightIntegration, FlightSqlExtension) {
79 changes: 76 additions & 3 deletions cpp/src/arrow/flight/integration_tests/test_integration.cc
@@ -708,9 +708,7 @@ class ExpirationTimeCancelFlightInfoScenario : public Scenario {

/// \brief The expiration time scenario - RenewFlightEndpoint.
///
-/// This tests that the client can renew a FlightEndpoint and read
-/// data in renewed expiration time even when the original
-/// expiration time is over.
+/// This tests that the client can renew a FlightEndpoint.
class ExpirationTimeRenewFlightEndpointScenario : public Scenario {
Status MakeServer(std::unique_ptr<FlightServerBase>* server,
FlightServerOptions* options) override {
@@ -746,6 +744,78 @@ class ExpirationTimeRenewFlightEndpointScenario : public Scenario {
}
};

/// \brief The server used for testing PollFlightInfo().
class PollFlightInfoServer : public FlightServerBase {
public:
PollFlightInfoServer() : FlightServerBase() {}

Status PollFlightInfo(const ServerCallContext& context,
const FlightDescriptor& descriptor,
std::unique_ptr<RetryInfo>* result) override {
auto schema = arrow::schema({arrow::field("number", arrow::uint32(), false)});
std::vector<FlightEndpoint> endpoints = {
FlightEndpoint{{"long-running query"}, {}, std::nullopt}};
ARROW_ASSIGN_OR_RAISE(
auto info, FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false));
if (descriptor == FlightDescriptor::Command("retry")) {
*result = std::make_unique<RetryInfo>(std::make_unique<FlightInfo>(std::move(info)),
std::nullopt, 1.0, std::nullopt);
} else {
*result =
std::make_unique<RetryInfo>(std::make_unique<FlightInfo>(std::move(info)),
FlightDescriptor::Command("retry"), 0.1,
Timestamp::clock::now() + std::chrono::seconds{10});
}
return Status::OK();
}
};

/// \brief The PollFlightInfo scenario.
///
/// This tests that the client can poll a long-running query.
class PollFlightInfoScenario : public Scenario {
Status MakeServer(std::unique_ptr<FlightServerBase>* server,
FlightServerOptions* options) override {
*server = std::make_unique<PollFlightInfoServer>();
return Status::OK();
}

Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }

Status RunClient(std::unique_ptr<FlightClient> client) override {
ARROW_ASSIGN_OR_RAISE(
auto info,
client->PollFlightInfo(FlightDescriptor::Command("heavy query")));
if (!info->descriptor.has_value()) {
return Status::Invalid("Description is missing: ", info->ToString());
}
if (!info->progress.has_value()) {
return Status::Invalid("Progress is missing: ", info->ToString());
}
if (!(0.0 <= *info->progress && *info->progress <= 1.0)) {
return Status::Invalid("Invalid progress: ", info->ToString());
}
if (!info->expiration_time.has_value()) {
return Status::Invalid("Expiration time is missing: ", info->ToString());
}
ARROW_ASSIGN_OR_RAISE(info, client->PollFlightInfo(*info->descriptor));
if (info->descriptor.has_value()) {
return Status::Invalid("Retried but not finished yet: ", info->ToString());
}
if (!info->progress.has_value()) {
return Status::Invalid("Progress is missing in finished query: ", info->ToString());
}
if (fabs(*info->progress - 1.0) > arrow::kDefaultAbsoluteTolerance) {
return Status::Invalid("Progress for finished query isn't 1.0: ", info->ToString());
}
if (info->expiration_time.has_value()) {
return Status::Invalid("Expiration time must not be set for finished query: ",
info->ToString());
}
return Status::OK();
}
};

/// \brief Schema to be returned for mocking the statement/prepared statement results.
///
/// Must be the same across all languages.
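
The checks that `PollFlightInfoScenario::RunClient` performs can be condensed into two predicates. The sketch below is illustrative only: `PollStatus`, `CheckRunning`, and `CheckFinished` are hypothetical names, and the tolerance is passed in rather than taken from Arrow. These rules mirror what the scenario expects from its own test server. They are not necessarily protocol requirements; for example, the proto marks `progress` as optional.

```cpp
#include <cassert>
#include <cmath>
#include <optional>
#include <string>

// Hypothetical stand-in for the optional fields of RetryInfo.
struct PollStatus {
  std::optional<std::string> descriptor;        // retry descriptor, if any
  std::optional<double> progress;               // fraction complete, if known
  std::optional<long long> expiration_seconds;  // Unix time, if set
};

// Returns an empty string when `status` looks like a response for a query
// that is still running, otherwise a short description of the problem.
inline std::string CheckRunning(const PollStatus& status) {
  if (!status.descriptor) return "descriptor is missing";
  if (!status.progress) return "progress is missing";
  // Written as a negated range test so that NaN is rejected too.
  if (!(0.0 <= *status.progress && *status.progress <= 1.0)) {
    return "invalid progress";
  }
  if (!status.expiration_seconds) return "expiration time is missing";
  return "";
}

// Same idea for a response that should describe a finished query.
inline std::string CheckFinished(const PollStatus& status, double tolerance) {
  if (status.descriptor) return "retried but not finished yet";
  if (!status.progress) return "progress is missing";
  if (std::fabs(*status.progress - 1.0) > tolerance) {
    return "progress is not 1.0";
  }
  if (status.expiration_seconds) return "expiration time must not be set";
  return "";
}
```

Returning a message instead of throwing keeps the sketch close to the `Status::Invalid(...)` style used in the scenario, without depending on Arrow's `Status` type.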
@@ -1825,6 +1895,9 @@ Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>*
} else if (scenario_name == "expiration_time:renew_flight_endpoint") {
*out = std::make_shared<ExpirationTimeRenewFlightEndpointScenario>();
return Status::OK();
} else if (scenario_name == "poll_flight_info") {
*out = std::make_shared<PollFlightInfoScenario>();
return Status::OK();
} else if (scenario_name == "flight_sql") {
*out = std::make_shared<FlightSqlScenario>();
return Status::OK();
1 change: 1 addition & 0 deletions cpp/src/arrow/flight/middleware.h
@@ -57,6 +57,7 @@ enum class FlightMethod : char {
DoAction = 7,
ListActions = 8,
DoExchange = 9,
PollFlightInfo = 10,
};

/// \brief Get a human-readable name for a Flight method.
81 changes: 67 additions & 14 deletions cpp/src/arrow/flight/serialization_internal.cc
@@ -31,6 +31,26 @@ namespace arrow {
namespace flight {
namespace internal {

// Timestamp

Status FromProto(const google::protobuf::Timestamp& pb_timestamp, Timestamp* timestamp) {
const auto seconds = std::chrono::seconds{pb_timestamp.seconds()};
const auto nanoseconds = std::chrono::nanoseconds{pb_timestamp.nanos()};
const auto duration =
std::chrono::duration_cast<Timestamp::duration>(seconds + nanoseconds);
*timestamp = Timestamp(duration);
return Status::OK();
}

Status ToProto(const Timestamp& timestamp, google::protobuf::Timestamp* pb_timestamp) {
const auto since_epoch = timestamp.time_since_epoch();
const auto since_epoch_ns =
std::chrono::duration_cast<std::chrono::nanoseconds>(since_epoch).count();
pb_timestamp->set_seconds(since_epoch_ns / std::nano::den);
pb_timestamp->set_nanos(since_epoch_ns % std::nano::den);
return Status::OK();
}

// ActionType

Status FromProto(const pb::ActionType& pb_type, ActionType* type) {
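
The `Timestamp` conversion above splits a nanosecond count with `/` and `%`, which truncate toward zero in C++. That is fine for times after the Unix epoch, such as expiration times. For a pre-epoch value, `%` yields a negative remainder, whereas the protobuf `Timestamp` documentation asks for `nanos` to be non-negative. The sketch below is a variant that normalizes the remainder. It is not part of this commit, and `SplitNanos` is a hypothetical helper name.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>

// Splits a signed nanosecond count since the Unix epoch into
// (seconds, nanos) with nanos always in [0, 999999999].
inline std::pair<std::int64_t, std::int32_t> SplitNanos(
    std::int64_t since_epoch_ns) {
  constexpr std::int64_t kNanosPerSecond = 1000000000;
  std::int64_t seconds = since_epoch_ns / kNanosPerSecond;  // truncates toward zero
  std::int64_t nanos = since_epoch_ns % kNanosPerSecond;    // takes the dividend's sign
  if (nanos < 0) {
    // Borrow one second so the fractional part counts forward in time.
    seconds -= 1;
    nanos += kNanosPerSecond;
  }
  assert(nanos >= 0 && nanos < kNanosPerSecond);
  return {seconds, static_cast<std::int32_t>(nanos)};
}
```

For the timestamp used in the `RetryInfo` test, `SplitNanos(1687144446004339000)` yields `(1687144446, 4339000)`, the same result as plain truncation. The two approaches only differ for negative inputs.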
@@ -153,13 +173,9 @@ Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint
RETURN_NOT_OK(FromProto(pb_endpoint.location(i), &endpoint->locations[i]));
}
if (pb_endpoint.has_expiration_time()) {
-const auto& pb_expiration_time = pb_endpoint.expiration_time();
-const auto seconds = std::chrono::seconds{pb_expiration_time.seconds()};
-const auto nanoseconds = std::chrono::nanoseconds{pb_expiration_time.nanos()};
-const auto duration =
-std::chrono::duration_cast<Timestamp::duration>(seconds + nanoseconds);
-const Timestamp expiration_time(duration);
-endpoint->expiration_time = expiration_time;
+Timestamp expiration_time;
+RETURN_NOT_OK(FromProto(pb_endpoint.expiration_time(), &expiration_time));
+endpoint->expiration_time = std::move(expiration_time);
}
return Status::OK();
}
@@ -171,13 +187,8 @@ Status ToProto(const FlightEndpoint& endpoint, pb::FlightEndpoint* pb_endpoint)
RETURN_NOT_OK(ToProto(location, pb_endpoint->add_location()));
}
if (endpoint.expiration_time) {
-const auto expiration_time = endpoint.expiration_time.value();
-const auto since_epoch = expiration_time.time_since_epoch();
-const auto since_epoch_ns =
-std::chrono::duration_cast<std::chrono::nanoseconds>(since_epoch).count();
-auto pb_expiration_time = pb_endpoint->mutable_expiration_time();
-pb_expiration_time->set_seconds(since_epoch_ns / std::nano::den);
-pb_expiration_time->set_nanos(since_epoch_ns % std::nano::den);
+RETURN_NOT_OK(ToProto(endpoint.expiration_time.value(),
+pb_endpoint->mutable_expiration_time()));
}
return Status::OK();
}
@@ -287,6 +298,48 @@ Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info) {
return Status::OK();
}

// RetryInfo

Status FromProto(const pb::RetryInfo& pb_info, RetryInfo* info) {
FlightInfo::Data data;
RETURN_NOT_OK(FromProto(pb_info.info(), &data));
info->info = std::make_unique<FlightInfo>(std::move(data));
if (pb_info.has_flight_descriptor()) {
FlightDescriptor descriptor;
RETURN_NOT_OK(FromProto(pb_info.flight_descriptor(), &descriptor));
info->descriptor = std::move(descriptor);
} else {
info->descriptor = std::nullopt;
}
if (pb_info.has_progress()) {
info->progress = pb_info.progress();
} else {
info->progress = std::nullopt;
}
if (pb_info.has_expiration_time()) {
Timestamp expiration_time;
RETURN_NOT_OK(FromProto(pb_info.expiration_time(), &expiration_time));
info->expiration_time = std::move(expiration_time);
} else {
info->expiration_time = std::nullopt;
}
return Status::OK();
}

Status ToProto(const RetryInfo& info, pb::RetryInfo* pb_info) {
RETURN_NOT_OK(ToProto(*info.info, pb_info->mutable_info()));
if (info.descriptor) {
RETURN_NOT_OK(ToProto(*info.descriptor, pb_info->mutable_flight_descriptor()));
}
if (info.progress) {
pb_info->set_progress(info.progress.value());
}
if (info.expiration_time) {
RETURN_NOT_OK(ToProto(*info.expiration_time, pb_info->mutable_expiration_time()));
}
return Status::OK();
}

// CancelFlightInfoRequest

Status FromProto(const pb::CancelFlightInfoRequest& pb_request,
4 changes: 4 additions & 0 deletions cpp/src/arrow/flight/serialization_internal.h
@@ -45,6 +45,7 @@ Status SchemaToString(const Schema& schema, std::string* out);

// These functions depend on protobuf types which are not exported in the Flight DLL.

Status FromProto(const google::protobuf::Timestamp& pb_timestamp, Timestamp* timestamp);
Status FromProto(const pb::ActionType& pb_type, ActionType* type);
Status FromProto(const pb::Action& pb_action, Action* action);
Status FromProto(const pb::Result& pb_result, Result* result);
@@ -60,16 +61,19 @@ Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint
Status FromProto(const pb::RenewFlightEndpointRequest& pb_request,
RenewFlightEndpointRequest* request);
Status FromProto(const pb::FlightInfo& pb_info, FlightInfo::Data* info);
Status FromProto(const pb::RetryInfo& pb_info, RetryInfo* info);
Status FromProto(const pb::CancelFlightInfoRequest& pb_request,
CancelFlightInfoRequest* request);
Status FromProto(const pb::SchemaResult& pb_result, std::string* result);
Status FromProto(const pb::BasicAuth& pb_basic_auth, BasicAuth* info);

Status ToProto(const Timestamp& timestamp, google::protobuf::Timestamp* pb_timestamp);
Status ToProto(const FlightDescriptor& descr, pb::FlightDescriptor* pb_descr);
Status ToProto(const FlightEndpoint& endpoint, pb::FlightEndpoint* pb_endpoint);
Status ToProto(const RenewFlightEndpointRequest& request,
pb::RenewFlightEndpointRequest* pb_request);
Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info);
Status ToProto(const RetryInfo& info, pb::RetryInfo* pb_info);
Status ToProto(const CancelFlightInfoRequest& request,
pb::CancelFlightInfoRequest* pb_request);
Status ToProto(const ActionType& type, pb::ActionType* pb_type);
6 changes: 6 additions & 0 deletions cpp/src/arrow/flight/server.cc
@@ -231,6 +231,12 @@ Status FlightServerBase::GetFlightInfo(const ServerCallContext& context,
return Status::NotImplemented("NYI");
}

Status FlightServerBase::PollFlightInfo(const ServerCallContext& context,
const FlightDescriptor& request,
std::unique_ptr<RetryInfo>* info) {
return Status::NotImplemented("NYI");
}

Status FlightServerBase::DoGet(const ServerCallContext& context, const Ticket& request,
std::unique_ptr<FlightDataStream>* data_stream) {
return Status::NotImplemented("NYI");
14 changes: 12 additions & 2 deletions cpp/src/arrow/flight/server.h
@@ -243,16 +243,26 @@ class ARROW_FLIGHT_EXPORT FlightServerBase {
/// \brief Retrieve the schema and an access plan for the indicated
/// descriptor
/// \param[in] context The call context.
-/// \param[in] request may be null
+/// \param[in] request the dataset request, whether a named dataset or command
/// \param[out] info the returned flight info provider
/// \return Status
virtual Status GetFlightInfo(const ServerCallContext& context,
const FlightDescriptor& request,
std::unique_ptr<FlightInfo>* info);

/// \brief Retrieve the current status of the target query
/// \param[in] context The call context.
/// \param[in] request the dataset request or a descriptor returned by a
/// prior PollFlightInfo call
/// \param[out] info the returned retry info provider
/// \return Status
virtual Status PollFlightInfo(const ServerCallContext& context,
const FlightDescriptor& request,
std::unique_ptr<RetryInfo>* info);

/// \brief Retrieve the schema for the indicated descriptor
/// \param[in] context The call context.
-/// \param[in] request may be null
+/// \param[in] request the dataset request, whether a named dataset or command
/// \param[out] schema the returned flight schema provider
/// \return Status
virtual Status GetSchema(const ServerCallContext& context,
5 changes: 5 additions & 0 deletions cpp/src/arrow/flight/transport.cc
@@ -74,6 +74,11 @@ Status ClientTransport::GetFlightInfo(const FlightCallOptions& options,
std::unique_ptr<FlightInfo>* info) {
return Status::NotImplemented("GetFlightInfo for this transport");
}
Status ClientTransport::PollFlightInfo(const FlightCallOptions& options,
const FlightDescriptor& descriptor,
std::unique_ptr<RetryInfo>* info) {
return Status::NotImplemented("PollFlightInfo for this transport");
}
arrow::Result<std::unique_ptr<SchemaResult>> ClientTransport::GetSchema(
const FlightCallOptions& options, const FlightDescriptor& descriptor) {
return Status::NotImplemented("GetSchema for this transport");