Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt-fix(torii-core): fix and optimize partial updates #2427

Merged
merged 8 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/torii/core/src/processors/store_set_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ where
let mut entity = model.schema;
entity.deserialize(&mut keys_and_unpacked)?;

db.set_entity(entity, event_id, block_timestamp, entity_id, model_id, &keys_str).await?;
db.set_entity(entity, event_id, block_timestamp, entity_id, model_id, Some(&keys_str))
.await?;
Ok(())
}
}
9 changes: 5 additions & 4 deletions crates/torii/core/src/processors/store_update_member.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::{Context, Error, Result};
use async_trait::async_trait;
use dojo_types::schema::{Struct, Ty};
use dojo_world::contracts::naming;
use dojo_world::contracts::world::WorldContractReader;
use num_traits::ToPrimitive;
Expand Down Expand Up @@ -50,11 +51,11 @@
event_id: &str,
event: &Event,
) -> Result<(), Error> {
let selector = event.data[MODEL_INDEX];
let model_id = event.data[MODEL_INDEX];

Check warning on line 54 in crates/torii/core/src/processors/store_update_member.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/store_update_member.rs#L54

Added line #L54 was not covered by tests
let entity_id = event.data[ENTITY_ID_INDEX];
let member_selector = event.data[MEMBER_INDEX];

let model = db.model(selector).await?;
let model = db.model(model_id).await?;

Check warning on line 58 in crates/torii/core/src/processors/store_update_member.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/store_update_member.rs#L58

Added line #L58 was not covered by tests
let schema = model.schema;

let mut member = schema
Expand Down Expand Up @@ -98,9 +99,9 @@
}

member.ty.deserialize(&mut values)?;
let wrapped_ty = Ty::Struct(Struct { name: schema.name(), children: vec![member] });

Check warning on line 102 in crates/torii/core/src/processors/store_update_member.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/store_update_member.rs#L102

Added line #L102 was not covered by tests

db.set_model_member(&schema.name(), entity_id, false, &member, event_id, block_timestamp)
.await?;
db.set_entity(wrapped_ty, event_id, block_timestamp, entity_id, model_id, None).await?;

Check warning on line 104 in crates/torii/core/src/processors/store_update_member.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/processors/store_update_member.rs#L104

Added line #L104 was not covered by tests
Ok(())
}
}
28 changes: 14 additions & 14 deletions crates/torii/core/src/processors/store_update_record.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::{Context, Error, Ok, Result};
use async_trait::async_trait;
use dojo_world::contracts::naming;
use dojo_types::schema::Ty;
use dojo_world::contracts::world::WorldContractReader;
use num_traits::ToPrimitive;
use starknet::core::types::Event;
Expand All @@ -9,7 +9,7 @@ use tracing::info;

use super::EventProcessor;
use crate::processors::{ENTITY_ID_INDEX, MODEL_INDEX};
use crate::sql::{felts_sql_string, Sql};
use crate::sql::Sql;

pub(crate) const LOG_TARGET: &str = "torii_core::processors::store_update_record";

Expand Down Expand Up @@ -64,21 +64,21 @@ where
values_start + event.data[values_start].to_usize().context("invalid usize")?;

// Skip the length to only get the values as they will be deserialized.
let values = event.data[values_start + 1..=values_end].to_vec();

let tag = naming::get_tag(&model.namespace, &model.name);

// Keys are read from the db, since we don't have access to them when only
// the entity id is passed.
let keys = db.get_entity_keys(entity_id, &tag).await?;

let keys_str = felts_sql_string(&keys);
let mut keys_and_unpacked = [keys, values].concat();
let mut values = event.data[values_start + 1..=values_end].to_vec();

let mut entity = model.schema;
entity.deserialize(&mut keys_and_unpacked)?;
match entity {
Ty::Struct(ref mut struct_) => {
// we do not need the keys. the entity Ty has the keys in its schema
// so we should get rid of them to avoid trying to deserialize them
struct_.children.retain(|field| !field.key);
}
_ => return Err(anyhow::anyhow!("Expected struct")),
}

