Skip to content

Commit

Permalink
refactor(executor): thread local instead of global variable (#1280)
Browse files Browse the repository at this point in the history
  • Loading branch information
Eason Gao authored Aug 2, 2023
1 parent 6bcb903 commit de44cf5
Show file tree
Hide file tree
Showing 30 changed files with 459 additions and 322 deletions.
50 changes: 40 additions & 10 deletions core/api/src/adapter.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use std::sync::Arc;

use core_executor::{
system_contract::metadata::MetadataHandle, AxonExecutor, AxonExecutorAdapter, MPTTrie,
};
use protocol::traits::{APIAdapter, Context, Executor, ExecutorAdapter, MemPool, Network, Storage};
use protocol::types::{
Account, BigEndianHash, Block, BlockNumber, Bytes, CkbRelatedInfo, ExecutorContext, Hash,
Header, Metadata, Proposal, Receipt, SignedTransaction, TxResp, H160, MAX_BLOCK_GAS_LIMIT,
NIL_DATA, RLP_NULL, U256,
Header, Metadata, Proposal, Receipt, SignedTransaction, TxResp, H160, H256,
MAX_BLOCK_GAS_LIMIT, NIL_DATA, RLP_NULL, U256,
};
use protocol::{async_trait, codec::ProtocolCodec, trie, ProtocolResult};

use core_executor::{
system_contract::metadata::MetadataHandle, AxonExecutor, AxonExecutorAdapter, MPTTrie,
};

use crate::APIError;

#[derive(Clone)]
Expand Down Expand Up @@ -237,14 +238,43 @@ where
block_number: Option<u64>,
) -> ProtocolResult<Metadata> {
if let Some(num) = block_number {
return MetadataHandle::default().get_metadata_by_block_number(num);
return MetadataHandle::new(self.get_metadata_root(ctx).await?)
.get_metadata_by_block_number(num);
}

let num = self.storage.get_latest_block_header(ctx).await?.number;
MetadataHandle::default().get_metadata_by_block_number(num)
let num = self
.storage
.get_latest_block_header(ctx.clone())
.await?
.number;
MetadataHandle::new(self.get_metadata_root(ctx).await?).get_metadata_by_block_number(num)
}

async fn get_ckb_related_info(&self, ctx: Context) -> ProtocolResult<CkbRelatedInfo> {
MetadataHandle::new(self.get_metadata_root(ctx).await?).get_ckb_related_info()
}

async fn get_ckb_related_info(&self, _ctx: Context) -> ProtocolResult<CkbRelatedInfo> {
MetadataHandle::default().get_ckb_related_info()
async fn get_image_cell_root(&self, ctx: Context) -> ProtocolResult<H256> {
let state_root = self.storage.get_latest_block_header(ctx).await?.state_root;

Ok(AxonExecutorAdapter::from_root(
state_root,
Arc::clone(&self.trie_db),
Arc::clone(&self.storage),
Default::default(),
)?
.get_image_cell_root())
}

async fn get_metadata_root(&self, ctx: Context) -> ProtocolResult<H256> {
let state_root = self.storage.get_latest_block_header(ctx).await?.state_root;

Ok(AxonExecutorAdapter::from_root(
state_root,
Arc::clone(&self.trie_db),
Arc::clone(&self.storage),
Default::default(),
)?
.get_metadata_root())
}
}
36 changes: 27 additions & 9 deletions core/api/src/jsonrpc/impl/ckb_light_client.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,31 @@
use std::sync::Arc;

use ckb_jsonrpc_types::{CellData, CellInfo, HeaderView as CkbHeaderView, JsonBytes, OutPoint};
use ckb_traits::HeaderProvider;
use ckb_types::core::cell::{CellProvider, CellStatus};
use ckb_types::{packed, prelude::Pack};

use core_executor::system_contract::DataProvider;

