[C++][Go][Java][FlightRPC] Add support for long-running queries #36155

Closed
Tracked by #36954
kou opened this issue Jun 19, 2023 · 7 comments · Fixed by #36946

Comments

@kou
Member

kou commented Jun 19, 2023

Describe the enhancement requested

Based on the proposal in https://docs.google.com/document/d/1jhPyPZSOo2iy0LqIJVUs9KWPyFULVFJXTILDfkadx2g/edit# .
See also the discussion thread: https://lists.apache.org/thread/247z3t06mf132nocngc1jkp3oqglz7jp

In Flight RPC, FlightInfo includes addresses of workers alongside result partition info. This lets clients fetch data directly from workers[1], in parallel or even distributed across multiple machines. But this also comes with tradeoffs.

Queries generally don’t complete instantly (as much as we would like them to). So where can we put the ‘query evaluation time’?

  • In GetFlightInfo: block and wait for the query to complete.
    • Con: this is a long-running blocking call, which may fail or time out. Then when the client retries, the server has to redo all the work.
    • Con: parts of the result may be ready before others, but the client can’t do anything until everything is ready.
  • In DoGet: return a fixed number of partitions
    • Con: this makes handling worker failures hard. Systems like Trino support fault-tolerant execution by replacing workers at runtime. But GetFlightInfo has already returned, so we can't notify the client of new workers[2].
    • Con: we have to know or fix the partitioning up front.

Neither solution is optimal.

Proposal

We can address this by adding a retryable version of GetFlightInfo. First, we add a new RPC call and result message:

```proto
service FlightService {
  // ...
  rpc PollFlightInfo(FlightDescriptor) returns (PollInfo);
}

message PollInfo {
  // The currently available results so far.
  FlightInfo info = 1;
  // The descriptor the client should use on the next try.
  // If unset, the query is complete.
  FlightDescriptor flight_descriptor = 2;
  // Query progress. Must be in [0.0, 1.0] but need not be
  // monotonic or nondecreasing. If unknown, do not set.
  optional double progress = 3;
  // Expiration time for this request. After this passes, the server
  // might not accept the retry descriptor anymore (and the query may
  // be cancelled). This may be updated on a call to PollFlightInfo.
  google.protobuf.Timestamp expiration_time = 4;
}
```

A client executes a query and polls for result completion. The server returns a FlightInfo representing the state of the query execution up to that point.

```mermaid
sequenceDiagram
    Client->>Server: PollFlightInfo(FlightDescriptor)
    Server->>Client: PollInfo(FlightDescriptor', FlightInfo)
    Client->>Server: PollFlightInfo(FlightDescriptor')
    Server->>Client: PollInfo(FlightDescriptor'', FlightInfo)
    Client->>Server: PollFlightInfo(FlightDescriptor'')
    Server->>Client: PollInfo(_, FlightInfo)
```

The server:

  • Must respond with the complete FlightInfo each time, not just the delta between the previous and current FlightInfo.

  • Should respond as quickly as possible on the first call.

  • Should not respond until the result would be different from last time. (That way, the client can “long poll” for updates without constantly making requests. Clients can set a short timeout to avoid blocking calls if desired.)

  • May respond by only updating the PollInfo.progress value (though it shouldn’t spam the client with updates).

  • Should recognize a PollInfo.flight_descriptor that is not necessarily the latest (in case the client misses an update in between).

  • Should only append to the endpoints in FlightInfo each time. (Otherwise the client has to do extra work to identify what endpoints it has and hasn’t seen.)

    When FlightInfo.ordered is set, this means the server returns endpoints in order.

  • Should return an error status instead of a response if the query fails. The client should not retry the request (except for TIMED_OUT and UNAVAILABLE, which may not originate from the server).

Prior Art

  • Amazon Redshift: executing a query gives an ID that can be used to check the query status and fetch results.
  • Google BigQuery Storage: you explicitly create a “read session”, after which you can read subsets of the response with further requests. There is no “query execution time” since BigQuery Storage only queries tables. Instead, running a query (with the base BigQuery API) will cache the result in a table that can be read via BigQuery Storage.
  • Snowflake: short queries return synchronously. Longer queries require polling for completion of the query. You cannot retrieve any results until the query is complete.

Component(s)

C++, FlightRPC, Format, Go, Java

Footnotes

  1. Of course, servers are free to return the location of a proxy/load balancer/etc., or omit locations and have the client fetch results from the same server that they issued the query to. Flight RPC offers this flexibility to servers; clients don’t have to know or care.

  2. Again, the server could proxy workers, or depend on Kubernetes DNS routing, or configure gRPC XDS. But this somewhat defeats the point of returning worker locations in the first place, and is much more complicated (operationally, implementation-wise).

@kou
Member Author

kou commented Jul 28, 2023

We added the CancelFlightInfo action in #35500, but it accepts a FlightInfo, not a FlightDescriptor.
So we can't use it to cancel a running query via cancel_descriptor.

I think we would need to use PollFlightInfo(cancel_descriptor) instead of the CancelFlightInfo action to cancel a running query, but that may confuse users. Alternatively, we can remove RetryInfo::cancel_descriptor and use CancelFlightInfo(RetryInfo::info) instead.

@lidavidm Do you have an opinion on this? If not, I'll go with removing RetryInfo::cancel_descriptor and using CancelFlightInfo(RetryInfo::info).

@lidavidm
Member

I think the only problem there is, what if the query hasn't progressed far enough yet and there is nothing in the RetryInfo::info yet (no endpoints, no schema)?

We could try to handle this by adding a bytes app_metadata field to FlightInfo like we do for other messages. That way the server always has a field where it can safely store some metadata to identify the query.

@kou
Member Author

kou commented Jul 28, 2023

> I think the only problem there is, what if the query hasn't progressed far enough yet and there is nothing in the RetryInfo::info yet (no endpoints, no schema)?

Ah, you're right.

> We could try to handle this by adding a bytes app_metadata field to FlightInfo like we do for other messages. That way the server always has a field where it can safely store some metadata to identify the query.

Or can we put cancel_descriptor content to RetryInfo::info::flight_descriptor?
FlightInfo::flight_descriptor must be the same FlightDescriptor client sent?

(I don't object to the bytes app_metadata approach.)

@lidavidm
Member

> Or can we put cancel_descriptor content to RetryInfo::info::flight_descriptor?
> FlightInfo::flight_descriptor must be the same FlightDescriptor client sent?

Oh, hmm, you're right. I don't think it's a requirement to be the same. (Actually, I never understood why it's there in the first place!) So we can do that :)
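A minimal Go sketch of the approach agreed on here, assuming the server stores a query identifier in `FlightInfo.flight_descriptor` from the very first response, so that a `CancelFlightInfo`-style handler can find the query even before any endpoints or schema exist. The `server` type, `startQuery`, and `cancelFlightInfo` are hypothetical and only model the lookup, not the actual Flight action.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Hypothetical, simplified stand-ins for the protobuf messages.
type FlightDescriptor struct{ Cmd []byte }

type FlightInfo struct {
	FlightDescriptor *FlightDescriptor
	Endpoints        []string
}

type server struct {
	mu      sync.Mutex
	running map[string]bool // query ID -> still running
}

// startQuery returns an initial FlightInfo that already identifies the query
// through FlightInfo.flight_descriptor, even though it has no endpoints yet.
func (s *server) startQuery(id string) FlightInfo {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.running[id] = true
	return FlightInfo{FlightDescriptor: &FlightDescriptor{Cmd: []byte(id)}}
}

// cancelFlightInfo locates the query from info.FlightDescriptor alone.
func (s *server) cancelFlightInfo(info FlightInfo) error {
	if info.FlightDescriptor == nil {
		return errors.New("FlightInfo does not identify a query")
	}
	id := string(info.FlightDescriptor.Cmd)
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.running[id] {
		return fmt.Errorf("unknown or finished query %q", id)
	}
	s.running[id] = false
	return nil
}

func main() {
	s := &server{running: map[string]bool{}}
	info := s.startQuery("query-42")
	fmt.Println(len(info.Endpoints), s.cancelFlightInfo(info)) // 0 <nil>
	fmt.Println(s.cancelFlightInfo(info))                      // unknown or finished query "query-42"
}
```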

@kou
Member Author

kou commented Jul 31, 2023

OK. I'll remove cancel_descriptor and rename RetryInfo::retry_descriptor to RetryInfo::descriptor because there is only one descriptor.

@kou
Member Author

kou commented Jul 31, 2023

Ah, we can't use RetryInfo::descriptor because Protobuf-generated code already uses descriptor.
I'll use RetryInfo::flight_descriptor, like FlightInfo::flight_descriptor.

kou added a commit to kou/arrow that referenced this issue Jul 31, 2023
…ries

In Flight RPC, FlightInfo includes addresses of workers alongside
result partition info. This lets clients fetch data directly from
workers, in parallel or even distributed across multiple machines. But
this also comes with tradeoffs.

Queries generally don't complete instantly (as much as we would like
them to). So where can we put the 'query evaluation time'?

* In `GetFlightInfo`: block and wait for the query to complete.
  * Con: this is a long-running blocking call, which may fail or time
    out. Then when the client retries, the server has to redo all the
    work.
  * Con: parts of the result may be ready before others, but the
    client can't do anything until everything is ready.
* In `DoGet`: return a fixed number of partitions
  * Con: this makes handling worker failures hard. Systems like Trino
    support fault-tolerant execution by replacing workers at
    runtime. But GetFlightInfo has already passed, so we can't notify
    the client of new workers.
  * Con: we have to know or fix the partitioning up front.

Neither solution is optimal.

We can address this by adding a retryable version of
`GetFlightInfo`: `PollFlightInfo(FlightDescriptor)`

`PollFlightInfo` returns `RetryInfo`:

```proto
message RetryInfo {
  // The currently available results so far.
  FlightInfo info = 1;
  // The descriptor the client should use on the next try.
  // If unset, the query is complete.
  FlightDescriptor flight_descriptor = 2;
  // Query progress. Must be in [0.0, 1.0] but need not be
  // monotonic or nondecreasing. If unknown, do not set.
  optional double progress = 3;
  // Expiration time for this request. After this passes, the server
  // might not accept the retry descriptor anymore (and the query may
  // be cancelled). This may be updated on a call to PollFlightInfo.
  google.protobuf.Timestamp expiration_time = 4;
}
```

See the documentation changes for details.
@kou kou changed the title [C++][Go][FlightRPC] Add support for long-running queries [C++][Go][Java][FlightRPC] Add support for long-running queries Aug 1, 2023
kou added a commit to kou/arrow that referenced this issue Aug 1, 2023
kou added a commit to kou/arrow that referenced this issue Aug 9, 2023
kou added a commit to kou/arrow that referenced this issue Aug 10, 2023
@kou
Member Author

kou commented Aug 15, 2023

@kou kou added this to the 14.0.0 milestone Aug 15, 2023
kou added a commit that referenced this issue Aug 15, 2023
…ries (#36946)

### Rationale for this change

In Flight RPC, FlightInfo includes addresses of workers alongside result partition info. This lets clients fetch data directly from workers, in parallel or even distributed across multiple machines. But this also comes with tradeoffs.

Queries generally don't complete instantly (as much as we would like them to). So where can we put the 'query evaluation time'?

* In `GetFlightInfo`: block and wait for the query to complete.
  * Con: this is a long-running blocking call, which may fail or time out. Then when the client retries, the server has to redo all the work.
  * Con: parts of the result may be ready before others, but the client can't do anything until everything is ready.
* In `DoGet`: return a fixed number of partitions
  * Con: this makes handling worker failures hard. Systems like Trino support fault-tolerant execution by replacing workers at runtime. But GetFlightInfo has already passed, so we can't notify the client of new workers.
  * Con: we have to know or fix the partitioning up front.

Neither solution is optimal.

### What changes are included in this PR?

We can address this by adding a retryable version of `GetFlightInfo`: `PollFlightInfo(FlightDescriptor)`

`PollFlightInfo` returns `PollInfo`:

```proto
message PollInfo {
  // The currently available results so far.
  FlightInfo info = 1;
  // The descriptor the client should use on the next try.
  // If unset, the query is complete.
  FlightDescriptor flight_descriptor = 2;
  // Query progress. Must be in [0.0, 1.0] but need not be
  // monotonic or nondecreasing. If unknown, do not set.
  optional double progress = 3;
  // Expiration time for this request. After this passes, the server
  // might not accept the retry descriptor anymore (and the query may
  // be cancelled). This may be updated on a call to PollFlightInfo.
  google.protobuf.Timestamp expiration_time = 4;
}
```

See the documentation changes for details:
http://crossbow.voltrondata.com/pr_docs/36946/format/Flight.html#downloading-data-by-running-a-heavy-query

### Are these changes tested?

Yes.

This has C++, Go and Java implementations and an integration test with them.

### Are there any user-facing changes?

Yes.
* Closes: #36155

Lead-authored-by: Sutou Kouhei <kou@clear-code.com>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: David Li <li.davidm96@gmail.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
tdcmeehan added a commit to tdcmeehan/arrow that referenced this issue Sep 1, 2023
…g-running queries

With apache#36155, implementations of Flight RPC may now return quickly via a newly added pollFlightInfo function. Sometimes, the system implementing this function may not know the output schema for some time--for example, until a lengthy queue time has elapsed, or until planning finishes.

In proto3, fields may not be present, and it's a coding convention to require them. To support upcoming client integration work for pollFlightInfo, the schema field can be made optional so that it's not a requirement to populate the FlightInfo's schema on the first pollFlightInfo request.

We can modify our client code to allow this field to be optional. This is already the case for the Go code.

This changes the Java client code to allow the Schema to be null. `getSchema` methods now return `Optional<Schema>`, which is a backwards incompatible change.
tdcmeehan added a commit to tdcmeehan/arrow that referenced this issue Sep 1, 2023
tdcmeehan added a commit to tdcmeehan/arrow that referenced this issue Sep 1, 2023
tdcmeehan added a commit to tdcmeehan/arrow that referenced this issue Sep 1, 2023
tdcmeehan added a commit to tdcmeehan/arrow that referenced this issue Sep 5, 2023
tdcmeehan added a commit to tdcmeehan/arrow that referenced this issue Sep 6, 2023
tdcmeehan added a commit to tdcmeehan/arrow that referenced this issue Sep 6, 2023
tdcmeehan added a commit to tdcmeehan/arrow that referenced this issue Sep 6, 2023
tdcmeehan added a commit to tdcmeehan/arrow that referenced this issue Sep 6, 2023
tdcmeehan added a commit to tdcmeehan/arrow that referenced this issue Sep 6, 2023
tdcmeehan added a commit to tdcmeehan/arrow that referenced this issue Sep 6, 2023
tdcmeehan added a commit to tdcmeehan/arrow that referenced this issue Sep 7, 2023
lidavidm pushed a commit that referenced this issue Sep 7, 2023
…ing queries (#37528)

With #36155, implementations of Flight RPC may now return quickly via a newly added pollFlightInfo function. Sometimes, the system implementing this function may not know the output schema for some time--for example, until a lengthy queue time has elapsed, or until planning finishes.

In proto3, fields may not be present, and it's a coding convention to require them. To support upcoming client integration work for pollFlightInfo, the schema field can be made optional so that it's not a requirement to populate the FlightInfo's schema on the first pollFlightInfo request.

We can modify our client code to allow this field to be optional. This is already the case for the Go code.

This changes the Java client code to allow the Schema to be null. A new `getSchemaOptional` method returns `Optional<Schema>`, which is a backwards compatible change. The existing method is deprecated, but will still return an empty schema if the schema is not present on the wire (as it did before).

### Rationale for this change

With #36155, implementations of Flight RPC may now return quickly via a newly added pollFlightInfo function. Sometimes, the system implementing this function may not know the output schema for some time--for example, until a lengthy queue time has elapsed, or until planning finishes.

In proto3, fields may not be present, and it's a coding convention to require them. To support upcoming client integration work for pollFlightInfo, the schema field can be made optional so that it's not a requirement to populate the FlightInfo's schema on the first pollFlightInfo request.

CC: `@ lidavidm`

### What changes are included in this PR?

This changes the Java client code to allow the Schema to be null.  `getSchema` is now deprecated and a new `getSchemaOptional` returns `Optional<Schema>`, which is a backwards compatible change.

### Are these changes tested?

Existing tests ensure serialization and deserialization continue to work.

### Are there any user-facing changes?

The `getSchema` methods are now deprecated in favor of `getSchemaOptional`.

* Closes: #37553

Authored-by: Tim Meehan <tim@timdmeehan.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
loicalleyne pushed a commit to loicalleyne/arrow that referenced this issue Nov 13, 2023
…ng queries (apache#36946)

### Rationale for this change

In Flight RPC, FlightInfo includes addresses of workers alongside result partition info. This lets clients fetch data directly from workers, in parallel or even distributed across multiple machines. But this also comes with tradeoffs.

Queries generally don't complete instantly (as much as we would like them to). So where can we put the 'query evaluation time'?

* In `GetFlightInfo`: block and wait for the query to complete.
  * Con: this is a long-running blocking call, which may fail or time out. Then when the client retries, the server has to redo all the work.
  * Con: parts of the result may be ready before others, but the client can't do anything until everything is ready.
* In `DoGet`: return a fixed number of partitions.
  * Con: this makes handling worker failures hard. Systems like Trino support fault-tolerant execution by replacing workers at runtime. But `GetFlightInfo` has already returned, so we can't notify the client of new workers.
  * Con: we have to know or fix the partitioning up front.

Neither solution is optimal.

### What changes are included in this PR?

We can address this by adding a retryable version of `GetFlightInfo`: `PollFlightInfo(FlightDescriptor)`.

`PollFlightInfo` returns `PollInfo`:

```proto
message PollInfo {
  // The currently available results so far.
  FlightInfo info = 1;
  // The descriptor the client should use on the next try.
  // If unset, the query is complete.
  FlightDescriptor flight_descriptor = 2;
  // Query progress. Must be in [0.0, 1.0] but need not be
  // monotonic or nondecreasing. If unknown, do not set.
  optional double progress = 3;
  // Expiration time for this request. After this passes, the server
  // might not accept the retry descriptor anymore (and the query may
  // be cancelled). This may be updated on a call to PollFlightInfo.
  google.protobuf.Timestamp expiration_time = 4;
}
```
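A client-side polling loop over `PollInfo` might look roughly like the following. This is a hedged sketch, not the Arrow Flight client API: `PollInfo`, `Poller`, and `pollUntilComplete` are hypothetical names, `FlightInfo` and `FlightDescriptor` are simplified to strings, and a fixed sleep interval stands in for whatever backoff policy a real client would use. It follows the field comments above: an unset `flight_descriptor` ends the loop, `progress` is only range-checked because it need not be monotonic, and an expired retry descriptor is treated as an error.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.OptionalDouble;

public class PollLoopSketch {
    // Hypothetical mirror of PollInfo; FlightInfo and FlightDescriptor are simplified to strings.
    record PollInfo(String info, Optional<String> flightDescriptor,
                    OptionalDouble progress, Optional<Instant> expirationTime) {}

    // Hypothetical stand-in for a client call to PollFlightInfo.
    interface Poller {
        PollInfo poll(String descriptor);
    }

    static String pollUntilComplete(Poller client, String descriptor, Duration interval) {
        PollInfo result = client.poll(descriptor);
        // An unset flight_descriptor means the query is complete.
        while (result.flightDescriptor().isPresent()) {
            if (result.progress().isPresent()) {
                double p = result.progress().getAsDouble();
                // Progress must lie in [0.0, 1.0] but may move backwards, so only the range is checked.
                if (p < 0.0 || p > 1.0) {
                    throw new IllegalStateException("progress out of range: " + p);
                }
            }
            // Past expiration_time the server might reject the retry descriptor.
            if (result.expirationTime().isPresent()
                    && Instant.now().isAfter(result.expirationTime().get())) {
                throw new IllegalStateException("retry descriptor expired");
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while polling", e);
            }
            // Retry with the descriptor the server returned, not the original one.
            result = client.poll(result.flightDescriptor().get());
        }
        return result.info();
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fake server: the query finishes on the third poll.
        Poller fake = d -> {
            calls[0]++;
            boolean done = calls[0] >= 3;
            return new PollInfo("results after poll " + calls[0],
                    done ? Optional.empty() : Optional.of("retry-" + calls[0]),
                    OptionalDouble.of(done ? 1.0 : calls[0] / 3.0),
                    Optional.of(Instant.now().plusSeconds(60)));
        };
        System.out.println(pollUntilComplete(fake, "query-1", Duration.ofMillis(10)));
        // prints "results after poll 3"
    }
}
```

Because `info` carries the results available so far, a real client could also start `DoGet` on endpoints that appear in intermediate responses instead of waiting for the final one.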

See the documentation changes for details:
http://crossbow.voltrondata.com/pr_docs/36946/format/Flight.html#downloading-data-by-running-a-heavy-query

### Are these changes tested?

Yes.

This includes C++, Go, and Java implementations and an integration test across them.

### Are there any user-facing changes?

Yes.
* Closes: apache#36155

Lead-authored-by: Sutou Kouhei <kou@clear-code.com>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: David Li <li.davidm96@gmail.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
loicalleyne pushed a commit to loicalleyne/arrow that referenced this issue Nov 13, 2023
…g-running queries (apache#37528)

dgreiss pushed a commit to dgreiss/arrow that referenced this issue Feb 19, 2024
…g-running queries (apache#37528)
