Fix flight sql do put handling, add bind parameter support to FlightSQL cli client #4797
arrow-flight/src/sql/client.rs
Outdated
fn flight_error_to_arrow_error(err: FlightError) -> ArrowError {
    match err {
        FlightError::Arrow(e) => e,
        FlightError::NotYetImplemented(s) => ArrowError::NotYetImplemented(s),
        FlightError::Tonic(status) => status_to_arrow_error(status),
        FlightError::ProtocolError(e) => ArrowError::IpcError(e),
        FlightError::DecodeError(s) => ArrowError::IpcError(s),
        FlightError::ExternalError(e) => ArrowError::ExternalError(e),
    }
}
I am not sure why we return `ArrowError` from the Flight client (instead of `FlightError`), but I am trying to keep this PR scoped, so I just decided to stay consistent
I agree this should probably be FlightError
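As long as the client keeps returning `ArrowError`, one way to keep the conversion out of every call site is a `From` impl, so that `?` does the mapping. The sketch below uses hypothetical, cut-down stand-ins for both enums (the real ones have more variants, including the `Tonic` and `ExternalError` cases) and is only meant to show the shape, not what the crate actually does.

```rust
// Hypothetical stand-ins for `ArrowError` and `FlightError`; these are NOT the
// real definitions from the arrow and arrow-flight crates.
#[derive(Debug, PartialEq)]
enum ArrowError {
    NotYetImplemented(String),
    IpcError(String),
}

#[derive(Debug, PartialEq)]
enum FlightError {
    Arrow(ArrowError),
    NotYetImplemented(String),
    ProtocolError(String),
    DecodeError(String),
}

// With a `From` impl, `?` converts the error implicitly, so call sites do not
// need an explicit `.map_err(flight_error_to_arrow_error)`.
impl From<FlightError> for ArrowError {
    fn from(err: FlightError) -> Self {
        match err {
            FlightError::Arrow(e) => e,
            FlightError::NotYetImplemented(s) => ArrowError::NotYetImplemented(s),
            FlightError::ProtocolError(s) | FlightError::DecodeError(s) => {
                ArrowError::IpcError(s)
            }
        }
    }
}

// Hypothetical decode step that fails with a `FlightError`.
fn decode(ok: bool) -> Result<u32, FlightError> {
    if ok {
        Ok(1)
    } else {
        Err(FlightError::DecodeError("bad message".to_string()))
    }
}

// A client-style method that still returns `ArrowError`.
fn client_call(ok: bool) -> Result<u32, ArrowError> {
    let rows = decode(ok)?; // `?` applies the `From` impl above
    Ok(rows)
}
```

Returning `FlightError` directly, as suggested in this thread, would make the conversion unnecessary altogether.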
This looks good to me; however, I'm not very well versed in FlightSQL, so I have asked some potentially stupid questions
const PREPARED_QUERY: &str = "SELECT * FROM table WHERE field = $1";
const PREPARED_STATEMENT_HANDLE: &str = "prepared_statement_handle";

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Do we need multi thread here?
I don't really think so. I just followed what the existing test did, but I don't really see why it should. I will try and see if it works without it
Seems to work fine without it, so I just removed it
arrow-flight/src/sql/server.rs
Outdated
// To allow the first message to be reused by the `do_put` handler,
// we wrap this stream in a `Peekable` one, which allows us to peek at
// the first message without discarding it.
let mut request = request.map(futures::StreamExt::peekable);
So if I am following correctly, the issue is do_put accepts a FlightData stream, but the first request in the stream will contain a FlightDescriptor in addition to potentially any data. I continue to be utterly baffled by the design of Flight 😅
Yes, that is my understanding as well. Prior to this change, decoding a flight stream inside one of the `do_put` methods would give you an error like `Received RecordBatch prior to schema`
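The failure mode described here can be illustrated with a synchronous analogue. In the sketch below, `Msg`, `messages`, and `decode` are hypothetical stand-ins (a plain iterator instead of `Streaming<FlightData>`, and a toy decoder instead of the real Flight decoder); the point is only that consuming the first message to read the descriptor also removes the schema the handler needs, while peeking leaves it in place.

```rust
// Hypothetical, simplified stand-in for `FlightData`: only the first message
// carries a descriptor, and in this sketch it also carries the schema.
#[derive(Debug, Clone, PartialEq)]
struct Msg {
    descriptor: Option<&'static str>,
    payload: &'static str,
}

fn messages() -> Vec<Msg> {
    vec![
        Msg { descriptor: Some("CommandPreparedStatementQuery"), payload: "schema" },
        Msg { descriptor: None, payload: "batch" },
    ]
}

// Toy decoder that, like a real one, requires the schema before any batch.
fn decode(stream: impl Iterator<Item = Msg>) -> Result<usize, String> {
    let mut seen_schema = false;
    let mut batches = 0;
    for msg in stream {
        match msg.payload {
            "schema" => seen_schema = true,
            _ if !seen_schema => {
                return Err("Received RecordBatch prior to schema".to_string())
            }
            _ => batches += 1,
        }
    }
    Ok(batches)
}

// Dispatch by consuming the first message: the handler never sees the schema.
fn dispatch_with_next() -> Result<usize, String> {
    let mut stream = messages().into_iter();
    let _command = stream.next().and_then(|m| m.descriptor);
    decode(stream)
}

// Dispatch by peeking: the first message stays at the front of the stream.
fn dispatch_with_peek() -> Result<usize, String> {
    let mut stream = messages().into_iter().peekable();
    let _command = stream.peek().and_then(|m| m.descriptor);
    decode(stream)
}
```

Under these assumptions, `dispatch_with_next` fails with the error quoted above and `dispatch_with_peek` decodes the single batch.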
    .await
    .map_err(flight_error_to_arrow_error)?;

self.flight_sql_client
This appears consistent with the FlightSQL specification: it uses do_put to bind the parameter arguments. What isn't clear to me is whether the result should be used in some way.
This would seem to imply some sort of server-side state, which I had perhaps expected FlightSQL not to rely on
Yeah, I think we are in agreement about it implying server-side state. FWIW FlightSQL also supports transactions which I think (maybe wrongly) would also require state. There was also some discussion happening about adding new RPCs for managing session state at some point (like a `close` RPC or something)
This seems like a fundamental flaw in FlightSQL tbh, gRPC is not a connection-oriented protocol and so the lifetime of any server state is non-deterministic... I believe @alamb plans to start a discussion to see if we can't fix this
I filed apache/arrow#37720 and will circulate this around
arrow-flight/src/sql/server.rs
Outdated
@@ -366,7 +366,7 @@ pub trait FlightSqlService: Sync + Send + Sized + 'static {
     /// Implementors may override to handle additional calls to do_put()
     async fn do_put_fallback(
         &self,
-        _request: Request<Streaming<FlightData>>,
+        _request: Request<Peekable<Streaming<FlightData>>>,
Another option might be to pass the first ticket request as a separate argument. I don't feel strongly either way
It's a tough decision for me. I prefer using `Peekable` as it can be used as if it were the original stream, but I hate the fact that we have to leak its usage. We could pass the first `FlightData` as a separate argument, but it would require the user to `chain` it with the stream, if they wanted to use any APIs expecting a stream of FlightData. So I think I would stick with `Peekable` in the absence of any preference from others.
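To make the trade-off concrete, here is a synchronous sketch of the two API shapes using `std::iter` instead of async streams. The function names and the `u32` item type are hypothetical; with the `futures` crate the stitching step would presumably be `stream::once` plus `StreamExt::chain` rather than `iter::once` plus `Iterator::chain`.

```rust
use std::iter;

// Hypothetical downstream API that expects the whole stream, first message
// included (for example, a decoder).
fn handle(stream: impl Iterator<Item = u32>) -> Vec<u32> {
    stream.collect()
}

// Shape 1: the dispatcher passes the first message as a separate argument.
// The implementor has to stitch the stream back together with `chain`.
fn fallback_with_separate_first(first: u32, rest: impl Iterator<Item = u32>) -> Vec<u32> {
    handle(iter::once(first).chain(rest))
}

// Shape 2: the dispatcher passes a peekable stream. It can be used as if it
// were the original stream, at the cost of exposing `Peekable` in the API.
fn fallback_with_peekable(stream: iter::Peekable<impl Iterator<Item = u32>>) -> Vec<u32> {
    handle(stream)
}
```

Both shapes deliver the same items to `handle`; the difference is only in who does the re-assembly and which type leaks into the trait signature.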
This server is the scaffolding to help people build flightsql servers -- they can always use the raw FlightService if they prefer. Thus I think the change in API is less critical and given the requirements it seems inevitable.
I think the only thing we should try to improve in this PR is the documentation, to explain why peekable is used (to lower the cognitive burden on people trying to use this).
One potential option to document this is, rather than using `Peekable<Streaming<...>>` directly, to make our own wrapper, `PeekableStreaming` or something. While this would require duplicating a bunch of the peekable API, there would be a natural place to document what it was for and how to use it, which I think would lower the barrier to usage.
For example:
/// A wrapper around `Streaming` that allows inspection of the first message.
/// This is needed because sometimes the first request in the stream will contain
/// a [`FlightDescriptor`] in addition to potentially any data and the dispatch logic
/// must inspect this information.
///
/// # Example
/// <show an example here of calling `into_inner()` to get the original data back>
struct PeekableStreaming {
    inner: Peekable<Streaming<FlightData>>,
}

impl PeekableStreaming {
    /// Return the inner stream
    pub fn into_inner(self) -> Streaming<FlightData> { self.inner.into_inner() }
    ...
}
We could also potentially use something like `BoxStream<FlightData>`, but that would lose the gRPC-specific stuff like status codes and trailers exposed by `Streaming`, as well as being an API change.
Thus I think this design is the best of several less-than-ideal solutions. To proceed, perhaps we can add some documentation on the `do_*_fallback` methods that mentions the stream comes from peekable
I added a new type, `PeekableFlightDataStream`, which exposes `into_inner` and `peek`, similarly to `Peekable`. I think this is a good enough subset of functionality for FlightSQL use cases, and if users need access to more of the lower-level functionality of `Peekable`, they can call `PeekableFlightDataStream::into_peekable`.
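The wrapper idea can be sketched synchronously as follows. `PeekableDataStream` is a hypothetical analogue built on `std::iter::Peekable`, not the type added in this PR; it omits `into_inner` because the standard-library `Peekable` has no such method, and it implements `Iterator` where the real type would implement `Stream`.

```rust
use std::iter::Peekable;

/// Hypothetical synchronous analogue of a peekable stream wrapper.
///
/// The wrapper gives a natural place to document why peeking is needed: the
/// first message may carry the descriptor used for dispatch, and it must
/// remain in the stream for the handler.
struct PeekableDataStream<I: Iterator> {
    inner: Peekable<I>,
}

impl<I: Iterator> PeekableDataStream<I> {
    fn new(stream: I) -> Self {
        Self { inner: stream.peekable() }
    }

    /// Look at the message at the front of the stream without consuming it.
    fn peek(&mut self) -> Option<&I::Item> {
        self.inner.peek()
    }

    /// Escape hatch: hand back the underlying `Peekable` for callers that
    /// need more of its API than this wrapper exposes.
    fn into_peekable(self) -> Peekable<I> {
        self.inner
    }
}

// The wrapper can be consumed like the original stream.
impl<I: Iterator> Iterator for PeekableDataStream<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        self.inner.next()
    }
}
```

Exposing only `peek` plus an escape hatch keeps the documented surface small while still letting advanced users reach the full `Peekable` API.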
Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
-        let cmd = CommandPreparedStatementQuery {
+        self.write_bind_params().await?;
+
+        let cmd = CommandPreparedStatementUpdate {
I also forgot to mention, I think this was a bug in the existing implementation. `ExecuteUpdate` should be performed with a `CommandPreparedStatementUpdate` command, not a `CommandPreparedStatementQuery`.
I agree - `update` should use `CommandPreparedStatementUpdate`
.arg(addr.port().to_string())
.arg("prepared-statement-query")
.arg(PREPARED_QUERY)
.args(["-p", "$1=string"])
💯 for the tests
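The test above passes a bind parameter as `-p "$1=string"`. How the CLI actually parses that argument is not shown in this conversation; the following is a minimal sketch of one plausible approach, with `parse_bind_param` being a hypothetical helper that splits on the first `=` only.

```rust
// Hypothetical parser for a `-p NAME=VALUE` argument. It splits on the first
// `=` so that values may themselves contain `=`. The real CLI may differ.
fn parse_bind_param(arg: &str) -> Result<(String, String), String> {
    match arg.split_once('=') {
        Some((name, value)) if !name.is_empty() => {
            Ok((name.to_string(), value.to_string()))
        }
        _ => Err(format!("invalid parameter `{arg}`: expected NAME=VALUE")),
    }
}
```

With this sketch, `"$1=string"` yields the pair `("$1", "string")`, and an argument without `=` is rejected with an error message.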
I think this is really nicely done -- thank you @suremarc
/// A wrapper around [`Streaming<FlightData>`] that allows "peeking" at the
/// message at the front of the stream without consuming it.
/// This is needed because sometimes the first message in the stream will contain
👍
Which issue does this PR close?
Closes #4658 and closes #3598
Rationale for this change
`DoPut` requests with nonempty flight streams cannot be handled properly by the Rust FlightSQL server implementation in its current state.
What changes are included in this PR?
We change all `DoPut` methods on the `FlightSqlService` trait to accept a `Peekable<Streaming<FlightData>>` instead of a regular `Streaming<FlightData>`. We also finished the parameter binding functionality in the client, in order to test prepared statements properly.
Are there any user-facing changes?
Yes, there is unfortunately an API change for the `FlightSqlService` trait. I am open to alternatives, as it is probably possible to do evil things with `Peekable`, but I do not think it is possible to fix this without a breaking change.