Skip to content

Commit

Permalink
refactor: Configure coordinator via environment
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Jan 16, 2024
1 parent 81edb7e commit 0b06171
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 16 deletions.
4 changes: 2 additions & 2 deletions coordinator/src/block_streams_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ pub struct BlockStreamsHandlerImpl {

#[cfg_attr(test, mockall::automock)]
impl BlockStreamsHandlerImpl {
pub async fn connect() -> anyhow::Result<Self> {
let client = BlockStreamerClient::connect("http://[::1]:10000")
pub async fn connect(block_streamer_url: String) -> anyhow::Result<Self> {
let client = BlockStreamerClient::connect(block_streamer_url)
.await
.context("Unable to connect to Block Streamer")?;

Expand Down
4 changes: 2 additions & 2 deletions coordinator/src/executors_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ pub struct ExecutorsHandlerImpl {

#[cfg_attr(test, mockall::automock)]
impl ExecutorsHandlerImpl {
pub async fn connect() -> anyhow::Result<Self> {
let client = RunnerClient::connect("http://localhost:50007")
pub async fn connect(runner_url: String) -> anyhow::Result<Self> {
let client = RunnerClient::connect(runner_url)
.await
.context("Unable to connect to Runner")?;

Expand Down
18 changes: 14 additions & 4 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,20 @@ async fn main() -> anyhow::Result<()> {
.with(tracing_subscriber::EnvFilter::from_default_env())
.init();

let registry = Registry::connect("https://rpc.mainnet.near.org");
let redis_client = RedisClient::connect("redis://127.0.0.1").await?;
let mut block_streams_handler = BlockStreamsHandler::connect().await?;
let mut executors_handler = ExecutorsHandler::connect().await?;
let rpc_url = std::env::var("RPC_URL").expect("RPC_URL is not set");
let registry_contract_id = std::env::var("REGISTRY_CONTRACT_ID")
.expect("REGISTRY_CONTRACT_ID is not set")
.parse()
.expect("REGISTRY_CONTRACT_ID is not a valid account ID");
let redis_url = std::env::var("RPC_URL").expect("REDIS_URL is not set");
let block_streamer_url =
std::env::var("BLOCK_STREAMER_URL").expect("BLOCK_STREAMER_URL is not set");
let runner_url = std::env::var("RUNNER_URL").expect("RUNNER_URL is not set");

let registry = Registry::connect(registry_contract_id, &rpc_url);
let redis_client = RedisClient::connect(&redis_url).await?;
let mut block_streams_handler = BlockStreamsHandler::connect(block_streamer_url).await?;
let mut executors_handler = ExecutorsHandler::connect(runner_url).await?;

loop {
let indexer_registry = registry.fetch().await?;
Expand Down
4 changes: 2 additions & 2 deletions coordinator/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ pub struct RedisClientImpl {

#[cfg_attr(test, mockall::automock)]
impl RedisClientImpl {
pub async fn connect(redis_connection_str: &str) -> Result<Self, RedisError> {
let connection = redis::Client::open(redis_connection_str)?
pub async fn connect(redis_url: &str) -> Result<Self, RedisError> {
let connection = redis::Client::open(redis_url)?
.get_connection_manager()
.await?;

Expand Down
13 changes: 7 additions & 6 deletions coordinator/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,18 @@ pub use RegistryImpl as Registry;

pub struct RegistryImpl {
json_rpc_client: JsonRpcClient,
registry_contract_id: AccountId,
}

#[cfg_attr(test, mockall::automock)]
impl RegistryImpl {
pub fn connect(rpc_url: &str) -> Self {
pub fn connect(registry_contract_id: AccountId, rpc_url: &str) -> Self {
let json_rpc_client = JsonRpcClient::connect(rpc_url);

Self { json_rpc_client }
Self {
registry_contract_id,
json_rpc_client,
}
}

fn enrich_indexer_registry(
Expand Down Expand Up @@ -87,10 +91,7 @@ impl RegistryImpl {
block_reference: BlockReference::Finality(Finality::Final),
request: QueryRequest::CallFunction {
method_name: "list_indexer_functions".to_string(),
account_id: "dev-queryapi.dataplatform.near"
.to_string()
.try_into()
.unwrap(),
account_id: self.registry_contract_id.clone(),
args: FunctionArgs::from("{}".as_bytes().to_vec()),
},
})
Expand Down

0 comments on commit 0b06171

Please sign in to comment.