-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fetch response queue #63
base: main
Are you sure you want to change the base?
Conversation
d82ef44
to
4e00657
Compare
// A list of ops. | ||
message Ops { | ||
// Ops. | ||
repeated kitsune2.op_store.Op data = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re-use op definition.
} | ||
|
||
/// Serialize list of ops for sending over the wire. | ||
pub fn serialize_ops(value: Vec<MetaOp>) -> bytes::Bytes { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very similar to the previous op id one. I could write this as a generic function which saves us code here but requires then types when calling the function.
@@ -157,191 +147,4 @@ mod test { | |||
|
|||
assert!(!back_off_list.is_agent_on_back_off(&agent_id)); | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests are just moved to the request_queue.rs
file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file contains now shared util functions. Tests for the parts of the fetch module are split out into their own test files.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No new test cases here, just updated API.
4e00657
to
57b3547
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good, I like the approach and most of my comments are about naming and testing.
// An op. | ||
message Op { | ||
// Op id. | ||
bytes op_id = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Want to question the value of sending the op_id
with the Op
. When we're communicating a diff, we have to work in terms of op_id
s but when we have the op_data
, we shouldn't trust the op_id
that's been provided, we should calculate it from the op_data
. This links to what we were discussing on the DHT diff PR where hash length mismatches would cause an error, we need to be properly checking these as they arrive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's true. I'll refactor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wait, we don't have a hashing for that yet tho. Should I add some temporary hashing for the id?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes please. I had to do something similar with receiving ops into the store vs notifying the DHT model that ops have arrived on the host. Maybe it needs to be a proper test utility as part of demonstrating the host. I can't remember exactly where I did this because it's been a couple of weeks, but I think I skipped actually calculating hashes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll have to compute hashes if the ops that are requested by id and then sent without id need to be compared. I can add that as a test utility.
//! - Simple queue which processes items in the order of the incoming requests. | ||
//! - Requests consist of a list of requested op ids and the requesting agent id. | ||
//! - Attempts to look up the op in the data store and send response are done once. | ||
//! - Requests for data that the remote doesn't hold should be logged. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like it needs a second look? Is this "requests for data we don't have are logged"?
//! - Attempts to look up the op in the data store and send response are done once. | ||
//! - Requests for data that the remote doesn't hold should be logged. | ||
//! - If none of the requested ops could be read from the store, no response is sent. | ||
//! - If sending or reception fails, it's the caller's responsibility to request again. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
//! - If sending or reception fails, it's the caller's responsibility to request again. | |
//! - If sending or receiving fails, it's the caller's responsibility to request again. |
let requested_ops_1 = vec![op_id_1.clone(), op_id_2.clone()]; | ||
let requested_ops_2 = vec![op_id_3.clone(), op_id_4.clone()]; | ||
futures::future::join_all([ | ||
fetch.respond_with_ops(requested_ops_1, agent_id_1.clone()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if request_op_data
might be a clearer name for this? I can't quite get this into my head
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't find the name very comprehensible either. But it's not a request, it's a response to a request which pulls ops from the store and sends them to the requesting agent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see what you're saying but in my head it kind of is a request. We're choosing to queue the work in the implementation and this doesn't actually respond in the function return but it is still a remote peer requesting us to do work... Naming is hard :)
Maybe something like handle_op_data_request
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's good and fits the handle_incoming_ops
in the next PR. I'll change it to handle_op_request
. I'd change add_ops
to request_ops
too.
} | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn no_response_sent_when_no_ops_found() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possible to also test agent info not in peer store?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and also it'd be great to test error, and then queuing the request again for success. Just to demonstrate that there's no state preventing that working?
587bc39
to
0368024
Compare
- Adds protobuf definition for an individual `MetaOp` to `op_store`. - Adds protobuf definition for a list of ops to `fetch`.
57d09d2
to
edbf162
Compare
edbf162
to
a841ce5
Compare
Adds a simple response queue to the fetch module which processes fetch requests one by one by loading requested ops from an op store and sending the available ops to the requester.
depends on #60
resolves #30