Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change relayer URI address to be vector and use quorum provider #2122

Merged
merged 13 commits into from
Aug 27, 2024
Merged
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Added
- [2122](https://github.com/FuelLabs/fuel-core/pull/2122): Changed the relayer URI address to be a vector and use a quorum provider. The `relayer` argument now supports multiple URLs to fetch information from different sources.

### Changed
- [2113](https://github.com/FuelLabs/fuel-core/pull/2113): Modify the way the gas price service and shared algo is initialized to have some default value based on best guess instead of `None`, and initialize service before graphql.
- [2112](https://github.com/FuelLabs/fuel-core/pull/2112): Alter the way the sealed blocks are fetched with a given height.
Expand Down
4 changes: 2 additions & 2 deletions bin/fuel-core/src/cli/run/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ pub struct RelayerArgs {
#[clap(long = "enable-relayer", action)]
pub enable_relayer: bool,

/// Uri address to ethereum client. It can be in format of `http://localhost:8545/` or `ws://localhost:8545/`.
/// Uri addresses to ethereum client. It can be in format of `http://localhost:8545/` or `ws://localhost:8545/`.
/// If not set relayer will not start.
#[arg(long = "relayer", env)]
#[arg(required_if_eq("enable_relayer", "true"))]
#[arg(requires_if(IsPresent, "enable_relayer"))]
pub relayer: Option<url::Url>,
pub relayer: Option<Vec<url::Url>>,

/// Ethereum contract address. Create EthAddress into fuel_types
#[arg(long = "relayer-v2-listening-contracts", value_parser = parse_h160, value_delimiter = ',', env)]
Expand Down
4 changes: 2 additions & 2 deletions crates/services/relayer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ pub(crate) static ETH_FORCED_TX: Lazy<H256> =
pub struct Config {
/// The da block to which the contract was deployed.
pub da_deploy_height: DaBlockHeight,
/// Uri address to ethereum client.
pub relayer: Option<url::Url>,
/// Uri addresses to ethereum client.
pub relayer: Option<Vec<url::Url>>,
netrome marked this conversation as resolved.
Show resolved Hide resolved
// TODO: Create `EthAddress` into `fuel_core_types`.
/// Ethereum contract address.
pub eth_v2_listening_contracts: Vec<H160>,
Expand Down
25 changes: 16 additions & 9 deletions crates/services/relayer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ use ethers_providers::{
Middleware,
Provider,
ProviderError,
Quorum,
QuorumProvider,
WeightedProvider,
};
use fuel_core_services::{
RunnableService,
Expand Down Expand Up @@ -55,7 +58,7 @@ type Synced = watch::Receiver<Option<DaBlockHeight>>;
type NotifySynced = watch::Sender<Option<DaBlockHeight>>;

/// The alias of runnable relayer service.
pub type Service<D> = CustomizableService<Provider<Http>, D>;
pub type Service<D> = CustomizableService<Provider<QuorumProvider<Http>>, D>;
type CustomizableService<P, D> = ServiceRunner<NotInitializedTask<P, D>>;

/// The shared state of the relayer task.
Expand Down Expand Up @@ -348,14 +351,18 @@ pub fn new_service<D>(database: D, config: Config) -> anyhow::Result<Service<D>>
where
D: RelayerDb + Clone + 'static,
{
let url = config.relayer.clone().ok_or_else(|| {
anyhow::anyhow!(
"Tried to start Relayer without setting an eth_client in the config"
)
})?;
// TODO: Does this handle https?
let http = Http::new(url);
let eth_node = Provider::new(http);
let urls = config
.relayer
.clone()
.ok_or_else(|| {
anyhow::anyhow!(
"Tried to start Relayer without setting an eth_client in the config"
)
})?
.into_iter()
.map(|url| WeightedProvider::new(Http::new(url)));

let eth_node = Provider::new(QuorumProvider::new(Quorum::Majority, urls));
let retry_on_error = true;
Ok(new_service_internal(
eth_node,
Expand Down
111 changes: 111 additions & 0 deletions crates/services/relayer/src/service/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,117 @@ async fn can_download_logs() {
assert_eq!(result, logs);
}

#[tokio::test]
async fn quorum_agrees_on_logs() {
let eth_node = MockMiddleware::default();
let logs = vec![
Log {
address: Default::default(),
block_number: Some(3.into()),
..Default::default()
},
Log {
address: Default::default(),
block_number: Some(5.into()),
..Default::default()
},
];
eth_node.update_data(|data| data.logs_batch = vec![logs.clone()]);

let eth_state = super::state::test_builder::TestDataSource {
eth_remote_finalized: 5,
eth_local_finalized: Some(1),
};
let eth_state = state::build_eth(&eth_state).await.unwrap();

// Given
let provider = Provider::new(
QuorumProvider::builder()
.add_provider(WeightedProvider::new(eth_node.clone()))
.add_provider(WeightedProvider::new(eth_node))
.quorum(Quorum::Majority)
.build(),
);
let contracts = vec![Default::default()];

// When
let result = download_logs(
&eth_state.needs_to_sync_eth().unwrap(),
contracts,
&provider,
DEFAULT_LOG_PAGE_SIZE,
)
.map_ok(|logs| logs.logs)
.try_concat()
.await
.unwrap();

// Then
assert_eq!(result, logs);
}

#[tokio::test]
async fn quorum__disagree_on_logs() {
let eth_node_two_logs = MockMiddleware::default();
let eth_node_one_log = MockMiddleware::default();
let logs = vec![
Log {
address: Default::default(),
block_number: Some(3.into()),
..Default::default()
},
Log {
address: Default::default(),
block_number: Some(5.into()),
..Default::default()
},
];
eth_node_two_logs.update_data(|data| data.logs_batch = vec![logs.clone()]);
eth_node_one_log.update_data(|data| data.logs_batch = vec![vec![logs[0].clone()]]);

let eth_state = super::state::test_builder::TestDataSource {
eth_remote_finalized: 5,
eth_local_finalized: Some(1),
};
let eth_state = state::build_eth(&eth_state).await.unwrap();

// Given
let provider = Provider::new(
QuorumProvider::builder()
// 3 different providers with 3 different logs
// 2 logs
.add_provider(WeightedProvider::new(eth_node_two_logs))
// 0 logs
.add_provider(WeightedProvider::new(MockMiddleware::default()))
// 1 log
.add_provider(WeightedProvider::new(eth_node_one_log))
.quorum(Quorum::Percentage(70))
.build(),
);
let contracts = vec![Default::default()];

// When
let provider_error = download_logs(
&eth_state.needs_to_sync_eth().unwrap(),
contracts,
&provider,
DEFAULT_LOG_PAGE_SIZE,
)
.map_ok(|logs| logs.logs)
.try_concat()
.await;
// Then

match provider_error {
Err(ProviderError::JsonRpcClientError(e)) => {
assert_eq!(format!("{e}"), "No Quorum reached.");
}
_ => {
panic!("Expected a JsonRpcClientError")
}
}
}

#[tokio::test]
async fn deploy_height_does_not_override() {
let mut mock_db = crate::mock_db::MockDb::default();
Expand Down
20 changes: 19 additions & 1 deletion crates/services/relayer/src/test_helpers/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl JsonRpcClient for MockMiddleware {
type Error = ProviderError;

/// Sends a request with the provided JSON-RPC and parameters serialized as JSON
async fn request<T, R>(&self, method: &str, _params: T) -> Result<R, Self::Error>
async fn request<T, R>(&self, method: &str, params: T) -> Result<R, Self::Error>
where
T: Debug + Serialize + Send + Sync,
R: DeserializeOwned,
Expand Down Expand Up @@ -220,6 +220,24 @@ impl JsonRpcClient for MockMiddleware {
serde_json::from_value(res).map_err(Self::Error::SerdeJson)?;
Ok(res)
}
"eth_getLogs" => {
// Decode the filter if T is a vec and the first element is a filter
let filter =
match params.serialize(serde_json::value::Serializer).unwrap() {
serde_json::Value::Array(ref arr) => {
let filter = arr.first().unwrap();
serde_json::from_value(filter.clone()).unwrap()
}
_ => panic!("Expected an array"),
};

let res = serde_json::to_value(self.update_data(|data| {
take_logs_based_on_filter(&data.logs_batch, &filter)
}))?;
let res: R =
serde_json::from_value(res).map_err(Self::Error::SerdeJson)?;
Ok(res)
}
_ => panic!("Request not mocked: {method}"),
}
}
Expand Down
30 changes: 12 additions & 18 deletions tests/tests/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,10 @@ async fn relayer_can_download_logs() {
let eth_node = Arc::new(eth_node);
let eth_node_handle = spawn_eth_node(eth_node).await;

relayer_config.relayer = Some(
format!("http://{}", eth_node_handle.address)
.as_str()
.try_into()
.unwrap(),
);
relayer_config.relayer = Some(vec![format!("http://{}", eth_node_handle.address)
.as_str()
.try_into()
.unwrap()]);
let db = Database::in_memory();

let srv = FuelService::from_database(db.clone(), config)
Expand Down Expand Up @@ -176,12 +174,10 @@ async fn messages_are_spendable_after_relayer_is_synced() {
let eth_node = Arc::new(eth_node);
let eth_node_handle = spawn_eth_node(eth_node).await;

relayer_config.relayer = Some(
format!("http://{}", eth_node_handle.address)
.as_str()
.try_into()
.unwrap(),
);
relayer_config.relayer = Some(vec![format!("http://{}", eth_node_handle.address)
.as_str()
.try_into()
.unwrap()]);

config.utxo_validation = true;

Expand Down Expand Up @@ -326,12 +322,10 @@ async fn can_restart_node_with_relayer_data() {
let eth_node = Arc::new(eth_node);
let eth_node_handle = spawn_eth_node(eth_node).await;

relayer_config.relayer = Some(
format!("http://{}", eth_node_handle.address)
.as_str()
.try_into()
.unwrap(),
);
relayer_config.relayer = Some(vec![format!("http://{}", eth_node_handle.address)
.as_str()
.try_into()
.unwrap()]);

let capacity = 1024 * 1024;
let tmp_dir = tempfile::TempDir::new().unwrap();
Expand Down
Loading