Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc/mdns: Update to futures-preview #1247

Merged
merged 25 commits into from
Nov 20, 2019

Conversation

mxinden
Copy link
Member

@mxinden mxinden commented Sep 18, 2019

This pull request updates misc/mdns to use futures-preview 0.3.0-alpha.17. It is ready for review but still has a couple of TODO comments that I would like to have your input on.

Copy link
Member

@tomaka tomaka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we might have to ask for help from the async-std people.

@@ -147,21 +158,27 @@ impl MdnsService {
Ok(MdnsService {
socket,
query_socket: UdpSocket::bind((Ipv4Addr::from([0u8, 0, 0, 0]), 0u16)).await?,
query_interval: Interval::new(Duration::from_secs(20)),
query_interval: Interval::new_at(Instant::now(), Duration::from_secs(20)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simply to not have to wait for 20 seconds within the unit tests. Other than adding a bit of complexity I don't see it having an impact at the compiled logic.

The same is done in current master:

query_interval: Interval::new_at(Instant::now(), Duration::from_secs(20)),

// around this with the additional scope on match. Is there a
// prettier way?
match {
let fut = self.query_socket.send_to(&to_send, *IPV4_MDNS_MULTICAST_ADDRESS);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That doesn't work, you're dropping the future. There's a lot more code to write here for it to work unfortunately.

match {
let fut = self.socket.recv_from(&mut self.recv_buffer);
futures::pin_mut!(fut);
fut.poll(cx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same remark.

Libp2p-mdns uses the async-std crate for network io. This crate only
offers async send and receive functions. In order to use this in non
async/await functions one needs to keep the future returned by the crate
functions around across `poll` invocations.

The future returned by the crate functions references the io resource.
Thus one has to keep both the io resource as well as the future
referencing it. This results in a self-referencing struct which is not
possible to create with safe Rust.

Instead, by having `MdnsService::next` (former `MdnsService::poll`) take
ownership of `self`, the Rust async magic takes care of the above (See
code comments for more details).

As a (negative) side effect, given that `MdnsService::next` takes
ownership of `self`, there is nothing to bind the lifetime of the
returned `MdnsPacket` to. With no better solution in mind, this patch
makes `MdnsPacket` static, not referencing the `MdnsService` receive
buffer.
@mxinden
Copy link
Member Author

mxinden commented Oct 30, 2019

As discussed I included option 2 (not dropping the udp socket futures by having poll (now next) take ownership of itself). See 40a043e.

@tomaka @twittner when you have some time, could you give this another review?


/// List of nodes that we have discovered, the address, and when their TTL expires.
///
/// Each combination of `PeerId` and `Multiaddr` can only appear once, but the same `PeerId`
/// can appear multiple times.
//
// TODO: Why this optimization? Is mdns ever within the hot path of an
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't an optimization. It is necessary in order to store the Multiaddresses of the nodes we know about. Without it, you can't implement addresses_of_peer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can this not be a simple heap allocated Vec instead of a maybe stack allocated SmallVec?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it doesn't really matter.

///
/// `None` if `discovered_nodes` is empty.
// TODO: Would a simple std Instant suffice as well?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We currently report expired nodes as events of the behaviour. If we want these to be reported "pro-actively", then it has to be a Delay.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Thanks!

Copy link
Member

@tomaka tomaka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, the API of the service doesn't really make sense anymore now that the MdnsPacket is detached from the service.

closest_expiration: Option<Delay>,

/// Marker to pin the generic.
marker: PhantomData<TSubstream>,
}

/// Polling the async MdnsService within a non-async `poll` function forces one to keep the returned
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need to explain that. The second part of the doc-comment is enough.

let event = match self.service.poll(cx) {
Poll::Ready(ev) => ev,
Poll::Pending => return Poll::Pending,
let service = std::mem::replace(&mut self.service, MaybeBusyMdnsService::Unreachable);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nit-pick.

Suggested change
let service = std::mem::replace(&mut self.service, MaybeBusyMdnsService::Unreachable);
let service = mem::replace(&mut self.service, MaybeBusyMdnsService::Unreachable);

enum MaybeBusyMdnsService {
Free(MdnsService),
Busy(Pin<Box<dyn Future<Output = (MdnsService, MdnsPacket)> + Send>>),
Unreachable,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is technically reachable if we have a panic and unwind, so I'd prefer if this was called Poisoned.

Suggested change
Unreachable,
Poisoned,

// TODO: This should move to a constant, right?
Duration::from_secs(5 * 60)
).unwrap());
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(something like that)

Suggested change
}
} else { debug_assert!(false); }

// TODO: This should move to a constant, right? Also can it be consolidated with
// the query response TTL?
service.enqueue_response(disc.build_response(Duration::from_secs(5 * 60)));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}
} else { debug_assert!(false); }


/// Returns a future resolving to itself and the next received `MdnsPacket`.
///
/// **Note**: Why does `next` take ownership of itself?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would explain this in a regular comment, but not in a doc-comment.

let query = dns::build_query();
self.query_send_buffers.push(query.to_vec());
}
if let Some(_) = self.query_interval.next().now_or_never() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not correct, unfortunately.
Once you reach recv().await, you will block forever until the socket has data for you.
Instead, what we want is for query_interval to "interrupt" that and dispatch a new request.

self,
peer_id: PeerId,
addresses: TAddresses,
ttl: Duration,
) -> Result<(), MdnsResponseError>
) -> Result<Vec<u8>, MdnsResponseError>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, that's not great.
The respond method was a way to tell the service to send a response.
Now that MdnsQuery is detached from the service, it doesn't make sense to keep the API similar to what it was before.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some more details here? Due to the fact that MdnsQuery does not have a reference to the send_buffer anymore it only builds the response, which later on is send by the Mdns NetworkBehaviour implementation to the MdnsService.