use core_executor::DataProvider;
use jsonrpsee::core::Error;
use protocol::traits::{APIAdapter, Context};
use protocol::{async_trait, ckb_blake2b_256, types::H256};

use crate::jsonrpc::{CkbLightClientRpcServer, RpcResult};

#[derive(Default, Clone, Debug)]
pub struct CkbLightClientRpcImpl {
data_provider: DataProvider,
#[derive(Clone, Debug)]
pub struct CkbLightClientRpcImpl<Adapter: APIAdapter> {
adapter: Arc<Adapter>,
}

#[async_trait]
impl CkbLightClientRpcServer for CkbLightClientRpcImpl {
impl<Adapter: APIAdapter + 'static> CkbLightClientRpcServer for CkbLightClientRpcImpl<Adapter> {
async fn get_block_header_by_hash(&self, hash: H256) -> RpcResult<Option<CkbHeaderView>> {
Ok(self
.data_provider
let root = self
.adapter
.get_image_cell_root(Context::new())
.await
.map_err(|e| Error::Custom(e.to_string()))?;
Ok(DataProvider::new(root)
.get_header(&(hash.0.pack()))
.map(Into::into))
}
Expand All @@ -29,8 +36,13 @@ impl CkbLightClientRpcServer for CkbLightClientRpcImpl {
with_data: bool,
) -> RpcResult<Option<CellInfo>> {
let out_point: packed::OutPoint = out_point.into();
let root = self
.adapter
.get_image_cell_root(Context::new())
.await
.map_err(|e| Error::Custom(e.to_string()))?;

match self.data_provider.cell(&out_point, false) {
match DataProvider::new(root).cell(&out_point, false) {
CellStatus::Live(c) => {
let data = with_data.then_some(c.mem_cell_data).flatten();
Ok(Some(CellInfo {
Expand All @@ -45,3 +57,9 @@ impl CkbLightClientRpcServer for CkbLightClientRpcImpl {
}
}
}

impl<Adapter: APIAdapter> CkbLightClientRpcImpl<Adapter> {
pub fn new(adapter: Arc<Adapter>) -> Self {
Self { adapter }
}
}
144 changes: 75 additions & 69 deletions core/api/src/jsonrpc/impl/web3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,80 @@ impl<Adapter: APIAdapter> Web3RpcImpl<Adapter> {
reward,
))
}

async fn extract_interoperation_tx_sender(
&self,
utx: &UnverifiedTransaction,
signature: &SignatureComponents,
) -> RpcResult<H160> {
// Call CKB-VM mode
if signature.r[0] == 0 {
let r = rlp::decode::<CellDepWithPubKey>(&signature.r[1..])
.map_err(|e| Error::Custom(e.to_string()))?;

return Ok(Hasher::digest(&r.pub_key).into());
}

// Verify by CKB-VM mode
let r = SignatureR::decode(&signature.r).map_err(|e| Error::Custom(e.to_string()))?;
let s = SignatureS::decode(&signature.s).map_err(|e| Error::Custom(e.to_string()))?;
let address_source = r.address_source();

let ckb_tx_view =
InteroperationImpl::dummy_transaction(r.clone(), s, Some(utx.signature_hash(true).0));
let dummy_input = r.dummy_input();

let input = ckb_tx_view
.inputs()
.get(address_source.index as usize)
.ok_or(Error::Custom("Invalid address source".to_string()))?;

log::debug!("[mempool]: verify interoperation tx sender \ntx view \n{:?}\ndummy input\n {:?}\naddress source\n{:?}\n", ckb_tx_view, dummy_input, address_source);

// Dummy input mode
if is_dummy_out_point(&input.previous_output()) {
log::debug!("[mempool]: verify interoperation tx dummy input mode.");

if let Some(cell) = dummy_input {
if address_source.type_ == 1 && cell.type_script.is_none() {
return Err(Error::Custom(
"Invalid address source in dummy input mode".to_string(),
));
}

let script_hash = if address_source.type_ == 0 {
cell.lock_script_hash()
} else {
cell.type_script_hash().unwrap()
};

return Ok(Hasher::digest(script_hash).into());
}

return Err(Error::Custom("No dummy input cell".to_string()));
}

// Reality input mode
let root = self
.adapter
.get_image_cell_root(Context::new())
.await
.map_err(|e| Error::Custom(e.to_string()))?;
match DataProvider::new(root).cell(&input.previous_output(), true) {
CellStatus::Live(cell) => {
let script_hash = if address_source.type_ == 0 {
ckb_blake2b_256(cell.cell_output.lock().as_slice())
} else if let Some(type_script) = cell.cell_output.type_().to_opt() {
ckb_blake2b_256(type_script.as_slice())
} else {
return Err(Error::Custom("Invalid address source".to_string()));
};

Ok(Hasher::digest(script_hash).into())
}
_ => Err(Error::Custom("Cannot find input cell in ICSC".to_string())),
}
}
}

#[async_trait]
Expand Down Expand Up @@ -231,7 +305,7 @@ impl<Adapter: APIAdapter + 'static> Web3RpcServer for Web3RpcImpl<Adapter> {
if sig.is_eth_sig() {
None
} else {
Some(extract_interoperation_tx_sender(&utx, sig)?)
Some(self.extract_interoperation_tx_sender(&utx, sig).await?)
}
} else {
return Err(Error::Custom("The transaction is not signed".to_string()));
Expand Down Expand Up @@ -1161,71 +1235,3 @@ pub fn from_receipt_to_web3_log(
}
}
}

fn extract_interoperation_tx_sender(
utx: &UnverifiedTransaction,
signature: &SignatureComponents,
) -> RpcResult<H160> {
// Call CKB-VM mode
if signature.r[0] == 0 {
let r = rlp::decode::<CellDepWithPubKey>(&signature.r[1..])
.map_err(|e| Error::Custom(e.to_string()))?;

return Ok(Hasher::digest(&r.pub_key).into());
}

// Verify by CKB-VM mode
let r = SignatureR::decode(&signature.r).map_err(|e| Error::Custom(e.to_string()))?;
let s = SignatureS::decode(&signature.s).map_err(|e| Error::Custom(e.to_string()))?;
let address_source = r.address_source();

let ckb_tx_view =
InteroperationImpl::dummy_transaction(r.clone(), s, Some(utx.signature_hash(true).0));
let dummy_input = r.dummy_input();

let input = ckb_tx_view
.inputs()
.get(address_source.index as usize)
.ok_or(Error::Custom("Invalid address source".to_string()))?;

log::debug!("[mempool]: verify interoperation tx sender \ntx view \n{:?}\ndummy input\n {:?}\naddress source\n{:?}\n", ckb_tx_view, dummy_input, address_source);

// Dummy input mode
if is_dummy_out_point(&input.previous_output()) {
log::debug!("[mempool]: verify interoperation tx dummy input mode.");

if let Some(cell) = dummy_input {
if address_source.type_ == 1 && cell.type_script.is_none() {
return Err(Error::Custom(
"Invalid address source in dummy input mode".to_string(),
));
}

let script_hash = if address_source.type_ == 0 {
cell.lock_script_hash()
} else {
cell.type_script_hash().unwrap()
};

return Ok(Hasher::digest(script_hash).into());
}

return Err(Error::Custom("No dummy input cell".to_string()));
}

// Reality input mode
match DataProvider.cell(&input.previous_output(), true) {
CellStatus::Live(cell) => {
let script_hash = if address_source.type_ == 0 {
ckb_blake2b_256(cell.cell_output.lock().as_slice())
} else if let Some(type_script) = cell.cell_output.type_().to_opt() {
ckb_blake2b_256(type_script.as_slice())
} else {
return Err(Error::Custom("Invalid address source".to_string()));
};

Ok(Hasher::digest(script_hash).into())
}
_ => Err(Error::Custom("Cannot find input cell in ICSC".to_string())),
}
}
2 changes: 1 addition & 1 deletion core/api/src/jsonrpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ pub async fn run_jsonrpc_server<Adapter: APIAdapter + 'static>(
let filter =
r#impl::filter_module(Arc::clone(&adapter), config.web3.log_filter_max_block_range)
.into_rpc();
let ckb_light_client_rpc = r#impl::CkbLightClientRpcImpl::default().into_rpc();
let ckb_light_client_rpc = r#impl::CkbLightClientRpcImpl::new(Arc::clone(&adapter)).into_rpc();

rpc.merge(node_rpc).unwrap();
rpc.merge(axon_rpc).unwrap();
Expand Down
36 changes: 27 additions & 9 deletions core/consensus/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ pub struct OverlordConsensusAdapter<

storage: Arc<S>,
trie_db: Arc<DB>,
metadata: Arc<MetadataHandle>,
overlord_handler: RwLock<Option<OverlordHandler<Proposal>>>,
crypto: Arc<OverlordCrypto>,
}
Expand Down Expand Up @@ -339,9 +338,10 @@ where
Arc::clone(&self.storage),
proposal.clone().into(),
)?;
let root = backend.get_metadata_root();
let metadata_handle = MetadataHandle::new(root);

let verifier_list = self
.metadata
let verifier_list = metadata_handle
.get_metadata_by_block_number(proposal.number)?
.verifier_list;

Expand All @@ -357,15 +357,21 @@ where
}