entity.deserialize(&mut values)?;

db.set_entity(entity, event_id, block_timestamp, entity_id, model_id, &keys_str).await?;
db.set_entity(entity, event_id, block_timestamp, entity_id, model_id, None).await?;
Ok(())
}
}
129 changes: 27 additions & 102 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use std::sync::Arc;
use anyhow::{anyhow, Result};
use chrono::Utc;
use dojo_types::primitive::Primitive;
use dojo_types::schema::{EnumOption, Member, Struct, Ty};
use dojo_types::schema::{EnumOption, Member, Ty};
use dojo_world::contracts::abi::model::Layout;
use dojo_world::contracts::naming::{compute_selector_from_names, compute_selector_from_tag};
use dojo_world::metadata::WorldMetadata;
use sqlx::pool::PoolConnection;
use sqlx::{Pool, Row, Sqlite};
use sqlx::{Pool, Sqlite};
use starknet::core::types::{Event, Felt, InvokeTransaction, Transaction};
use starknet_crypto::poseidon_hash_many;
use tracing::debug;
Expand All @@ -24,7 +24,7 @@ use crate::types::{
use crate::utils::{must_utc_datetime_from_timestamp, utc_dt_string_from_timestamp};

type IsEventMessage = bool;
type IsStoreUpdateMember = bool;
type IsStoreUpdate = bool;

pub const WORLD_CONTRACT_TYPE: &str = "WORLD";
pub const FELT_DELIMITER: &str = "/";
Expand Down Expand Up @@ -169,28 +169,35 @@ impl Sql {
block_timestamp: u64,
entity_id: Felt,
model_id: Felt,
keys_str: &str,
keys_str: Option<&str>,
) -> Result<()> {
let namespaced_name = entity.name();

let entity_id = format!("{:#x}", entity_id);
let model_id = format!("{:#x}", model_id);

let insert_entities = "INSERT INTO entities (id, keys, event_id, executed_at) VALUES (?, \
?, ?, ?) ON CONFLICT(id) DO UPDATE SET \
updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \
event_id=EXCLUDED.event_id RETURNING *";
let insert_entities = if keys_str.is_some() {
"INSERT INTO entities (id, event_id, executed_at, keys) VALUES (?, ?, ?, ?) ON \
CONFLICT(id) DO UPDATE SET updated_at=CURRENT_TIMESTAMP, \
executed_at=EXCLUDED.executed_at, event_id=EXCLUDED.event_id, keys=EXCLUDED.keys \
RETURNING *"
} else {
"INSERT INTO entities (id, event_id, executed_at) VALUES (?, ?, ?) ON CONFLICT(id) DO \
UPDATE SET updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \
event_id=EXCLUDED.event_id RETURNING *"
};

self.query_queue.enqueue(
insert_entities,
vec![
Argument::String(entity_id.clone()),
Argument::String(keys_str.to_string()),
Argument::String(event_id.to_string()),
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
],
QueryType::SetEntity(entity.clone()),
);
let mut arguments = vec![
Argument::String(entity_id.clone()),
Argument::String(event_id.to_string()),
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
];

if let Some(keys) = keys_str {
arguments.push(Argument::String(keys.to_string()));
}

self.query_queue.enqueue(insert_entities, arguments, QueryType::SetEntity(entity.clone()));

self.query_queue.enqueue(
"INSERT INTO entity_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \
Expand All @@ -204,7 +211,7 @@ impl Sql {
path,
event_id,
(&entity_id, false),
(&entity, false),
(&entity, keys_str.is_none()),
block_timestamp,
&vec![],
);
Expand Down Expand Up @@ -271,48 +278,6 @@ impl Sql {
Ok(())
}

pub async fn set_model_member(
&mut self,
model_tag: &str,
entity_id: Felt,
is_event_message: bool,
member: &Member,
event_id: &str,
block_timestamp: u64,
) -> Result<()> {
let entity_id = format!("{:#x}", entity_id);
let path = vec![model_tag.to_string()];

let wrapped_ty =
Ty::Struct(Struct { name: model_tag.to_string(), children: vec![member.clone()] });

// update model member
self.build_set_entity_queries_recursive(
path,
event_id,
(&entity_id, is_event_message),
(&wrapped_ty, true),
block_timestamp,
&vec![],
);
self.execute().await?;

let update_query = "UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, \
event_id=? WHERE id = ? RETURNING *";

self.query_queue.enqueue(
update_query.to_string(),
vec![
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
Argument::String(event_id.to_string()),
Argument::String(entity_id.clone()),
],
QueryType::SetEntity(wrapped_ty),
);

Ok(())
}

pub async fn delete_entity(
&mut self,
entity_id: Felt,
Expand Down Expand Up @@ -419,46 +384,6 @@ impl Sql {
self.model_cache.model(&selector).await.map_err(|e| e.into())
}

/// Retrieves the keys definition for a given model.
/// The key definition is currently implemented as (`name`, `type`).
pub async fn get_entity_keys_def(&self, model_tag: &str) -> Result<Vec<(String, String)>> {
let query = sqlx::query_as::<_, (String, String)>(
"SELECT name, type FROM model_members WHERE id = ? AND key = true",
)
.bind(model_tag);

let mut conn: PoolConnection<Sqlite> = self.pool.acquire().await?;
let rows: Vec<(String, String)> = query.fetch_all(&mut *conn).await?;
Ok(rows.iter().map(|(name, ty)| (name.to_string(), ty.to_string())).collect())
}

/// Retrieves the keys for a given entity.
/// The keys are returned in the same order as the keys definition.
pub async fn get_entity_keys(&self, entity_id: Felt, model_tag: &str) -> Result<Vec<Felt>> {
let entity_id = format!("{:#x}", entity_id);
let keys_def = self.get_entity_keys_def(model_tag).await?;

let keys_names =
keys_def.iter().map(|(name, _)| format!("external_{}", name)).collect::<Vec<String>>();

let sql = format!("SELECT {} FROM [{}] WHERE id = ?", keys_names.join(", "), model_tag);
let query = sqlx::query(sql.as_str()).bind(entity_id);

let mut conn: PoolConnection<Sqlite> = self.pool.acquire().await?;

let mut keys: Vec<Felt> = vec![];
let result = query.fetch_all(&mut *conn).await?;

for row in result {
for (i, _) in row.columns().iter().enumerate() {
let value: String = row.try_get(i)?;
keys.push(Felt::from_hex(&value)?);
}
}

Ok(keys)
}

pub async fn does_entity_exist(&self, model: String, key: Felt) -> Result<bool> {
let sql = format!("SELECT COUNT(*) FROM [{model}] WHERE id = ?");

Expand Down Expand Up @@ -641,7 +566,7 @@ impl Sql {
event_id: &str,
// The id of the entity and if the entity is an event message
entity_id: (&str, IsEventMessage),
entity: (&Ty, IsStoreUpdateMember),
entity: (&Ty, IsStoreUpdate),
block_timestamp: u64,
indexes: &Vec<i64>,
) {
Expand Down
49 changes: 32 additions & 17 deletions crates/torii/core/src/sql_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use crate::processors::generate_event_processors_map;
use crate::processors::register_model::RegisterModelProcessor;
use crate::processors::store_del_record::StoreDelRecordProcessor;
use crate::processors::store_set_record::StoreSetRecordProcessor;
use crate::processors::store_update_member::StoreUpdateMemberProcessor;
use crate::processors::store_update_record::StoreUpdateRecordProcessor;
use crate::sql::Sql;

pub async fn bootstrap_engine<P>(
Expand All @@ -42,6 +44,8 @@ where
event: generate_event_processors_map(vec![
Box::new(RegisterModelProcessor),
Box::new(StoreSetRecordProcessor),
Box::new(StoreUpdateRecordProcessor),
Box::new(StoreUpdateMemberProcessor),
Box::new(StoreDelRecordProcessor),
])?,
..Processors::default()
Expand Down Expand Up @@ -292,24 +296,31 @@ async fn test_load_from_remote_del() {
db.execute().await.unwrap();
}

// Start of Selection
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Start of Selection

#[tokio::test(flavor = "multi_thread")]
async fn test_get_entity_keys() {
async fn test_update_with_set_record() {
// Initialize the SQLite in-memory database
let options =
SqliteConnectOptions::from_str("sqlite::memory:").unwrap().create_if_missing(true);
let pool = SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap();
sqlx::migrate!("../migrations").run(&pool).await.unwrap();

// Set up the compiler test environment
let setup = CompilerTestSetup::from_examples("../../dojo-core", "../../../examples/");
let config = setup.build_test_config("spawn-and-move", Profile::DEV);

let ws = scarb::ops::read_workspace(config.manifest_path(), &config).unwrap();
let manifest_path = Utf8PathBuf::from(config.manifest_path().parent().unwrap());
let target_dir = Utf8PathBuf::from(ws.target_dir().to_string()).join("dev");

// Configure and start the KatanaRunner
let seq_config = KatanaRunnerConfig { n_accounts: 10, ..Default::default() }
.with_db_dir(copy_spawn_and_move_db().as_str());

let sequencer = KatanaRunner::new_with_config(seq_config).expect("Failed to start runner.");
let account = sequencer.account(0);

// Prepare migration with world and seed
let (strat, _) = prepare_migration_with_world_and_seed(
manifest_path,
target_dir,
Expand All @@ -327,10 +338,9 @@ async fn test_get_entity_keys() {
strat.world_address,
);

let account = sequencer.account(0);

let world = WorldContract::new(strat.world_address, &account);

// Grant writer permissions
let res = world
.grant_writer(&compute_bytearray_hash("dojo_examples"), &ContractAddress(actions_address))
.send_with_cfg(&TxnConfig::init_wait())
Expand All @@ -339,8 +349,8 @@ async fn test_get_entity_keys() {

TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap();

// spawn
let res = account
// Send spawn transaction
let spawn_res = account
.execute_v1(vec![Call {
to: actions_address,
selector: get_selector_from_name("spawn").unwrap(),
Expand All @@ -350,23 +360,28 @@ async fn test_get_entity_keys() {
.await
.unwrap();

TransactionWaiter::new(res.transaction_hash, &account.provider()).await.unwrap();

let world_reader = WorldContractReader::new(strat.world_address, account.provider());

let mut db = Sql::new(pool.clone(), world_reader.address).await.unwrap();
TransactionWaiter::new(spawn_res.transaction_hash, &account.provider()).await.unwrap();

let _ = bootstrap_engine(world_reader, db.clone(), account.provider()).await;
// Send move transaction
let move_res = account
.execute_v1(vec![Call {
to: actions_address,
selector: get_selector_from_name("move").unwrap(),
calldata: vec![Felt::ZERO],
}])
.send_with_cfg(&TxnConfig::init_wait())
.await
.unwrap();

let keys = db.get_entity_keys_def("dojo_examples-Moves").await.unwrap();
assert_eq!(keys, vec![("player".to_string(), "ContractAddress".to_string()),]);
TransactionWaiter::new(move_res.transaction_hash, &account.provider()).await.unwrap();

let entity_id = poseidon_hash_many(&[account.address()]);
let world_reader = WorldContractReader::new(strat.world_address, account.provider());

let keys = db.get_entity_keys(entity_id, "dojo_examples-Moves").await.unwrap();
assert_eq!(keys, vec![account.address()]);
let db = Sql::new(pool.clone(), world_reader.address).await.unwrap();

db.execute().await.unwrap();
// Expect bootstrap_engine to error out due to the existing bug
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Expect bootstrap_engine to error out due to the existing bug
// Expect bootstrap_engine to not error out

stale comment from when i was trying to see if without the fixes it actually errors out, can be changed to this now

let result = bootstrap_engine(world_reader, db.clone(), account.provider()).await;
assert!(result.is_ok(), "bootstrap_engine should not fail");
}

/// Count the number of rows in a table.
Expand Down
Loading
Loading