Copy link
Member

@tomaka tomaka Nov 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is that it doesn't make sense for the MdnsQuery object itself to build the response. With the exception of query_id, the self parameter isn't necessary. It used to be like that because self.send_buffers is an implementation detail that is hidden from the user.
This method should now be a free-standing function instead.

pub fn respond(self, ttl: Duration) {
let response = dns::build_service_discovery_response(self.query_id, ttl);
self.send_buffers.push(response);
pub fn build_response(self, ttl: Duration) -> Vec<u8> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same remark.

self.packet.answers.iter().filter_map(move |record| {
impl MdnsResponse {
/// Creates a new `MdnsResponse` based on the provided `Packet`.
pub fn new(packet: Packet, from: SocketAddr) -> MdnsResponse {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn new(packet: Packet, from: SocketAddr) -> MdnsResponse {
fn new(packet: Packet, from: SocketAddr) -> MdnsResponse {

@mxinden
Copy link
Member Author

mxinden commented Nov 11, 2019

@tomaka Could you take another look at this?

@mxinden mxinden marked this pull request as ready for review November 11, 2019 18:55
Copy link
Member

@tomaka tomaka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The general polling logic looks good 👍 , but the API surface not so much in my opinion.
I'll take another look and might have more comments.

/// Returns the list of peers that have been reported in this packet.
///
/// > **Note**: Keep in mind that this will also contain the responses we sent ourselves.
pub fn discovered_peers(&self) -> &Vec<MdnsPeer> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn discovered_peers(&self) -> &Vec<MdnsPeer> {
pub fn discovered_peers(&self) -> impl Iterator<Item = &MdnsPeer> {

self.packet
impl MdnsPeer {
/// Creates a new `MdnsPeer` based on the provided `Packet`.
pub fn new(packet: &Packet, record_value: String, my_peer_id: PeerId, ttl: u32) -> MdnsPeer {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MdnsPeer is supposed to be a borrow for a value inside of a Packet, similar to a MutexGuard being a borrow of a Mutex for example.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Packet is not static, thus we need to do the parsing when constructing the MdnsPeer as the final MdnsPeer needs to be static.

use crate::service::{MdnsPacket, MdnsService};
use multiaddr::multihash::*;

fn discover(peer_id: PeerId) {
let mut service = MdnsService::new().unwrap();
let stream = stream::poll_fn(move |cx| -> Poll<Option<Result<(), io::Error>>> {
block_on (async {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
block_on (async {
block_on(async {

}
}
}
MdnsPacket::ServiceDiscovery(_) => {}
MdnsPacket::ServiceDiscovery(_) => {panic!("did not expect a service discovery packet")}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
MdnsPacket::ServiceDiscovery(_) => {panic!("did not expect a service discovery packet")}
MdnsPacket::ServiceDiscovery(_) => panic!("did not expect a service discovery packet")

})
}

#[test]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#[test]

#[test]
// As of today the underlying UDP socket is not stubbed out. Thus tests run in parallel to this
// unit tests inter fear with it. Test needs to be run in sequence to ensure test properties.
fn respect_query_interval() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn respect_query_interval() {
#[test]
fn respect_query_interval() {

@mxinden
Copy link
Member Author

mxinden commented Nov 11, 2019

Thanks for bearing with me @tomaka. The above comment and 17fd131 should address your feedback.

Copy link
Member

@tomaka tomaka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I forgot about this PR.
We had some IRL chat last week about that, and I think it's fine! So, let's merge this if no one has an objection.

@tomaka tomaka merged commit a26620b into libp2p:stable-futures Nov 20, 2019
@mxinden mxinden mentioned this pull request Mar 10, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants