Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add testing framework for events that update Account #210

Merged
merged 19 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
689 changes: 374 additions & 315 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion crates/actors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["local"]
default = ["local", "mock_storage"]
local = []
remote = []
mock_storage = []

[dependencies]
ethereum-types = "0.14.1"
Expand Down
27 changes: 17 additions & 10 deletions crates/actors/src/account_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use lasr_messages::{
AccountCacheMessage, ActorName, ActorType, RpcMessage, RpcResponseError, SupervisorType,
TransactionResponse,
};
use lasr_types::{Account, AccountType, Address};
use lasr_types::{Account, AccountType, Address, MockPersistenceStore, PersistenceStore};
use ractor::{
concurrency::OneshotReceiver, Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent,
};
Expand All @@ -14,12 +14,15 @@ use std::{
time::{Duration, Instant},
};
use thiserror::Error;
#[cfg(not(feature = "mock_storage"))]
use tikv_client::RawClient as TikvClient;
use tokio::sync::mpsc::Sender;

#[derive(Debug, Clone, Default)]
pub struct AccountCacheActor;

pub type StorageRef = <AccountCacheActor as Actor>::Arguments;

impl ActorName for AccountCacheActor {
fn name(&self) -> ractor::ActorName {
ActorType::AccountCache.to_string()
Expand All @@ -44,15 +47,15 @@ impl Default for AccountCacheError {
}
}

pub struct AccountCache {
pub struct AccountCache<S: PersistenceStore> {
inner: AccountCacheInner,
tikv_client: TikvClient,
storage: S,
}
impl AccountCache {
pub fn new(tikv_client: TikvClient) -> Self {
impl<S: PersistenceStore> AccountCache<S> {
pub fn new(storage: S) -> Self {
Self {
inner: AccountCacheInner::new(),
tikv_client,
storage,
}
}
}
Expand Down Expand Up @@ -192,8 +195,11 @@ impl AccountCacheActor {
#[async_trait]
impl Actor for AccountCacheActor {
type Msg = AccountCacheMessage;
type State = AccountCache;
type State = AccountCache<Self::Arguments>;
#[cfg(not(feature = "mock_storage"))]
type Arguments = TikvClient;
#[cfg(feature = "mock_storage")]
type Arguments = MockPersistenceStore<String, Vec<u8>>;

async fn pre_start(
&self,
Expand Down Expand Up @@ -243,9 +249,10 @@ impl Actor for AccountCacheActor {
let acc_key = address.to_full_string();

// Pull `Account` data from persistence store
state
.tikv_client
.get(acc_key.to_owned())
PersistenceStore::get(
&state.storage,
<Self::Arguments as PersistenceStore>::Key::from(acc_key.to_owned())
eureka-cpu marked this conversation as resolved.
Show resolved Hide resolved
)
.await
.typecast()
.log_err(|e| AccountCacheError::Custom(format!("failed to find Account with address: {hex_address} in persistence store: {e:?}")))
Expand Down
28 changes: 16 additions & 12 deletions crates/actors/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ use web3::types::BlockNumber;

use crate::{
account_cache, get_account, get_actor_ref, handle_actor_response, process_group_changed,
AccountCacheError, ActorExt, Coerce, DaClientError, EoClientError, PendingTransactionError,
SchedulerError, StaticFuture, UnorderedFuturePool,
AccountCacheActor, AccountCacheError, ActorExt, Coerce, DaClientError, EoClientError,
PendingTransactionError, SchedulerError, StaticFuture, StorageRef, UnorderedFuturePool,
};
use lasr_messages::{
AccountCacheMessage, ActorName, ActorType, BatcherMessage, DaClientMessage, EoMessage,
Expand All @@ -55,8 +55,9 @@ use lasr_contract::create_program_id;
use lasr_types::{
Account, AccountBuilder, AccountType, Address, AddressOrNamespace, ArbitraryData,
BurnInstruction, ContractLogType, CreateInstruction, Instruction, Metadata, MetadataValue,
Namespace, Outputs, ProgramAccount, ProgramUpdate, TokenDistribution, TokenOrProgramUpdate,
TokenUpdate, Transaction, TransactionType, TransferInstruction, UpdateInstruction, U256,
Namespace, Outputs, PersistenceStore, ProgramAccount, ProgramUpdate, TokenDistribution,
TokenOrProgramUpdate, TokenUpdate, Transaction, TransactionType, TransferInstruction,
UpdateInstruction, U256,
};

use derive_builder::Builder;
Expand Down Expand Up @@ -1702,7 +1703,7 @@ impl Batcher {

async fn handle_next_batch_request(
batcher: Arc<Mutex<Batcher>>,
tikv_client: TikvClient,
storage_ref: StorageRef,
) -> Result<(), BatcherError> {
if let Some(blob_response) = {
let mut guard = batcher.lock().await;
Expand All @@ -1721,7 +1722,10 @@ impl Batcher {
let acc_val = AccountValue { account: data };
// Serialize `Account` data to be stored.
if let Some(val) = bincode::serialize(&acc_val).ok() {
if tikv_client.put(addr.clone(), val).await.is_ok() {
if PersistenceStore::put(&storage_ref, addr.clone(), val)
.await
.is_ok()
{
log::warn!(
"Inserted Account with address of {addr:?} to persistence layer",
)
Expand Down Expand Up @@ -1902,8 +1906,8 @@ impl Actor for BatcherActor {
) -> Result<(), ActorProcessingErr> {
let batcher_ptr = Arc::clone(state);
match message {
BatcherMessage::GetNextBatch { tikv_client } => {
Batcher::handle_next_batch_request(batcher_ptr, tikv_client).await?;
BatcherMessage::GetNextBatch { storage_ref } => {
Batcher::handle_next_batch_request(batcher_ptr, storage_ref).await?;
// let mut guard = self.future_pool.lock().await;
// guard.push(fut.boxed());
}
Expand Down Expand Up @@ -2052,7 +2056,7 @@ impl Actor for BatcherSupervisor {

pub async fn batch_requestor(
mut stopper: tokio::sync::mpsc::Receiver<u8>,
tikv_client: TikvClient,
storage_ref: StorageRef,
) {
if let Some(batcher) = ractor::registry::where_is(ActorType::Batcher.to_string()) {
let batcher: ActorRef<BatcherMessage> = batcher.into();
Expand All @@ -2064,7 +2068,7 @@ pub async fn batch_requestor(
log::info!("SLEEPING THEN REQUESTING NEXT BATCH");
tokio::time::sleep(tokio::time::Duration::from_secs(batch_interval_secs)).await;
let message = BatcherMessage::GetNextBatch {
tikv_client: tikv_client.clone(),
storage_ref: storage_ref.clone(),
};
log::warn!("requesting next batch");
if let Err(err) = batcher.cast(message) {
Expand Down Expand Up @@ -2106,8 +2110,8 @@ mod batcher_tests {
async fn handle(&self, message: Self::Msg, state: &mut Self::State) -> Result<()> {
let batcher_ptr = Arc::clone(state);
match message {
BatcherMessage::GetNextBatch { tikv_client } => {
let fut = Batcher::handle_next_batch_request(batcher_ptr, tikv_client);
BatcherMessage::GetNextBatch { storage_ref } => {
let fut = Batcher::handle_next_batch_request(batcher_ptr, storage_ref);
let mut guard = self.future_pool.lock().await;
guard.push(fut.boxed());
}
Expand Down
7 changes: 3 additions & 4 deletions crates/actors/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use lasr_messages::{
use ractor::{concurrency::JoinHandle, Actor, ActorRef};
use std::sync::Arc;
use thiserror::Error;
use tikv_client::RawClient as TikvClient;
use tokio::sync::Mutex;

use crate::{
Expand All @@ -16,7 +15,7 @@ use crate::{
DaClientSupervisorError, EngineActor, EngineSupervisorError, EoClient, EoClientActor,
EoClientSupervisorError, EoServerActor, EoServerSupervisorError, ExecutionEngine,
ExecutorActor, ExecutorSupervisorError, LasrRpcServerActor, LasrRpcServerSupervisorError,
PendingTransactionActor, PendingTransactionSupervisorError, TaskScheduler,
PendingTransactionActor, PendingTransactionSupervisorError, StorageRef, TaskScheduler,
TaskSchedulerSupervisorError, ValidatorActor, ValidatorCore, ValidatorSupervisorError,
};

Expand Down Expand Up @@ -87,7 +86,7 @@ impl ActorManager {
actor_manager: Arc<Mutex<ActorManager>>,
actor_name: ractor::ActorName,
handler: AccountCacheActor,
startup_args: TikvClient,
startup_args: StorageRef,
) -> Result<(), ActorManagerError> {
if let Some(supervisor) = get_actor_ref::<AccountCacheMessage, AccountCacheSupervisorError>(
SupervisorType::AccountCache,
Expand Down Expand Up @@ -438,7 +437,7 @@ impl ActorManagerBuilder {
pub async fn account_cache(
self,
account_cache_actor: AccountCacheActor,
startup_args: TikvClient,
startup_args: StorageRef,
account_cache_supervisor: ActorRef<AccountCacheMessage>,
) -> Result<Self, Box<dyn std::error::Error>> {
let mut new = self;
Expand Down
49 changes: 49 additions & 0 deletions crates/actors/tests/account.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#![cfg(test)]
//! Test events that make changes to `Account`s.

use std::collections::{BTreeMap, BTreeSet};

use lasr_types::{
Account, AccountBuilder, AccountType, Address, AddressOrNamespace, ArbitraryData, Metadata,
Namespace, Status, TokenBuilder, U256,
};

pub fn default_test_account(hex_address: String) -> Account {
let owner_address =
Address::from_hex(&hex_address).expect("failed to produce address from hex str");
let namespace = Namespace::from("TEST_NAMESPACE".to_string());
let program_namespace = AddressOrNamespace::Namespace(namespace);
let mut test_program_set = BTreeSet::new();
test_program_set.insert(program_namespace.clone());
let program_address = Address::new([4; 20]);
let token = TokenBuilder::default()
.program_id(program_address.clone())
.owner_id(owner_address.clone())
.balance(U256::from(666))
.metadata(Metadata::new())
.token_ids(vec![U256::from(69)])
.allowance(BTreeMap::new())
.approvals(BTreeMap::new())
.data(ArbitraryData::new())
.status(Status::Free)
.build()
.expect("failed to build test token");
let mut programs = BTreeMap::new();
programs.insert(program_address, token);
AccountBuilder::default()
.account_type(AccountType::User)
.program_namespace(Some(program_namespace))
.owner_address(owner_address)
.programs(programs)
.nonce(U256::from(0))
.program_account_data(ArbitraryData::new())
.program_account_metadata(Metadata::new())
.program_account_linked_programs(test_program_set)
.build()
.expect("failed to build test account")
}

#[test]
fn bridge_in_event() {
let account = default_test_account(Address::default().to_full_string());
}
2 changes: 1 addition & 1 deletion crates/compute/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::os::unix::prelude::PermissionsExt;
use std::path::Path;
use std::process::Stdio;
use std::{ffi::OsStr, fmt::Display};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::Command;
use web3_pkg::web3_store::Web3Store;

Expand Down
4 changes: 3 additions & 1 deletion crates/messages/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ name = "lasr_messages"
version = "0.9.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["mock_storage"]
mock_storage = []

[dependencies]
uuid = { version = "1.3", features = ["v4", "serde"] }
Expand Down
6 changes: 5 additions & 1 deletion crates/messages/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use ractor::RpcReplyPort;
use ractor_cluster::RactorMessage;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
#[cfg(not(feature = "mock_storage"))]
use tikv_client::RawClient as TikvClient;
use web3::ethabi::{Address as EthereumAddress, FixedBytes};
use web3::types::TransactionReceipt;
Expand Down Expand Up @@ -448,7 +449,10 @@ pub enum BatcherMessage {
outputs: Option<Outputs>,
},
GetNextBatch {
tikv_client: TikvClient,
#[cfg(not(feature = "mock_storage"))]
storage_ref: TikvClient,
#[cfg(feature = "mock_storage")]
storage_ref: lasr_types::MockPersistenceStore<String, Vec<u8>>,
},
BlobVerificationProof {
request_id: String,
Expand Down
3 changes: 2 additions & 1 deletion crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["local"]
default = ["local", "mock_storage"]
local = []
remote = []
mock_storage = []

[[bin]]
name = "lasr_node"
Expand Down
23 changes: 16 additions & 7 deletions crates/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ use lasr_actors::{
use lasr_compute::{OciBundler, OciBundlerBuilder, OciManager};
use lasr_messages::{ActorName, ActorType, ToActorType};
use lasr_rpc::LasrRpcServer;
use lasr_types::Address;
#[cfg(feature = "mock_storage")]
use lasr_types::MockPersistenceStore;
use lasr_types::{Address, PersistenceStore};
use ractor::{Actor, ActorCell, ActorStatus};
use secp256k1::Secp256k1;
#[cfg(not(feature = "mock_storage"))]
use tikv_client::RawClient as TikvClient;
use tokio::sync::{
mpsc::{Receiver, Sender},
Expand Down Expand Up @@ -70,8 +73,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let inner_eo_server =
setup_eo_server(web3_instance.clone(), &block_processed_path).map_err(Box::new)?;

const TIKV_CLIENT_PD_ENDPOINT: &str = "127.0.0.1:2379";
let tikv_client = TikvClient::new(vec![TIKV_CLIENT_PD_ENDPOINT]).await?;
#[cfg(not(feature = "mock_storage"))]
let persistence_storage = <TikvClient as PersistenceStore>::new().await?;
#[cfg(feature = "mock_storage")]
let persistence_storage =
<MockPersistenceStore<String, Vec<u8>> as PersistenceStore>::new().await?;

#[cfg(feature = "local")]
let bundler: OciBundler<String, String> = OciBundlerBuilder::default()
Expand Down Expand Up @@ -218,7 +224,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?
.account_cache(
account_cache_actor.clone(),
tikv_client.clone(),
persistence_storage.clone(),
account_cache_supervisor,
)
.await?
Expand Down Expand Up @@ -274,7 +280,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let batcher_clone = batcher.clone();
let executor_actor_clone = executor_actor.clone();
let execution_engine_clone = execution_engine.clone();
let tikv_client_clone = tikv_client.clone();
let persistence_storage_clone = persistence_storage.clone();

tokio::spawn(async move {
while let Some(actor) = panic_rx.recv().await {
Expand All @@ -297,7 +303,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
manager_ptr,
actor_name,
account_cache_actor.clone(),
tikv_client_clone.clone(),
persistence_storage_clone.clone(),
)
.await
.typecast()
Expand Down Expand Up @@ -430,7 +436,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tokio::spawn(graph_cleaner());
tokio::spawn(eo_server_wrapper.run());
tokio::spawn(server_handle.stopped());
tokio::spawn(lasr_actors::batch_requestor(stop_rx, tikv_client.clone()));
tokio::spawn(lasr_actors::batch_requestor(
stop_rx,
persistence_storage.clone(),
));

let future_thread_pool = tokio_rayon::rayon::ThreadPoolBuilder::new()
.num_threads(num_cpus::get())
Expand Down
2 changes: 2 additions & 0 deletions crates/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ log = "0.4.20"
typetag = "0.2.14"
uint = "0.9.5"
schemars = "0.8.16"
tikv-client = "0.3.0"
tokio = { version = "1.34.0", features = ["full"] }
2 changes: 1 addition & 1 deletion crates/types/src/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl<'de> Deserialize<'de> for Address {
/// This structure is used to store Ethereum Compatible addresses, which are
/// derived from the public key. It implements traits like Clone, Copy, Debug,
/// Serialize, Deserialize, etc., for ease of use across various contexts.
#[derive(Clone, Copy, JsonSchema, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Clone, Copy, JsonSchema, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
#[serde(rename_all = "camelCase")]
pub struct Address([u8; 20]);

Expand Down
Loading
Loading