Skip to content

Commit 90f081c

Browse files
committed
RMME reset
1 parent 069538c commit 90f081c

File tree

4 files changed

+30
-7
lines changed

4 files changed

+30
-7
lines changed

src/mqueue.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::{
1010

1111
use compact_encoding::CompactEncoding as _;
1212
use futures::{Sink, Stream};
13-
use tracing::{error, instrument};
13+
use tracing::{error, info, instrument};
1414

1515
use crate::{message::ChannelMessage, noise::EncryptionInfo, NoiseEvent};
1616

@@ -119,9 +119,18 @@ impl<IO: Stream<Item = NoiseEvent> + Sink<Vec<u8>> + Send + Unpin + 'static> Str
119119
{
120120
type Item = MqueueEvent;
121121

122+
#[instrument(skip_all, ret)]
122123
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
123124
let _ = self.poll_outbound(cx);
124-
self.poll_inbound(cx)
125+
match self.poll_inbound(cx) {
126+
Poll::Ready(Some(MqueueEvent::Message(Ok(x)))) => {
127+
for m in x.iter() {
128+
info!("RX ChannelMessage::{m}");
129+
}
130+
Poll::Ready(Some(MqueueEvent::Message(Ok(x))))
131+
}
132+
x => x,
133+
}
125134
}
126135
}
127136

src/noise.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ where
110110
}
111111

112112
/// Check that we've done as much work as possible. Sending, receiving, encrypting and decrypting.
113-
#[instrument(skip_all, ret)]
113+
#[instrument(name = "did_as_much_as_possible", skip_all, ret)]
114114
fn did_as_much_as_possible(&mut self, cx: &mut Context<'_>) -> bool {
115115
// No incoming encrypted messages available.
116116
self.poll_incomming_encrypted_messages(cx).is_pending()
@@ -124,8 +124,7 @@ where
124124

125125
/// Handle all message throughput. Sends, encrypts and decrypts messages
126126
/// Returns `true` `step` is already [`Step::Established`].
127-
#[allow(clippy::too_many_arguments)]
128-
#[instrument(skip_all, ret)]
127+
#[instrument(name = "poll_message_throughput", skip_all, ret)]
129128
fn poll_message_throughput(&mut self, cx: &mut Context<'_>) -> bool {
130129
self.poll_outgoing_encrypted_messages(cx);
131130
let _ = self.poll_incomming_encrypted_messages(cx);

src/test_utils.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,10 @@ pub(crate) fn log() {
8686
.with_targets(true)
8787
.with_bracketed_fields(true)
8888
.with_indent_lines(true)
89-
.with_span_modes(true)
9089
.with_thread_ids(false)
91-
.with_thread_names(false);
90+
.with_thread_names(true)
91+
//.with_span_modes(true)
92+
;
9293

9394
tracing_subscriber::registry()
9495
.with(env_filter)

tests/js_interop.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use tokio::{
2727
task,
2828
time::sleep,
2929
};
30+
use tracing::instrument;
3031

3132
use hypercore_protocol::{discovery_key, schema::*, Channel, Event, Message, ProtocolBuilder};
3233

@@ -186,6 +187,7 @@ async fn rcns(server_writer: bool, port: u32) -> Result<()> {
186187
&result_path,
187188
)
188189
.await?;
190+
dbg!();
189191
assert_result(result_path, item_count, item_size, data_char).await?;
190192
drop(server);
191193

@@ -330,11 +332,15 @@ async fn run_client(
330332
data_path: &str,
331333
result_path: &str,
332334
) -> Result<()> {
335+
dbg!();
333336
let hypercore = if is_writer {
337+
dbg!();
334338
create_writer_hypercore(data_count, data_size, data_char, data_path).await?
335339
} else {
340+
dbg!();
336341
create_reader_hypercore(data_path).await?
337342
};
343+
dbg!();
338344
let hypercore_wrapper = HypercoreWrapper::from_disk_hypercore(
339345
hypercore,
340346
if is_writer {
@@ -343,7 +349,9 @@ async fn run_client(
343349
Some(result_path.to_string())
344350
},
345351
);
352+
dbg!();
346353
tcp_client(port, on_replication_connection, Arc::new(hypercore_wrapper)).await?;
354+
dbg!();
347355
Ok(())
348356
}
349357

@@ -433,28 +441,34 @@ pub fn get_test_key_pair(include_secret: bool) -> PartialKeypair {
433441
}
434442

435443
#[cfg(feature = "tokio")]
444+
#[instrument(skip_all)]
436445
async fn on_replication_connection(
437446
stream: TcpStream,
438447
is_initiator: bool,
439448
hypercore: Arc<HypercoreWrapper>,
440449
) -> Result<()> {
450+
use tracing::info;
451+
441452
let mut protocol = ProtocolBuilder::new(is_initiator).connect(stream.compat());
442453
while let Some(event) = protocol.next().await {
443454
let event = event?;
444455
match event {
445456
Event::Handshake(_) => {
457+
info!("Event::Handshake");
446458
if is_initiator {
447459
protocol.open(*hypercore.key()).await?;
448460
}
449461
}
450462
Event::DiscoveryKey(dkey) => {
463+
info!("Event::DiscoveryKey");
451464
if hypercore.discovery_key == dkey {
452465
protocol.open(*hypercore.key()).await?;
453466
} else {
454467
panic!("Invalid discovery key");
455468
}
456469
}
457470
Event::Channel(channel) => {
471+
info!("Event::Channel");
458472
hypercore.on_replication_peer(channel);
459473
}
460474
Event::Close(_dkey) => {

0 commit comments

Comments
 (0)