async fn is_last_block_in_current_epoch(&self, block_number: u64) -> ProtocolResult<bool> {
self.metadata.is_last_block_in_current_epoch(block_number)
self.get_metadata_handle(Context::new())
.await?
.is_last_block_in_current_epoch(block_number)
}

async fn get_metadata_by_block_number(&self, block_number: u64) -> ProtocolResult<Metadata> {
self.metadata.get_metadata_by_block_number(block_number)
self.get_metadata_handle(Context::new())
.await?
.get_metadata_by_block_number(block_number)
}

async fn get_metadata_by_epoch(&self, epoch: u64) -> ProtocolResult<Metadata> {
self.metadata.get_metadata_by_epoch(epoch)
self.get_metadata_handle(Context::new())
.await?
.get_metadata_by_epoch(epoch)
}

#[trace_span(kind = "consensus.adapter")]
Expand Down Expand Up @@ -460,7 +466,8 @@ where

// the auth_list for the target should comes from previous number
let metadata = self
.metadata
.get_metadata_handle(ctx.clone())
.await?
.get_metadata_by_block_number(block.header.number)?;

if !metadata.version.contains(block.header.number) {
Expand Down Expand Up @@ -620,14 +627,12 @@ where
mempool: Arc<M>,
storage: Arc<S>,
trie_db: Arc<DB>,
metadata: Arc<MetadataHandle>,
crypto: Arc<OverlordCrypto>,
) -> ProtocolResult<Self> {
Ok(OverlordConsensusAdapter {
network,
mempool,
storage,
metadata,
trie_db,
overlord_handler: RwLock::new(None),
crypto,
Expand All @@ -637,4 +642,17 @@ where
pub fn set_overlord_handler(&self, handler: OverlordHandler<Proposal>) {
*self.overlord_handler.write() = Some(handler)
}

async fn get_metadata_handle(&self, ctx: Context) -> ProtocolResult<MetadataHandle> {
let current_state_root = self.storage.get_latest_block_header(ctx).await?.state_root;
let root = AxonExecutorAdapter::from_root(
current_state_root,
Arc::clone(&self.trie_db),
Arc::clone(&self.storage),
Default::default(),
)?
.get_metadata_root();

Ok(MetadataHandle::new(root))
}
}
Loading

0 comments on commit de44cf5

Please sign in to comment.