Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii-core): specify namespaces to exclusively index #2687

Merged
merged 5 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions bin/torii/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
let (block_tx, block_rx) = tokio::sync::mpsc::channel(100);

let mut flags = IndexingFlags::empty();
if args.indexing.index_transactions {
if args.indexing.transactions {

Check warning on line 126 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L126

Added line #L126 was not covered by tests
flags.insert(IndexingFlags::TRANSACTIONS);
}
if args.events.raw {
Expand All @@ -140,11 +140,12 @@
start_block: 0,
blocks_chunk_size: args.indexing.blocks_chunk_size,
events_chunk_size: args.indexing.events_chunk_size,
index_pending: args.indexing.index_pending,
index_pending: args.indexing.pending,

Check warning on line 143 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L143

Added line #L143 was not covered by tests
polling_interval: Duration::from_millis(args.indexing.polling_interval),
flags,
event_processor_config: EventProcessorConfig {
historical_events: args.events.historical.unwrap_or_default().into_iter().collect(),
historical_events: args.events.historical.into_iter().collect(),
namespaces: args.indexing.namespaces.into_iter().collect(),

Check warning on line 148 in bin/torii/src/main.rs

View check run for this annotation

Codecov / codecov/patch

bin/torii/src/main.rs#L147-L148

Added lines #L147 - L148 were not covered by tests
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo! Documentation needed for the new namespaces feature, sensei!

The addition of namespace filtering is a significant feature that deserves proper documentation. Please consider:

  1. Adding documentation comments explaining the purpose and usage of namespaces
  2. Including example configurations in the README
  3. Adding tests to verify namespace filtering behavior

Would you like me to help draft the documentation and test cases for this new feature?

},
},
shutdown_tx.clone(),
Expand Down
16 changes: 7 additions & 9 deletions crates/torii/cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ mod test {
assert_eq!(torii_args.rpc, Url::parse("http://0.0.0.0:6060").unwrap());
assert_eq!(torii_args.db_dir, Some(PathBuf::from("/tmp/torii-test2")));
assert!(!torii_args.events.raw);
assert_eq!(torii_args.events.historical, Some(vec!["a-A".to_string()]));
assert_eq!(torii_args.events.historical, vec!["a-A".to_string()]);
assert_eq!(torii_args.server, ServerOptions::default());
}

Expand All @@ -261,13 +261,14 @@ mod test {

[indexing]
events_chunk_size = 9999
index_pending = true
pending = true
max_concurrent_tasks = 1000
index_transactions = false
transactions = false
contracts = [
"erc20:0x1234",
"erc721:0x5678"
]
namespaces = []
"#;
let path = std::env::temp_dir().join("torii-config.json");
std::fs::write(&path, content).unwrap();
Expand All @@ -282,16 +283,13 @@ mod test {
assert_eq!(torii_args.rpc, Url::parse("http://0.0.0.0:2222").unwrap());
assert_eq!(torii_args.db_dir, Some(PathBuf::from("/tmp/torii-test")));
assert!(!torii_args.events.raw);
assert_eq!(
torii_args.events.historical,
Some(vec!["ns-E".to_string(), "ns-EH".to_string()])
);
assert_eq!(torii_args.events.historical, vec!["ns-E".to_string(), "ns-EH".to_string()]);
assert_eq!(torii_args.indexing.events_chunk_size, 9999);
assert_eq!(torii_args.indexing.blocks_chunk_size, 10240);
assert!(torii_args.indexing.index_pending);
assert!(torii_args.indexing.pending);
assert_eq!(torii_args.indexing.polling_interval, 500);
assert_eq!(torii_args.indexing.max_concurrent_tasks, 1000);
assert!(!torii_args.indexing.index_transactions);
assert!(!torii_args.indexing.transactions);
assert_eq!(
torii_args.indexing.contracts,
vec![
Expand Down
22 changes: 16 additions & 6 deletions crates/torii/cli/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@

/// Enable indexing pending blocks
#[arg(long = "indexing.pending", action = ArgAction::Set, default_value_t = true, help = "Whether or not to index pending blocks.")]
pub index_pending: bool,
pub pending: bool,

Check warning on line 103 in crates/torii/cli/src/options.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/cli/src/options.rs#L103

Added line #L103 was not covered by tests

/// Polling interval in ms
#[arg(
Expand All @@ -127,7 +127,7 @@
default_value_t = false,
help = "Whether or not to index world transactions and keep them in the database."
)]
pub index_transactions: bool,
pub transactions: bool,

Check warning on line 130 in crates/torii/cli/src/options.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/cli/src/options.rs#L130

Added line #L130 was not covered by tests

/// ERC contract addresses to index
#[arg(
Expand All @@ -138,18 +138,28 @@
)]
#[serde(deserialize_with = "deserialize_contracts")]
pub contracts: Vec<Contract>,

/// Namespaces to index
#[arg(
long = "indexing.namespaces",
value_delimiter = ',',
help = "The namespaces of the world that torii should index. If empty, all namespaces \
will be indexed."
)]
pub namespaces: Vec<String>,

