Skip to content

Commit

Permalink
SuiSyncer
Browse files Browse the repository at this point in the history
  • Loading branch information
longbowlu committed Dec 5, 2023
1 parent dfe0dbf commit 0629d81
Show file tree
Hide file tree
Showing 10 changed files with 268 additions and 48 deletions.
23 changes: 6 additions & 17 deletions crates/sui-bridge/src/eth_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use std::sync::Arc;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio::time::{self, Duration};
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;

const ETH_EVENTS_CHANNEL_SIZE: usize = 1000;
const FINALIZED_BLOCK_QUERY_INTERVAL: Duration = Duration::from_secs(2);
Expand Down Expand Up @@ -94,6 +96,9 @@ where
)
.expect("Failed to get last finalzied block from eth client after retry");
tracing::debug!("Last finalized block: {}", new_value);

// TODO add a metrics for the last finalized block

if new_value > last_block_number {
last_finalized_block_sender
.send(new_value)
Expand Down Expand Up @@ -138,29 +143,13 @@ where
.send(events)
.await
.expect("All Eth event channel receivers are closed");
tracing::info!(
contract_address=?contract_address,
"Observed {len} new events",
);
tracing::info!(?contract_address, "Observed {len} new Eth events",);
}
start_block = new_finalized_block + 1;
}
}
}

use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;

#[macro_export]
macro_rules! retry_with_max_delay {
($func:expr, $max_delay:expr) => {{
let retry_strategy = ExponentialBackoff::from_millis(100)
.max_delay($max_delay)
.map(jitter);
Retry::spawn(retry_strategy, || $func).await
}};
}

#[cfg(test)]
mod tests {
use std::{collections::HashSet, str::FromStr};
Expand Down
11 changes: 11 additions & 0 deletions crates/sui-bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,20 @@ pub mod events;
pub mod handler;
pub mod server;
pub mod sui_client;
pub mod sui_syncer;

#[cfg(test)]
pub(crate) mod eth_mock_provider;

#[cfg(test)]
pub(crate) mod sui_mock_client;

#[macro_export]
macro_rules! retry_with_max_delay {
($func:expr, $max_delay:expr) => {{
let retry_strategy = ExponentialBackoff::from_millis(100)
.max_delay($max_delay)
.map(jitter);
Retry::spawn(retry_strategy, || $func).await
}};
}
31 changes: 14 additions & 17 deletions crates/sui-bridge/src/sui_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tap::TapFallible;
use crate::error::{BridgeError, BridgeResult};
use crate::events::SuiBridgeEvent;

pub(crate) struct SuiClient<P> {
pub struct SuiClient<P> {
inner: P,
}

Expand Down Expand Up @@ -84,10 +84,7 @@ where
let mut is_first_page = true;
let mut all_events: Vec<sui_json_rpc_types::SuiEvent> = vec![];
loop {
let events = self
.inner
.query_events(filter.clone(), cursor.clone())
.await?;
let events = self.inner.query_events(filter.clone(), cursor).await?;
if events.data.is_empty() {
return Ok(Page {
data: all_events,
Expand All @@ -97,7 +94,7 @@ where
}

// unwrap safe: we just checked data is not empty
let new_cursor = events.data.last().unwrap().id.clone();
let new_cursor = events.data.last().unwrap().id;

// Now check if we need to query more events for the sake of
// paginating in transaction granularity
Expand Down Expand Up @@ -349,7 +346,7 @@ mod tests {
next_cursor: Some(event_2.id.clone()),
has_next_page: true,
};
mock_client.add_event_response(package, module.clone(), event_1.id.clone(), events_page_2);
mock_client.add_event_response(package, module.clone(), event_1.id, events_page_2);
// page 3 (event 3, event 4, different tx_digest)
let mut event_3 = SuiEvent::random_for_testing();
event_3.id.tx_digest = event_2.id.tx_digest;
Expand All @@ -358,10 +355,10 @@ mod tests {
assert_ne!(event_3.id.tx_digest, event_4.id.tx_digest);
let events_page_3 = EventPage {
data: vec![event_3.clone(), event_4.clone()],
next_cursor: Some(event_4.id.clone()),
next_cursor: Some(event_4.id),
has_next_page: true,
};
mock_client.add_event_response(package, module.clone(), event_2.id.clone(), events_page_3);
mock_client.add_event_response(package, module.clone(), event_2.id, events_page_3);
let page: Page<SuiEvent, TransactionDigest> = sui_client
.query_events_by_module(package, module.clone(), cursor)
.await
Expand Down Expand Up @@ -390,23 +387,23 @@ mod tests {
// second page
assert_eq!(
mock_client.pop_front_past_event_query_params().unwrap(),
(package, module.clone(), event_1.id.clone())
(package, module.clone(), event_1.id)
);
// third page
assert_eq!(
mock_client.pop_front_past_event_query_params().unwrap(),
(package, module.clone(), event_2.id.clone())
(package, module.clone(), event_2.id)
);
// no more
assert_eq!(mock_client.pop_front_past_event_query_params(), None);

// Case 4, modify page 3 in case 3 to return event_4 only
let events_page_3 = EventPage {
data: vec![event_4.clone()],
next_cursor: Some(event_4.id.clone()),
next_cursor: Some(event_4.id),
has_next_page: true,
};
mock_client.add_event_response(package, module.clone(), event_2.id.clone(), events_page_3);
mock_client.add_event_response(package, module.clone(), event_2.id, events_page_3);
let page: Page<SuiEvent, TransactionDigest> = sui_client
.query_events_by_module(package, module.clone(), cursor)
.await
Expand Down Expand Up @@ -434,23 +431,23 @@ mod tests {
// second page
assert_eq!(
mock_client.pop_front_past_event_query_params().unwrap(),
(package, module.clone(), event_1.id.clone())
(package, module.clone(), event_1.id)
);
// third page
assert_eq!(
mock_client.pop_front_past_event_query_params().unwrap(),
(package, module.clone(), event_2.id.clone())
(package, module.clone(), event_2.id)
);
// no more
assert_eq!(mock_client.pop_front_past_event_query_params(), None);

// Case 5, modify page 2 in case 3 to mark has_next_page as false
let events_page_2 = EventPage {
data: vec![event_2.clone()],
next_cursor: Some(event_2.id.clone()),
next_cursor: Some(event_2.id),
has_next_page: false,
};
mock_client.add_event_response(package, module.clone(), event_1.id.clone(), events_page_2);
mock_client.add_event_response(package, module.clone(), event_1.id, events_page_2);
let page: Page<SuiEvent, TransactionDigest> = sui_client
.query_events_by_module(package, module.clone(), cursor)
.await
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-bridge/src/sui_mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ impl SuiClientInner for SuiMockClient {
self.past_event_query_params.lock().unwrap().push_back((
package,
module.clone(),
cursor.clone(),
cursor,
));
Ok(events
.get(&(package, module.clone(), cursor.clone()))
.get(&(package, module.clone(), cursor))
.cloned()
.unwrap_or_else(|| {
panic!(
Expand Down
Loading

0 comments on commit 0629d81

Please sign in to comment.