451 reduce the "already-connected peer" error logs from appearing #454

Merged
9 changes: 5 additions & 4 deletions clients/service/src/lib.rs
@@ -25,6 +25,7 @@ mod trace;
 
 const RESET_RESTART_TIME_IN_SECS: u64 = 1800; // reset the restart time in 30 minutes
 const DEFAULT_RESTART_TIME_IN_SECS: u64 = 20; // default sleep time before restarting everything
+const RESTART_BACKOFF_DELAY: u64 = 10;
 
 #[async_trait]
 pub trait Service<Config, InnerError> {
@@ -75,11 +76,11 @@ impl<Config: Clone + Send + 'static, F: Fn()> ConnectionManager<Config, F> {
         &self,
     ) -> Result<(), Error<InnerError>> {
         let mut restart_in_secs = DEFAULT_RESTART_TIME_IN_SECS; // set default to 20 seconds for restart
-        let mut time_as_of_recording = SystemTime::now();
+        let mut last_start_timestamp = SystemTime::now();
 
         loop {
             let time_now = SystemTime::now();
-            let _ = time_now.duration_since(time_as_of_recording).map(|duration| {
+            let _ = time_now.duration_since(last_start_timestamp).map(|duration| {
                 // Revert the counter if the restart happened more than 30 minutes (or 1800 seconds)
                 // ago
                 if duration.as_secs() > RESET_RESTART_TIME_IN_SECS {
@@ -91,8 +92,8 @@ impl<Config: Clone + Send + 'static, F: Fn()> ConnectionManager<Config, F> {
                 // Else, these straggler packets will interfere with the new connection.
                 // https://www.rfc-editor.org/rfc/rfc793#page-22
                 else {
-                    restart_in_secs += 10;
-                    time_as_of_recording = time_now;
+                    restart_in_secs += RESTART_BACKOFF_DELAY;
+                    last_start_timestamp = time_now;
                 }
             });
 
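Review note: the two hunks above rename time_as_of_recording to last_start_timestamp and replace the magic 10 with the new RESTART_BACKOFF_DELAY constant; the policy itself is unchanged. For readers skimming the diff, a self-contained sketch of that policy (next_restart_delay is a hypothetical helper written for illustration, not code from this PR):

```rust
use std::time::{Duration, SystemTime};

const RESET_RESTART_TIME_IN_SECS: u64 = 1800; // forget the backoff after 30 minutes of uptime
const DEFAULT_RESTART_TIME_IN_SECS: u64 = 20; // base delay before restarting everything
const RESTART_BACKOFF_DELAY: u64 = 10; // extra delay added per quick successive restart

/// Hypothetical helper mirroring the loop above: given the current delay and the
/// time of the previous start, return the delay to use before the next restart.
fn next_restart_delay(current_delay: u64, last_start_timestamp: SystemTime) -> u64 {
    match SystemTime::now().duration_since(last_start_timestamp) {
        // The client stayed up long enough; fall back to the default delay.
        Ok(uptime) if uptime.as_secs() > RESET_RESTART_TIME_IN_SECS => DEFAULT_RESTART_TIME_IN_SECS,
        // Restarted again quickly; back off a little more so straggler TCP packets
        // from the previous connection can drain (see RFC 793).
        Ok(_) => current_delay + RESTART_BACKOFF_DELAY,
        // Clock went backwards; keep the current delay unchanged.
        Err(_) => current_delay,
    }
}

fn main() {
    let just_now = SystemTime::now();
    let long_ago = SystemTime::now() - Duration::from_secs(2000);
    assert_eq!(next_restart_delay(20, just_now), 30);
    assert_eq!(next_restart_delay(30, long_ago), DEFAULT_RESTART_TIME_IN_SECS);
}
```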
@@ -70,7 +70,7 @@ pub(crate) async fn poll_messages_from_stellar(
     send_to_node_receiver.close();
     drop(send_to_user_sender);
 
-    log::info!("poll_messages_from_stellar(): stopped.");
+    log::debug!("poll_messages_from_stellar(): stopped.");
 }
 
 /// Returns Xdr format of the `StellarMessage` sent from the Stellar Node
2 changes: 1 addition & 1 deletion clients/stellar-relay-lib/src/connection/error.rs
@@ -63,7 +63,7 @@ pub enum Error {
     #[error(display = "Received Error from Overlay: {:?}", _0)]
     OverlayError(ErrorCode),
 
-    #[error(display = "Timeout elapsed")]
+    #[error(display = "Encountered timeout")]
     Timeout,
 
     #[error(display = "Config Error: Version String too long")]
78 changes: 38 additions & 40 deletions clients/stellar-relay-lib/src/overlay.rs
@@ -29,6 +29,38 @@ impl StellarOverlayConnection {
         self.sender.send(msg).await
     }
 
+    /// Returns an `StellarOverlayConnection` when a connection to Stellar Node is successful.
+    pub async fn connect(
+        local_node_info: NodeInfo,
+        conn_info: ConnectionInfo,
+    ) -> Result<Self, Error> {
+        log::info!("connect(): connecting to {conn_info:?}");
+
+        // this is a channel to communicate with the user/caller.
+        let (send_to_user_sender, send_to_user_receiver) = mpsc::channel::<StellarMessage>(1024);
+
+        let (send_to_node_sender, send_to_node_receiver) = mpsc::channel::<StellarMessage>(1024);
+
+        // split the stream for easy handling of read and write
+        let (read_stream_overlay, write_stream_overlay) =
+            create_stream(&conn_info.address()).await?;
+
+        let mut connector = Connector::new(local_node_info, conn_info, write_stream_overlay);
+        connector.send_hello_message().await?;
+
+        tokio::spawn(poll_messages_from_stellar(
+            connector,
+            read_stream_overlay,
+            send_to_user_sender,
+            send_to_node_receiver,
+        ));
+
+        Ok(StellarOverlayConnection {
+            sender: send_to_node_sender,
+            receiver: send_to_user_receiver,
+        })
+    }
+
     pub async fn listen(&mut self) -> Result<Option<StellarMessage>, Error> {
         loop {
             if !self.is_alive() {
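For context on the moved connect() constructor: it wires up two bounded mpsc channels, one carrying messages from the spawned poll task to the caller and one carrying messages from the caller towards the node, then hands the far ends to poll_messages_from_stellar. A runnable sketch of that channel layout with plain strings standing in for StellarMessage (all names here are illustrative stand-ins, not the crate's real types):

```rust
use tokio::sync::mpsc;

// Stand-in for StellarOverlayConnection: the caller keeps the sending half of the
// "to node" channel and the receiving half of the "to user" channel.
struct OverlayHandle {
    sender: mpsc::Sender<String>,
    receiver: mpsc::Receiver<String>,
}

// Stand-in for poll_messages_from_stellar: the real task also owns the connector
// and the TCP read stream; here it simply echoes whatever the caller sends.
async fn poll_loop(send_to_user: mpsc::Sender<String>, mut from_user: mpsc::Receiver<String>) {
    while let Some(outgoing) = from_user.recv().await {
        let _ = send_to_user.send(format!("echo: {outgoing}")).await;
    }
}

fn connect_sketch() -> OverlayHandle {
    // channel carrying messages from the spawned task to the user/caller
    let (send_to_user_sender, send_to_user_receiver) = mpsc::channel::<String>(1024);
    // channel carrying messages the caller wants forwarded to the node
    let (send_to_node_sender, send_to_node_receiver) = mpsc::channel::<String>(1024);

    tokio::spawn(poll_loop(send_to_user_sender, send_to_node_receiver));

    OverlayHandle { sender: send_to_node_sender, receiver: send_to_user_receiver }
}

#[tokio::main]
async fn main() {
    let mut handle = connect_sketch();
    handle.sender.send("hello".to_string()).await.unwrap();
    println!("{:?}", handle.receiver.recv().await); // Some("echo: hello")
}
```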
@@ -52,18 +84,18 @@ impl StellarOverlayConnection {
         }
     }
 
-    pub fn is_alive(&self) -> bool {
-        let result = self.sender.is_closed();
+    pub fn is_alive(&mut self) -> bool {
+        let is_closed = self.sender.is_closed();
 
-        if result {
-            drop(self);
+        if is_closed {
+            self.disconnect();
         }
 
-        !result
+        !is_closed
     }
 
     pub fn disconnect(&mut self) {
-        log::info!("disconnect(): closing channel");
+        log::info!("disconnect(): closing connection to overlay network");
         self.receiver.close();
     }
 }
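Worth spelling out why this hunk changes the receiver to &mut self: in the old code, drop(self) inside is_alive(&self) only dropped the shared reference, not the connection, so nothing was actually cleaned up; calling self.disconnect() closes the receiver explicitly. A standalone illustration of that Rust detail (not project code):

```rust
struct Conn;

impl Drop for Conn {
    fn drop(&mut self) {
        println!("Conn dropped");
    }
}

fn main() {
    let conn = Conn;
    let reference = &conn;
    // Dropping a shared reference is a no-op for the value behind it;
    // the destructor only runs when the owned `conn` itself is dropped.
    drop(reference);
    println!("reference dropped, value still alive");
    drop(conn); // "Conn dropped" prints here
}
```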
@@ -73,37 +105,3 @@ impl Drop for StellarOverlayConnection {
         self.disconnect();
     }
 }
-
-impl StellarOverlayConnection {
-    /// Returns an `StellarOverlayConnection` when a connection to Stellar Node is successful.
-    pub async fn connect(
-        local_node_info: NodeInfo,
-        conn_info: ConnectionInfo,
-    ) -> Result<Self, Error> {
-        log::info!("connect(): connecting to {conn_info:?}");
-
-        // this is a channel to communicate with the user/caller.
-        let (send_to_user_sender, send_to_user_receiver) = mpsc::channel::<StellarMessage>(1024);
-
-        let (send_to_node_sender, send_to_node_receiver) = mpsc::channel::<StellarMessage>(1024);
-
-        // split the stream for easy handling of read and write
-        let (read_stream_overlay, write_stream_overlay) =
-            create_stream(&conn_info.address()).await?;
-
-        let mut connector = Connector::new(local_node_info, conn_info, write_stream_overlay);
-        connector.send_hello_message().await?;
-
-        tokio::spawn(poll_messages_from_stellar(
-            connector,
-            read_stream_overlay,
-            send_to_user_sender,
-            send_to_node_receiver,
-        ));
-
-        Ok(StellarOverlayConnection {
-            sender: send_to_node_sender,
-            receiver: send_to_user_receiver,
-        })
-    }
-}
7 changes: 3 additions & 4 deletions clients/vault/src/oracle/agent.rs
@@ -124,12 +124,10 @@ pub async fn start_oracle_agent(
                 },
             }
         }
-
-        tracing::info!("start_oracle_agent(): LOOP STOPPED!");
     });
 
     tokio::spawn(on_shutdown(shutdown_sender.clone(), async move {
-        tracing::info!("start_oracle_agent(): sending signal to shutdown overlay connection...");
+        tracing::debug!("start_oracle_agent(): sending signal to shutdown overlay connection...");
         if let Err(e) = disconnect_signal_sender.send(()).await {
             tracing::warn!("start_oracle_agent(): failed to send disconnect signal: {e:?}");
         }
@@ -218,6 +216,7 @@ mod tests {
     #[ntest::timeout(1_800_000)] // timeout at 30 minutes
     #[serial]
     async fn test_get_proof_for_current_slot() {
+        env_logger::init();
         let shutdown_sender = ShutdownSender::new();
         let agent = start_oracle_agent(
             get_test_stellar_relay_config(true),
@@ -236,7 +235,7 @@
         }
         // use a future slot (2 slots ahead) to ensure enough messages can be collected
         // and to avoid "missed" messages.
-        latest_slot += 2;
+        latest_slot += 3;
 
         let proof_result = agent.get_proof(latest_slot).await;
         assert!(proof_result.is_ok(), "Failed to get proof for slot: {}", latest_slot);
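The shutdown hunk above only lowers a log level; the hand-off itself stays the same: a task registered via on_shutdown forwards the shutdown event to the overlay loop through the dedicated disconnect_signal channel. A self-contained sketch of that pattern (assuming tokio with the full feature set; the worker loop below simulates the oracle agent and is not the vault crate's code):

```rust
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Dedicated channel used purely as a "please disconnect" signal.
    let (disconnect_signal_sender, mut disconnect_signal_receiver) = mpsc::channel::<()>(2);

    // Stand-in for the spawned oracle/overlay loop: it keeps working until the
    // disconnect signal arrives, then winds down.
    let worker = tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = disconnect_signal_receiver.recv() => {
                    println!("worker: received disconnect signal, stopping");
                    break;
                },
                _ = tokio::time::sleep(Duration::from_millis(100)) => {
                    println!("worker: still collecting messages...");
                },
            }
        }
    });

    // Stand-in for the on_shutdown(...) task: when shutdown is triggered it
    // forwards the event over the disconnect channel, logging on failure.
    tokio::time::sleep(Duration::from_millis(250)).await;
    if let Err(e) = disconnect_signal_sender.send(()).await {
        eprintln!("failed to send disconnect signal: {e:?}");
    }

    worker.await.unwrap();
}
```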