Check warning on line 149 in crates/torii/cli/src/options.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/cli/src/options.rs#L149

Added line #L149 was not covered by tests
}

impl Default for IndexingOptions {
fn default() -> Self {
Self {
events_chunk_size: DEFAULT_EVENTS_CHUNK_SIZE,
blocks_chunk_size: DEFAULT_BLOCKS_CHUNK_SIZE,
index_pending: true,
index_transactions: false,
pending: true,
transactions: false,
contracts: vec![],
polling_interval: DEFAULT_POLLING_INTERVAL,
max_concurrent_tasks: DEFAULT_MAX_CONCURRENT_TASKS,
namespaces: vec![],
}
}
}
Expand All @@ -168,12 +178,12 @@
value_delimiter = ',',
help = "Event messages that are going to be treated as historical during indexing."
)]
pub historical: Option<Vec<String>>,
pub historical: Vec<String>,
}

impl Default for EventsOptions {
fn default() -> Self {
Self { raw: true, historical: None }
Self { raw: true, historical: vec![] }
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/torii/core/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const ENTITY_ID_INDEX: usize = 1;
#[derive(Clone, Debug, Default)]
pub struct EventProcessorConfig {
pub historical_events: HashSet<String>,
pub namespaces: HashSet<String>,
}

#[async_trait]
Expand Down
8 changes: 7 additions & 1 deletion crates/torii/core/src/processors/register_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ where
block_timestamp: u64,
_event_id: &str,
event: &Event,
_config: &EventProcessorConfig,
config: &EventProcessorConfig,
) -> Result<(), Error> {
// Torii version is coupled to the world version, so we can expect the event to be well
// formed.
Expand All @@ -58,6 +58,12 @@ where
let namespace = event.namespace.to_string().unwrap();
let name = event.name.to_string().unwrap();

// If the namespace is not in the list of namespaces to index, silently ignore it.
// If our config is empty, we index all namespaces.
if !config.namespaces.is_empty() && !config.namespaces.contains(&namespace) {
return Ok(());
}

// Called model here by language, but it's an event. Torii rework will make clear
// distinction.
let model = world.model_reader(&namespace, &name).await?;
Expand Down
8 changes: 7 additions & 1 deletion crates/torii/core/src/processors/register_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ where
block_timestamp: u64,
_event_id: &str,
event: &Event,
_config: &EventProcessorConfig,
config: &EventProcessorConfig,
) -> Result<(), Error> {
// Torii version is coupled to the world version, so we can expect the event to be well
// formed.
Expand All @@ -58,6 +58,12 @@ where
let namespace = event.namespace.to_string().unwrap();
let name = event.name.to_string().unwrap();

// If the namespace is not in the list of namespaces to index, silently ignore it.
// If our config is empty, we index all namespaces.
if !config.namespaces.is_empty() && !config.namespaces.contains(&namespace) {
return Ok(());
}

let model = world.model_reader(&namespace, &name).await?;
let schema = model.schema().await?;
let layout = model.layout().await?;
Expand Down
14 changes: 12 additions & 2 deletions crates/torii/core/src/processors/store_del_record.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{Error, Ok, Result};
use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_world::contracts::abigen::world::Event as WorldEvent;
use dojo_world::contracts::world::WorldContractReader;
Expand Down Expand Up @@ -51,7 +51,17 @@
}
};

