Skip to content

Commit

Permalink
Fix undefined behavior on simultaneous write in daemon state file (#326)
Browse files Browse the repository at this point in the history
* undefined behavior on simultaneous read or write

* use handles

* let it finish

* POC

* add force write

* drop guard if not used

* should not be able to edit read-only

* clean ups and more tests

* update comment on Global Write State

* update comment for global write state

* remove var to fix tests

* Allow cloning locked state

* minimize mutex lock impact progress

* move channel to sender

* add some tests

* remove unused code

* fix tests

* add error handling

* add drop tests

* fix rest of the tests

* apply Robin review

* restore cw-orch-core

* restore tube

* Update cw-orch-daemon/src/state.rs

Co-authored-by: CyberHoward <88450409+CyberHoward@users.noreply.github.com>

* Update cw-orch-daemon/src/state.rs

Co-authored-by: CyberHoward <88450409+CyberHoward@users.noreply.github.com>

* Update outdated comment

* remove outdated comments

* post-merge fixes

* update changelog

* Nicoco nits

* private `state_path` and fix grpc connection

* need to unset state env

* fix write, for cases where new file smaller than original(for example flush)

* add flush where it's needed

* ignore toxic test

* Remove rt in clone testing mock state

* Changelog

---------

Co-authored-by: CyberHoward <88450409+CyberHoward@users.noreply.github.com>
Co-authored-by: Kayanski <kowalski.kowalskin@gmail.com>
  • Loading branch information
3 people authored Jun 6, 2024
1 parent 9c43514 commit 0a2f299
Show file tree
Hide file tree
Showing 24 changed files with 649 additions and 251 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,19 @@
- EXCITING FEATURE : Added an item and a map query method to be able to query cw-storage-plus structure outside of contracts easily
- Add `flush_state` method for Local Chain Daemons
- cw-orch-interchain now errors on checking transactions for IBC packets if NO packets were found
- `DaemonState` removed from `Sender`
- `Channel` moved from `DaemonState` to `Sender`
- `DaemonState` write-locks file unless it's read-only, meaning it will panic at initialization if other process holds lock of it
- `DaemonState` now can be safely used between threads and processes
- Two non-related Daemon's can't use same file for writing simultaneously (cloned or rebuilt are related Daemon)
- Writing to a file happens when all Daemon's that use same file dropped instead of hot writes
- `force_write` added to the `DaemonState` to allow force write of the state

### Breaking

- Daemon : Changed return types on daemon queriers to match Cosmwasm std types
- Cw-orch : Separate osmosis test tube from cw-orch. Its not available in its own crate `cw-orch-osmosis-test-tube`
- Clone-testing : Remove rt in Mock State creation (daemon doesn't need it anymore)

## 0.22.0

Expand Down
9 changes: 8 additions & 1 deletion cw-orch-daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ async-recursion = "1.0.5"
flate2 = { version = "1.0.26" }
lazy_static = "1.4.0"

# Lock daemon
file-lock = { version = "2.1.10" }
once_cell = { version = "1.19.0" }

[dev-dependencies]
cw-orch-daemon = { path = "." }
uid = "0.1.7"
Expand All @@ -80,9 +84,12 @@ duct = "0.13"
mock-contract = { path = "../contracts/mock_contract", features = [
"interface",
] }
serial_test = "3.0.0"
serial_test = { version = "3.0.0" }

# Ethereum deps
ethers-signers = { version = "2.0.7" }
ethers-core = { version = "2.0.7" }
tokio-test = "0.4.3"

# File lock test
nix = { version = "0.28.0", features = ["process"] }
4 changes: 2 additions & 2 deletions cw-orch-daemon/examples/daemon-capabilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ pub fn main() -> anyhow::Result<()> {
std::env::set_var("LOCAL_MNEMONIC", LOCAL_MNEMONIC);

let network = networks::LOCAL_JUNO;
let daemon = DaemonBuilder::default().chain(network).build()?;
let mut daemon = DaemonBuilder::default().chain(network).build()?;

daemon.flush_state()?;

// We commit the tx (also resimulates the tx)
// ANCHOR: send_tx
let wallet = daemon.wallet();
let rt = daemon.rt_handle;
let rt = daemon.rt_handle.clone();
rt.block_on(wallet.bank_send("<address-of-my-sister>", coins(345, "ujunox")))?;
// ANCHOR_END: send_tx

Expand Down
57 changes: 49 additions & 8 deletions cw-orch-daemon/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
log::print_if_log_disabled, sender::SenderBuilder, sender::SenderOptions, DaemonAsync,
DaemonBuilder,
log::print_if_log_disabled,
sender::{SenderBuilder, SenderOptions},
DaemonAsync, DaemonBuilder, GrpcChannel,
};
use std::sync::Arc;

Expand Down Expand Up @@ -30,13 +31,17 @@ pub struct DaemonAsyncBuilder {
pub(crate) chain: Option<ChainInfoOwned>,
// # Optional
pub(crate) deployment_id: Option<String>,
pub(crate) state_path: Option<String>,

/* Sender related options */
/// Wallet sender
/// Will be used in priority when set
pub(crate) sender: Option<SenderBuilder<All>>,
/// Specify Daemon Sender Options
pub(crate) sender_options: SenderOptions,

/* Rebuilder related options */
pub(crate) state: Option<DaemonState>,
}

impl DaemonAsyncBuilder {
Expand Down Expand Up @@ -87,32 +92,66 @@ impl DaemonAsyncBuilder {
self
}

/// Specifies path to the daemon state file
/// Defaults to env variable.
///
/// Variable: STATE_FILE_ENV_NAME.
#[allow(unused)]
pub(crate) fn state_path(&mut self, path: impl ToString) -> &mut Self {
self.state_path = Some(path.to_string());
self
}

/// Build a daemon
pub async fn build(&self) -> Result<DaemonAsync, DaemonError> {
let chain = self
let chain_info = self
.chain
.clone()
.ok_or(DaemonError::BuilderMissing("chain information".into()))?;
let deployment_id = self
.deployment_id
.clone()
.unwrap_or(DEFAULT_DEPLOYMENT.to_string());
let state = Arc::new(DaemonState::new(chain, deployment_id, false).await?);

let state = match &self.state {
Some(state) => {
let mut state = state.clone();
state.chain_data = chain_info.clone();
state.deployment_id = deployment_id;
state
}
None => {
let json_file_path = self
.state_path
.clone()
.unwrap_or(DaemonState::state_file_path()?);

DaemonState::new(json_file_path, chain_info.clone(), deployment_id, false)?
}
};
// if mnemonic provided, use it. Else use env variables to retrieve mnemonic
let sender_options = self.sender_options.clone();

let sender = match self.sender.clone() {
Some(sender) => match sender {
SenderBuilder::Mnemonic(mnemonic) => {
Sender::from_mnemonic_with_options(&state, &mnemonic, sender_options)?
}
SenderBuilder::Mnemonic(mnemonic) => Sender::from_mnemonic_with_options(
chain_info.clone(),
GrpcChannel::connect(&chain_info.grpc_urls, &chain_info.chain_id).await?,
&mnemonic,
sender_options,
)?,
SenderBuilder::Sender(mut sender) => {
sender.set_options(self.sender_options.clone());
sender
}
},
None => Sender::new_with_options(&state, sender_options)?,
None => Sender::new_with_options(
chain_info.clone(),
GrpcChannel::connect(&chain_info.grpc_urls, &chain_info.chain_id).await?,
sender_options,
)?,
};

let daemon = DaemonAsync {
state,
sender: Arc::new(sender),
Expand All @@ -129,6 +168,8 @@ impl From<DaemonBuilder> for DaemonAsyncBuilder {
deployment_id: value.deployment_id,
sender_options: value.sender_options,
sender: value.sender,
state: value.state,
state_path: value.state_path,
}
}
}
6 changes: 6 additions & 0 deletions cw-orch-daemon/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ pub struct GrpcChannel {}
impl GrpcChannel {
/// Connect to any of the provided gRPC endpoints
pub async fn connect(grpc: &[String], chain_id: &str) -> Result<Channel, DaemonError> {
if grpc.is_empty() {
return Err(DaemonError::GRPCListIsEmpty);
}

let mut successful_connections = vec![];

for address in grpc.iter() {
Expand Down Expand Up @@ -98,6 +102,7 @@ mod tests {
use speculoos::prelude::*;

#[tokio::test]
#[serial_test::serial]
async fn no_connection() {
let mut chain = cw_orch_daemon::networks::LOCAL_JUNO;
let grpcs = &["https://127.0.0.1:99999"];
Expand All @@ -117,6 +122,7 @@ mod tests {
}

#[tokio::test]
#[serial_test::serial]
async fn network_grpcs_list_is_empty() {
let mut chain = cw_orch_daemon::networks::LOCAL_JUNO;
let grpcs = &[];
Expand Down
21 changes: 11 additions & 10 deletions cw-orch-daemon/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use std::{
fmt::Debug,
io::Write,
str::{from_utf8, FromStr},
sync::Arc,
time::Duration,
};

Expand Down Expand Up @@ -67,7 +66,7 @@ pub struct DaemonAsync {
/// Sender to send transactions to the chain
pub sender: Wallet,
/// State of the daemon
pub state: Arc<DaemonState>,
pub state: DaemonState,
}

impl DaemonAsync {
Expand All @@ -78,18 +77,18 @@ impl DaemonAsync {

/// Get the channel configured for this DaemonAsync.
pub fn channel(&self) -> Channel {
self.state.grpc_channel.clone()
self.sender.grpc_channel.clone()
}

/// Flushes all the state related to the current chain
/// Only works on Local networks
pub fn flush_state(&self) -> Result<(), DaemonError> {
pub fn flush_state(&mut self) -> Result<(), DaemonError> {
self.state.flush()
}
}

impl ChainState for DaemonAsync {
type Out = Arc<DaemonState>;
type Out = DaemonState;

fn state(&self) -> Self::Out {
self.state.clone()
Expand All @@ -106,11 +105,13 @@ impl DaemonAsync {
/// Returns a new [`DaemonAsyncBuilder`] with the current configuration.
/// Does not consume the original [`DaemonAsync`].
pub fn rebuild(&self) -> DaemonAsyncBuilder {
let mut builder = Self::builder();
let mut builder = DaemonAsyncBuilder {
state: Some(self.state()),
..Default::default()
};
builder
.chain(self.state().chain_data.clone())
.sender((*self.sender).clone())
.deployment_id(&self.state().deployment_id);
.chain(self.sender.chain_info.clone())
.sender((*self.sender).clone());
builder
}

Expand Down Expand Up @@ -287,7 +288,7 @@ impl DaemonAsync {
_uploadable: &T,
) -> Result<CosmTxResponse, DaemonError> {
let sender = &self.sender;
let wasm_path = <T as Uploadable>::wasm(&self.state.chain_data);
let wasm_path = <T as Uploadable>::wasm(&self.sender.chain_info);

log::debug!(target: &transaction_target(), "Uploading file at {:?}", wasm_path);

Expand Down
7 changes: 4 additions & 3 deletions cw-orch-daemon/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,16 @@ pub enum DaemonError {
InsufficientFee(String),
#[error("Not enough balance, expected {expected}, found {current}")]
NotEnoughBalance { expected: Coin, current: Coin },
#[error("Can't set the daemon state, it's read-only")]
StateReadOnly,
#[error("Can't set the daemon state, it's read-only {0}")]
StateReadOnly(String),
#[error("You need to pass a runtime to the querier object to do synchronous queries. Use daemon.querier instead")]
QuerierNeedRuntime,
#[error(transparent)]
Instantiate2Error(#[from] Instantiate2AddressError),

#[error("Error opening file {0},err: ({1})")]
OpenFile(String, String),
#[error("State file {0} already locked, use another state file or clone daemon which holds the lock")]
StateAlreadyLocked(String),
}

impl DaemonError {
Expand Down
59 changes: 0 additions & 59 deletions cw-orch-daemon/src/json_file.rs

This file was deleted.

Loading

0 comments on commit 0a2f299

Please sign in to comment.