diff --git a/crates/torii/core/src/processors/store_set_record.rs b/crates/torii/core/src/processors/store_set_record.rs index 8a2530292d..c5f70a2a54 100644 --- a/crates/torii/core/src/processors/store_set_record.rs +++ b/crates/torii/core/src/processors/store_set_record.rs @@ -76,7 +76,8 @@ where let mut entity = model.schema; entity.deserialize(&mut keys_and_unpacked)?; - db.set_entity(entity, event_id, block_timestamp, entity_id, model_id, &keys_str).await?; + db.set_entity(entity, event_id, block_timestamp, entity_id, model_id, Some(&keys_str)) + .await?; Ok(()) } } diff --git a/crates/torii/core/src/processors/store_update_member.rs b/crates/torii/core/src/processors/store_update_member.rs index 01f1c92c95..567e9e18d0 100644 --- a/crates/torii/core/src/processors/store_update_member.rs +++ b/crates/torii/core/src/processors/store_update_member.rs @@ -1,5 +1,6 @@ use anyhow::{Context, Error, Result}; use async_trait::async_trait; +use dojo_types::schema::{Struct, Ty}; use dojo_world::contracts::naming; use dojo_world::contracts::world::WorldContractReader; use num_traits::ToPrimitive; @@ -50,11 +51,11 @@ where event_id: &str, event: &Event, ) -> Result<(), Error> { - let selector = event.data[MODEL_INDEX]; + let model_id = event.data[MODEL_INDEX]; let entity_id = event.data[ENTITY_ID_INDEX]; let member_selector = event.data[MEMBER_INDEX]; - let model = db.model(selector).await?; + let model = db.model(model_id).await?; let schema = model.schema; let mut member = schema @@ -98,9 +99,9 @@ where } member.ty.deserialize(&mut values)?; + let wrapped_ty = Ty::Struct(Struct { name: schema.name(), children: vec![member] }); - db.set_model_member(&schema.name(), entity_id, false, &member, event_id, block_timestamp) - .await?; + db.set_entity(wrapped_ty, event_id, block_timestamp, entity_id, model_id, None).await?; Ok(()) } } diff --git a/crates/torii/core/src/processors/store_update_record.rs b/crates/torii/core/src/processors/store_update_record.rs index feab5765ec..374e6a5189 100644 --- a/crates/torii/core/src/processors/store_update_record.rs +++ b/crates/torii/core/src/processors/store_update_record.rs @@ -1,6 +1,6 @@ use anyhow::{Context, Error, Ok, Result}; use async_trait::async_trait; -use dojo_world::contracts::naming; +use dojo_types::schema::Ty; use dojo_world::contracts::world::WorldContractReader; use num_traits::ToPrimitive; use starknet::core::types::Event; @@ -9,7 +9,7 @@ use tracing::info; use super::EventProcessor; use crate::processors::{ENTITY_ID_INDEX, MODEL_INDEX}; -use crate::sql::{felts_sql_string, Sql}; +use crate::sql::Sql; pub(crate) const LOG_TARGET: &str = "torii_core::processors::store_update_record"; @@ -64,21 +64,21 @@ where values_start + event.data[values_start].to_usize().context("invalid usize")?; // Skip the length to only get the values as they will be deserialized. - let values = event.data[values_start + 1..=values_end].to_vec(); - - let tag = naming::get_tag(&model.namespace, &model.name); - - // Keys are read from the db, since we don't have access to them when only - // the entity id is passed. - let keys = db.get_entity_keys(entity_id, &tag).await?; - - let keys_str = felts_sql_string(&keys); - let mut keys_and_unpacked = [keys, values].concat(); + let mut values = event.data[values_start + 1..=values_end].to_vec(); let mut entity = model.schema; - entity.deserialize(&mut keys_and_unpacked)?; + match entity { + Ty::Struct(ref mut struct_) => { + // we do not need the keys. the entity Ty has the keys in its schema + // so we should get rid of them to avoid trying to deserialize them + struct_.children.retain(|field| !field.key); + } + _ => return Err(anyhow::anyhow!("Expected struct")), + } + + entity.deserialize(&mut values)?; - db.set_entity(entity, event_id, block_timestamp, entity_id, model_id, &keys_str).await?; + db.set_entity(entity, event_id, block_timestamp, entity_id, model_id, None).await?; Ok(()) } } diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index f97dedae1f..a2cfb731f0 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -5,12 +5,12 @@ use std::sync::Arc; use anyhow::{anyhow, Result}; use chrono::Utc; use dojo_types::primitive::Primitive; -use dojo_types::schema::{EnumOption, Member, Struct, Ty}; +use dojo_types::schema::{EnumOption, Member, Ty}; use dojo_world::contracts::abi::model::Layout; use dojo_world::contracts::naming::{compute_selector_from_names, compute_selector_from_tag}; use dojo_world::metadata::WorldMetadata; use sqlx::pool::PoolConnection; -use sqlx::{Pool, Row, Sqlite}; +use sqlx::{Pool, Sqlite}; use starknet::core::types::{Event, Felt, InvokeTransaction, Transaction}; use starknet_crypto::poseidon_hash_many; use tracing::debug; @@ -24,7 +24,7 @@ use crate::types::{ use crate::utils::{must_utc_datetime_from_timestamp, utc_dt_string_from_timestamp}; type IsEventMessage = bool; -type IsStoreUpdateMember = bool; +type IsStoreUpdate = bool; pub const WORLD_CONTRACT_TYPE: &str = "WORLD"; pub const FELT_DELIMITER: &str = "/"; @@ -169,28 +169,35 @@ impl Sql { block_timestamp: u64, entity_id: Felt, model_id: Felt, - keys_str: &str, + keys_str: Option<&str>, ) -> Result<()> { let namespaced_name = entity.name(); let entity_id = format!("{:#x}", entity_id); let model_id = format!("{:#x}", model_id); - let insert_entities = "INSERT INTO entities (id, keys, event_id, executed_at) VALUES (?, \ - ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \ - updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \ - event_id=EXCLUDED.event_id RETURNING *"; + let insert_entities = if keys_str.is_some() { + "INSERT INTO entities (id, event_id, executed_at, keys) VALUES (?, ?, ?, ?) ON \ + CONFLICT(id) DO UPDATE SET updated_at=CURRENT_TIMESTAMP, \ + executed_at=EXCLUDED.executed_at, event_id=EXCLUDED.event_id, keys=EXCLUDED.keys \ + RETURNING *" + } else { + "INSERT INTO entities (id, event_id, executed_at) VALUES (?, ?, ?) ON CONFLICT(id) DO \ + UPDATE SET updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \ + event_id=EXCLUDED.event_id RETURNING *" + }; - self.query_queue.enqueue( - insert_entities, - vec![ - Argument::String(entity_id.clone()), - Argument::String(keys_str.to_string()), - Argument::String(event_id.to_string()), - Argument::String(utc_dt_string_from_timestamp(block_timestamp)), - ], - QueryType::SetEntity(entity.clone()), - ); + let mut arguments = vec![ + Argument::String(entity_id.clone()), + Argument::String(event_id.to_string()), + Argument::String(utc_dt_string_from_timestamp(block_timestamp)), + ]; + + if let Some(keys) = keys_str { + arguments.push(Argument::String(keys.to_string())); + } + + self.query_queue.enqueue(insert_entities, arguments, QueryType::SetEntity(entity.clone())); self.query_queue.enqueue( "INSERT INTO entity_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \ @@ -204,7 +211,7 @@ impl Sql { path, event_id, (&entity_id, false), - (&entity, false), + (&entity, keys_str.is_none()), block_timestamp, &vec![], ); @@ -271,48 +278,6 @@ impl Sql { Ok(()) } - pub async fn set_model_member( - &mut self, - model_tag: &str, - entity_id: Felt, - is_event_message: bool, - member: &Member, - event_id: &str, - block_timestamp: u64, - ) -> Result<()> { - let entity_id = format!("{:#x}", entity_id); - let path = vec![model_tag.to_string()]; - - let wrapped_ty = - Ty::Struct(Struct { name: model_tag.to_string(), children: vec![member.clone()] }); - - // update model member - self.build_set_entity_queries_recursive( - path, - event_id, - (&entity_id, is_event_message), - (&wrapped_ty, true), - block_timestamp, - &vec![], - ); - self.execute().await?; - - let update_query = "UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, \ - event_id=? WHERE id = ? RETURNING *"; - - self.query_queue.enqueue( - update_query.to_string(), - vec![ - Argument::String(utc_dt_string_from_timestamp(block_timestamp)), - Argument::String(event_id.to_string()), - Argument::String(entity_id.clone()), - ], - QueryType::SetEntity(wrapped_ty), - ); - - Ok(()) - } - pub async fn delete_entity( &mut self, entity_id: Felt, @@ -419,46 +384,6 @@ impl Sql { self.model_cache.model(&selector).await.map_err(|e| e.into()) } - /// Retrieves the keys definition for a given model. - /// The key definition is currently implemented as (`name`, `type`). - pub async fn get_entity_keys_def(&self, model_tag: &str) -> Result> { - let query = sqlx::query_as::<_, (String, String)>( - "SELECT name, type FROM model_members WHERE id = ? AND key = true", - ) - .bind(model_tag); - - let mut conn: PoolConnection = self.pool.acquire().await?; - let rows: Vec<(String, String)> = query.fetch_all(&mut *conn).await?; - Ok(rows.iter().map(|(name, ty)| (name.to_string(), ty.to_string())).collect()) - } - - /// Retrieves the keys for a given entity. - /// The keys are returned in the same order as the keys definition. - pub async fn get_entity_keys(&self, entity_id: Felt, model_tag: &str) -> Result> { - let entity_id = format!("{:#x}", entity_id); - let keys_def = self.get_entity_keys_def(model_tag).await?; - - let keys_names = - keys_def.iter().map(|(name, _)| format!("external_{}", name)).collect::>(); - - let sql = format!("SELECT {} FROM [{}] WHERE id = ?", keys_names.join(", "), model_tag); - let query = sqlx::query(sql.as_str()).bind(entity_id); - - let mut conn: PoolConnection = self.pool.acquire().await?; - - let mut keys: Vec = vec![]; - let result = query.fetch_all(&mut *conn).await?; - - for row in result { - for (i, _) in row.columns().iter().enumerate() { - let value: String = row.try_get(i)?; - keys.push(Felt::from_hex(&value)?); - } - } - - Ok(keys) - } - pub async fn does_entity_exist(&self, model: String, key: Felt) -> Result { let sql = format!("SELECT COUNT(*) FROM [{model}] WHERE id = ?"); @@ -641,7 +566,7 @@ impl Sql { event_id: &str, // The id of the entity and if the entity is an event message entity_id: (&str, IsEventMessage), - entity: (&Ty, IsStoreUpdateMember), + entity: (&Ty, IsStoreUpdate), block_timestamp: u64, indexes: &Vec, ) { diff --git a/crates/torii/core/src/sql_test.rs b/crates/torii/core/src/sql_test.rs index cd7584812c..db60d738ec 100644 --- a/crates/torii/core/src/sql_test.rs +++ b/crates/torii/core/src/sql_test.rs @@ -22,6 +22,8 @@ use crate::processors::generate_event_processors_map; use crate::processors::register_model::RegisterModelProcessor; use crate::processors::store_del_record::StoreDelRecordProcessor; use crate::processors::store_set_record::StoreSetRecordProcessor; +use crate::processors::store_update_member::StoreUpdateMemberProcessor; +use crate::processors::store_update_record::StoreUpdateRecordProcessor; use crate::sql::Sql; pub async fn bootstrap_engine