let model = db.model(event.selector).await?;
// If the model does not exist, silently ignore it.
// This can happen if only specific namespaces are indexed.
let model = match db.model(event.selector).await {
Ok(m) => m,
Err(e) => {
if e.to_string().contains("no rows") {

Check warning on line 59 in crates/torii/core/src/processors/store_del_record.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/store_del_record.rs#L59

Added line #L59 was not covered by tests
return Ok(());
}
return Err(e);

Check warning on line 62 in crates/torii/core/src/processors/store_del_record.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/store_del_record.rs#L62

Added line #L62 was not covered by tests
}
};

info!(
target: LOG_TARGET,
Expand Down
14 changes: 12 additions & 2 deletions crates/torii/core/src/processors/store_set_record.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{Error, Ok, Result};
use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_world::contracts::abigen::world::Event as WorldEvent;
use dojo_world::contracts::world::WorldContractReader;
Expand Down Expand Up @@ -52,7 +52,17 @@
}
};

let model = db.model(event.selector).await?;
// If the model does not exist, silently ignore it.
// This can happen if only specific namespaces are indexed.
let model = match db.model(event.selector).await {
Ok(m) => m,
Err(e) => {
if e.to_string().contains("no rows") {

Check warning on line 60 in crates/torii/core/src/processors/store_set_record.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/store_set_record.rs#L60

Added line #L60 was not covered by tests
return Ok(());
}
return Err(e);

Check warning on line 63 in crates/torii/core/src/processors/store_set_record.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/store_set_record.rs#L63

Added line #L63 was not covered by tests
}
};

info!(
target: LOG_TARGET,
Expand Down
13 changes: 12 additions & 1 deletion crates/torii/core/src/processors/store_update_member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,18 @@
let entity_id = event.data[ENTITY_ID_INDEX];
let member_selector = event.data[MEMBER_INDEX];

let model = db.model(model_id).await?;
// If the model does not exist, silently ignore it.
// This can happen if only specific namespaces are indexed.
let model = match db.model(model_id).await {
Ok(m) => m,
Err(e) => {
if e.to_string().contains("no rows") {
return Ok(());
}
return Err(e);
}
};

Check warning on line 70 in crates/torii/core/src/processors/store_update_member.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/store_update_member.rs#L59-L70

Added lines #L59 - L70 were not covered by tests
let schema = model.schema;

let mut member = schema
Expand Down
14 changes: 12 additions & 2 deletions crates/torii/core/src/processors/store_update_record.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{Error, Ok, Result};
use anyhow::{Error, Result};
use async_trait::async_trait;
use dojo_types::schema::Ty;
use dojo_world::contracts::abigen::world::Event as WorldEvent;
Expand Down Expand Up @@ -55,7 +55,17 @@
let model_selector = event.selector;
let entity_id = event.entity_id;

let model = db.model(model_selector).await?;
// If the model does not exist, silently ignore it.
// This can happen if only specific namespaces are indexed.
let model = match db.model(event.selector).await {
Ok(m) => m,
Err(e) => {
if e.to_string().contains("no rows") {

Check warning on line 63 in crates/torii/core/src/processors/store_update_record.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/store_update_record.rs#L63

Added line #L63 was not covered by tests
return Ok(());
}
return Err(e);

Check warning on line 66 in crates/torii/core/src/processors/store_update_record.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/store_update_record.rs#L66

Added line #L66 was not covered by tests
}
};

info!(
target: LOG_TARGET,
Expand Down
Loading