-
Notifications
You must be signed in to change notification settings - Fork 229
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add RabbitMQ Stream source connector #798
Conversation
ea03894
to
e42e521
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few comments but overall this looks great! Thanks for the contribution.
type TableT = RabbitmqStreamTable; | ||
|
||
fn name(&self) -> &'static str { | ||
"rabbitmq_stream" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think just rabbitmq
would be a better name, as it aligns with our other connectors that identify the system rather than the particular way they interact. So for example, we have a single redis
connector that can read from various kinds of redis datastructures.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will also rename the RabbitmqStreamConnector
struct to RabbitmqConnector
. Ok?
client_keys_path: options.remove("tls_config.client_keys_path"), | ||
}); | ||
|
||
let load_balancer_mode = options.remove("json.unstructured").map(|t| t == "true"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not quite sure what's going on here—json.unstructured
controls our deserialization logic. Was this meant to be another config?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it should be load_balancer_mode
! Thank you for identifying this during the review process. This option forces the client to connect to the load balancer instead of connecting directly to the node.
https://www.rabbitmq.com/blog/2021/07/23/connecting-to-streams#with-a-load-balancer
let s = ctx.table_manager.get_global_keyed_state("s") | ||
.await | ||
.expect("should be able to get rabbitmq stream state"); | ||
s.insert(self.stream.clone(), offset).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On line 77 we're trying to read the state as a RabbitmqStreamState, but here we're inserting a u64. That breaks when we try to checkpoint the state with
2024-12-05T00:05:42.869074Z ERROR arroyo_server_common: panicked at crates/arroyo-connectors/src/rabbitmq_stream/source.rs:147:33:
should be able to get rabbitmq stream state: Failed to downcast table s to key type alloc::string::String and value type u64 panic.file="crates/arroyo-connectors/src/rabbitmq_stream/source.rs" panic.line=147 panic.column=33
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great!
This PR adds RabbitMQ Stream source connector.