( @@ -42,6 +44,8 @@ where event: generate_event_processors_map(vec![ Box::new(RegisterModelProcessor), Box::new(StoreSetRecordProcessor), + Box::new(StoreUpdateRecordProcessor), + Box::new(StoreUpdateMemberProcessor), Box::new(StoreDelRecordProcessor), ])?, ..Processors::default() @@ -292,13 +296,16 @@ async fn test_load_from_remote_del() { db.execute().await.unwrap(); } +// Start of Selection #[tokio::test(flavor = "multi_thread")] -async fn test_get_entity_keys() { +async fn test_update_with_set_record() { + // Initialize the SQLite in-memory database let options = SqliteConnectOptions::from_str("sqlite::memory:").unwrap().create_if_missing(true); let pool = SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap(); sqlx::migrate!("../migrations").run(&pool).await.unwrap(); + // Set up the compiler test environment let setup = CompilerTestSetup::from_examples("../../dojo-core", "../../../examples/"); let config = setup.build_test_config("spawn-and-move", Profile::DEV); @@ -306,10 +313,14 @@ async fn test_get_entity_keys() { let manifest_path = Utf8PathBuf::from(config.manifest_path().parent().unwrap()); let target_dir = Utf8PathBuf::from(ws.target_dir().to_string()).join("dev"); + // Configure and start the KatanaRunner let seq_config = KatanaRunnerConfig { n_accounts: 10, ..Default::default() } .with_db_dir(copy_spawn_and_move_db().as_str()); + let sequencer = KatanaRunner::new_with_config(seq_config).expect("Failed to start runner."); + let account = sequencer.account(0); + // Prepare migration with world and seed let (strat, _) = prepare_migration_with_world_and_seed( manifest_path, target_dir, @@ -327,10 +338,9 @@ async fn test_get_entity_keys() { strat.world_address, ); - let account = sequencer.account(0); - let world = WorldContract::new(strat.world_address, &account); + // Grant writer permissions let res = world .grant_writer(&compute_bytearray_hash("dojo_examples"), &ContractAddress(actions_address)) .send_with_cfg(&TxnConfig::init_wait()) @@ -339,8 +349,8 @@ async fn test_get_entity_keys() { TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); - // spawn - let res = account + // Send spawn transaction + let spawn_res = account .execute_v1(vec![Call { to: actions_address, selector: get_selector_from_name("spawn").unwrap(), @@ -350,23 +360,28 @@ async fn test_get_entity_keys() { .await .unwrap(); - TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap(); - - let world_reader = WorldContractReader::new(strat.world_address, account.provider()); - - let mut db = Sql::new(pool.clone(), world_reader.address).await.unwrap(); + TransactionWaiter::new(spawn_res.transaction_hash, &account.provider()).await.unwrap(); - let _ = bootstrap_engine(world_reader, db.clone(), account.provider()).await; + // Send move transaction + let move_res = account + .execute_v1(vec![Call { + to: actions_address, + selector: get_selector_from_name("move").unwrap(), + calldata: vec![Felt::ZERO], + }]) + .send_with_cfg(&TxnConfig::init_wait()) + .await + .unwrap(); - let keys = db.get_entity_keys_def("dojo_examples-Moves").await.unwrap(); - assert_eq!(keys, vec![("player".to_string(), "ContractAddress".to_string()),]); + TransactionWaiter::new(move_res.transaction_hash, &account.provider()).await.unwrap(); - let entity_id = poseidon_hash_many(&[account.address()]); + let world_reader = WorldContractReader::new(strat.world_address, account.provider()); - let keys = db.get_entity_keys(entity_id, "dojo_examples-Moves").await.unwrap(); - assert_eq!(keys, vec![account.address()]); + let db = Sql::new(pool.clone(), world_reader.address).await.unwrap(); - db.execute().await.unwrap(); + // Expect bootstrap_engine to error out due to the existing bug + let result = bootstrap_engine(world_reader, db.clone(), account.provider()).await; + assert!(result.is_ok(), "bootstrap_engine should not fail"); } /// Count the number of rows in a table. diff --git a/crates/torii/graphql/src/tests/subscription_test.rs b/crates/torii/graphql/src/tests/subscription_test.rs index f7dc04d1f1..363082878a 100644 --- a/crates/torii/graphql/src/tests/subscription_test.rs +++ b/crates/torii/graphql/src/tests/subscription_test.rs @@ -115,7 +115,7 @@ mod tests { block_timestamp, entity_id, model_id, - &keys_str, + Some(&keys_str), ) .await .unwrap(); @@ -233,7 +233,7 @@ mod tests { block_timestamp, entity_id, model_id, - &keys_str, + Some(&keys_str), ) .await .unwrap(); diff --git a/crates/torii/libp2p/src/server/mod.rs b/crates/torii/libp2p/src/server/mod.rs index 0d8f7c22b2..9bc1e25ce3 100644 --- a/crates/torii/libp2p/src/server/mod.rs +++ b/crates/torii/libp2p/src/server/mod.rs @@ -528,7 +528,7 @@ async fn set_entity( model_id: Felt, keys: &str, ) -> anyhow::Result<()> { - db.set_entity(ty, message_id, block_timestamp, entity_id, model_id, keys).await?; + db.set_entity(ty, message_id, block_timestamp, entity_id, model_id, Some(keys)).await?; db.execute().await?; Ok(()) }