-
Notifications
You must be signed in to change notification settings - Fork 169
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Pull Consumer #479
Add Pull Consumer #479
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Solid base for iteration, first batch of comments.
async-nats/src/jetstream/pull.rs
Outdated
/// can know when you reached the end of the stream for example. A 409 is returned if the | ||
/// Consumer has reached MaxAckPending limits. | ||
#[serde(default, skip_serializing_if = "is_default")] | ||
pub no_wait: bool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Variants that use skip serialize, would probably be better off as overloads.
pub no_wait: bool, | |
pub no_wait: bool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this will become unnecesarily messy. Skipping serialization is basically a way of saving a few bytes over the wire, nothing else.
async-nats/src/jetstream/pull.rs
Outdated
|
||
/// Used for next Pull Request for Pull Consumer | ||
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] | ||
pub struct BatchOptions { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would like to not have this options bag, and instead expose it as different overloads/iterators. Always nicer t call flat consumer.batch_<variant>()
calls.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep in mind we have different behaviors depending on passed values:
- expires > 0 + no wait: true -> waits for first message (even if not available) then returns No Messages as soon as there are no no messages for a Consumer
- expires : 0 + no wait: true -> does not wait for first messages. Returns No Messages whenever there are no messages
- expires: whatever, no_wait: false -> waits until expires or fulfills batch.
Add to that max_bytes and heartbeat ... and we would end up with A LOT of variants. To many to have ergonomic API. Keep in mind this is not only an option bag, but an actual serialized struct as a json going via the API that we don't want to expose.
I would instead create low-level API that exposes those behaviors. But that is A LOT of overloads if we don't have arguments passed as Option
or config bag or builder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep in mind we have different behaviors depending on passed values:
Sounds like an argument FOR named variants that give us good documentation blocks? no waits sounds like a try variant for instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already have two variants - fetch and process (one with no wait true, on with false.).
We're missing just the third variant and you just found a nice name for it.
We could actually hide BatchConfig
from the user, but we said that the low level API should be enabled for power users.
The only question is how to enable passing hearbet/max_bytes/fetch to those 3 methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would have first pass with current approach and extend while writing docs as a follow up PR.
@caspervonb WDYT?
async-nats/src/jetstream/consumer.rs
Outdated
pub(crate) config: T, | ||
pub(crate) info: ConsumerInfo, | ||
pub(crate) inbox: String, | ||
pub(crate) subscription: Option<Subscriber>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, shared state? can't this live on Batch
?
pub(crate) subscription: Option<Subscriber>, | |
pub(crate) subscription: Option<Subscriber>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should not. Creating new subscription for every batch is pretty bad for performance.
We can though switch to such approach as soon as muxing request subject is possible for Pull Consumers.
Would remove state and be more even more performant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't be for every batch, this was partially why I proposed having iterators of iterators: to have somewhere that is not the consumer that can have proper mutability semantics while owning the state, performing the requests as needed etc.
Right now consumer acts like its an iterator semantically, it's just not implementing the traits. You'll run into borrow issues going this way too.
Quick abstract, the basic lowest level layer can semantically be something along the lines of the following:
// A fetch operation here is an iterator fetches batches (naming is hard).
pub struct Fetch {
// owns the subscriber
subscriber: Subscriber,
// other relevant state.
}
impl Stream for Fetch {
// Result to deal with errors coming from requests
type Item = Result<Batch, Error>;
}
pub struct Batch {
// take a reference etc, if you need it.
subscriber: &'a Subscriber,
}
impl Stream for Batch {
// Can't fail as far as I'm aware at this point, so no result.
type Item = Message;
fn poll_next(
mut self: pin::Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Option<Self::Item>> {
// Pull from the batch as needed.
}
}
impl Consumer {
pub fn fetch() -> Result<Fetch, Error> {
Ok(Fetch {})
}
fn poll_next(
mut self: pin::Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Option<Self::Item>> {
// Dispatch requests as needed etc.
}
}
fn main() {
let consumer = ...;
// Built-in combinators can flatten this again.
for batch in consumer.fetch().next().await? {
for message in batch.next().await? {
println!("{:?}", message);
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just for clarity: we seem to find reasonable compromise with current implementation.
simplified |
2a6de4e
to
63888e9
Compare
c27b43e
to
5b6b8b5
Compare
This is a first pass to get the API and implementation merged with the intent to follow up with iterators rewrite, docs and possibly API improvemnts Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
5b6b8b5
to
e74da1f
Compare
No description provided.