Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix undefined behavior on simultaneous write in daemon state file #326

Merged
merged 42 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
d6776da
undefined behavior on simultaneous read or write
Buckram123 Feb 12, 2024
24950a3
use handles
Buckram123 Feb 12, 2024
b40fcf5
let it finish
Buckram123 Feb 12, 2024
22d5f3f
POC
Buckram123 Feb 13, 2024
d743384
add force write
Buckram123 Feb 13, 2024
56da24f
drop guard if not used
Buckram123 Feb 14, 2024
0f301a6
should not be able to edit read-only
Buckram123 Feb 14, 2024
acd8d75
Merge remote-tracking branch 'origin/main' into daemon-state-ub
Buckram123 Apr 19, 2024
687c079
clean ups and more tests
Buckram123 Apr 19, 2024
fa74aaa
update comment on Global Write State
Buckram123 Apr 22, 2024
7d4e19f
update comment for global write state
Buckram123 Apr 22, 2024
0a856c7
Merge branch 'main' into daemon-state-ub
Buckram123 Apr 22, 2024
a2d860c
remove var to fix tests
Buckram123 Apr 22, 2024
8bdd5b9
Merge remote-tracking branch 'origin/main' into daemon-state-ub
Buckram123 May 3, 2024
a6b881f
Allow cloning locked state
Buckram123 May 6, 2024
eb23921
minimize mutex lock impact progress
Buckram123 May 6, 2024
ee5b4c2
move channel to sender
Buckram123 May 7, 2024
2c9b040
add some tests
Buckram123 May 7, 2024
b8259e8
remove unused code
Buckram123 May 7, 2024
f793b18
Merge remote-tracking branch 'origin/main' into daemon-state-ub
Buckram123 May 7, 2024
fafd27a
fix tests
Buckram123 May 7, 2024
d4f8e46
add error handling
Buckram123 May 7, 2024
d8d44ca
add drop tests
Buckram123 May 7, 2024
79a36b7
fix rest of the tests
Buckram123 May 7, 2024
d84d515
apply Robin review
Buckram123 May 9, 2024
e923332
restore cw-orch-core
Buckram123 May 9, 2024
9cdce4f
restore tube
Buckram123 May 9, 2024
ee25335
Update cw-orch-daemon/src/state.rs
Buckram123 May 15, 2024
2370365
Update cw-orch-daemon/src/state.rs
Buckram123 May 15, 2024
baed9b1
Update outdated comment
Buckram123 Jun 4, 2024
85de796
remove outdated comments
Buckram123 Jun 4, 2024
4eda129
Merge remote-tracking branch 'origin/main' into daemon-state-ub
Buckram123 Jun 4, 2024
c20f685
post-merge fixes
Buckram123 Jun 4, 2024
e3d489e
update changelog
Buckram123 Jun 4, 2024
4d84f51
Nicoco nits
Buckram123 Jun 5, 2024
9d259ed
private `state_path` and fix grpc connection
Buckram123 Jun 5, 2024
b9f54a1
need to unset state env
Buckram123 Jun 5, 2024
eadc089
fix write, for cases where new file smaller than original(for example…
Buckram123 Jun 5, 2024
0ff395f
add flush where it's needed
Buckram123 Jun 5, 2024
a795bbe
ignore toxic test
Buckram123 Jun 5, 2024
dfa14fa
Remove rt in clone testing mock state
Kayanski Jun 6, 2024
7eae16a
Changelog
Kayanski Jun 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cw-orch-daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ async-recursion = "1.0.5"
flate2 = { version = "1.0.26" }
lazy_static = "1.4.0"

# Lock daemon
file-lock = { version = "2.1.10" }
once_cell = { version = "1.19.0" }

[dev-dependencies]
cw-orch-daemon = { path = "." }
uid = "0.1.7"
Expand All @@ -80,9 +84,12 @@ duct = "0.13"
mock-contract = { path = "../contracts/mock_contract", features = [
"interface",
] }
serial_test = "3.0.0"
serial_test = { version = "3.0.0" }

# Ethereum deps
ethers-signers = { version = "2.0.7" }
ethers-core = { version = "2.0.7" }
tokio-test = "0.4.3"

# File lock test
nix = { version = "0.28.0", features = ["process"] }
2 changes: 1 addition & 1 deletion cw-orch-daemon/examples/daemon-capabilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub fn main() -> anyhow::Result<()> {
// We commit the tx (also resimulates the tx)
// ANCHOR: send_tx
let wallet = daemon.wallet();
let rt = daemon.rt_handle;
let rt = daemon.rt_handle.clone();
rt.block_on(wallet.bank_send("<address-of-my-sister>", coins(345, "ujunox")))?;
// ANCHOR_END: send_tx

Expand Down
63 changes: 54 additions & 9 deletions cw-orch-daemon/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::{
log::print_if_log_disabled, sender::SenderBuilder, sender::SenderOptions, DaemonAsync,
DaemonBuilder,
log::print_if_log_disabled,
sender::{SenderBuilder, SenderOptions},
DaemonAsync, DaemonBuilder, GrpcChannel,
};
use std::sync::Arc;

use bitcoin::secp256k1::All;

use super::{error::DaemonError, sender::Sender, state::DaemonState};
use cw_orch_core::environment::ChainInfoOwned;
use cw_orch_core::{environment::ChainInfoOwned, log::connectivity_target};

/// The default deployment id if none is provided
pub const DEFAULT_DEPLOYMENT: &str = "default";
Expand All @@ -30,13 +31,17 @@ pub struct DaemonAsyncBuilder {
pub(crate) chain: Option<ChainInfoOwned>,
// # Optional
pub(crate) deployment_id: Option<String>,
pub(crate) state_path: Option<String>,

/* Sender related options */
/// Wallet sender
/// Will be used in priority when set
pub(crate) sender: Option<SenderBuilder<All>>,
/// Specify Daemon Sender Options
pub(crate) sender_options: SenderOptions,

/* Rebuilder related options */
pub(crate) state: Option<DaemonState>,
}

impl DaemonAsyncBuilder {
Expand Down Expand Up @@ -87,32 +92,70 @@ impl DaemonAsyncBuilder {
self
}

/// Specifies path to the daemon state file
/// Defaults to env variable.
///
/// Variable: STATE_FILE_ENV_NAME.
///
/// This field is ignored for rebuilt daemon and path of the original daemon used instead
pub fn state_path(&mut self, path: impl ToString) -> &mut Self {
Kayanski marked this conversation as resolved.
Show resolved Hide resolved
self.state_path = Some(path.to_string());
self
}

/// Build a daemon
pub async fn build(&self) -> Result<DaemonAsync, DaemonError> {
let chain = self
let chain_info = self
.chain
.clone()
.ok_or(DaemonError::BuilderMissing("chain information".into()))?;
let deployment_id = self
.deployment_id
.clone()
.unwrap_or(DEFAULT_DEPLOYMENT.to_string());
let state = Arc::new(DaemonState::new(chain, deployment_id, false).await?);

if chain_info.grpc_urls.is_empty() {
Buckram123 marked this conversation as resolved.
Show resolved Hide resolved
return Err(DaemonError::GRPCListIsEmpty);
}

log::debug!(target: &connectivity_target(), "Found {} gRPC endpoints", chain_info.grpc_urls.len());
Buckram123 marked this conversation as resolved.
Show resolved Hide resolved

// find working grpc channel
let grpc_channel =
GrpcChannel::connect(&chain_info.grpc_urls, &chain_info.chain_id).await?;

let state = match &self.state {
Some(state) => state.clone(),
None => {
// If the path is relative, we dis-ambiguate it and take the root at $HOME/$CW_ORCH_STATE_FOLDER
Buckram123 marked this conversation as resolved.
Show resolved Hide resolved
let json_file_path = self
.state_path
.clone()
.unwrap_or(DaemonState::state_file_path()?);

DaemonState::new(json_file_path, chain_info.clone(), deployment_id, false)?
}
};
// if mnemonic provided, use it. Else use env variables to retrieve mnemonic
let sender_options = self.sender_options.clone();

let sender = match self.sender.clone() {
Some(sender) => match sender {
SenderBuilder::Mnemonic(mnemonic) => {
Sender::from_mnemonic_with_options(&state, &mnemonic, sender_options)?
}
SenderBuilder::Mnemonic(mnemonic) => Sender::from_mnemonic_with_options(
chain_info.clone(),
grpc_channel,
&mnemonic,
sender_options,
)?,
SenderBuilder::Sender(mut sender) => {
sender.set_options(self.sender_options.clone());
sender.grpc_channel = grpc_channel;
sender
}
},
None => Sender::new_with_options(&state, sender_options)?,
None => Sender::new_with_options(chain_info.clone(), grpc_channel, sender_options)?,
};

let daemon = DaemonAsync {
state,
sender: Arc::new(sender),
Expand All @@ -129,6 +172,8 @@ impl From<DaemonBuilder> for DaemonAsyncBuilder {
deployment_id: value.deployment_id,
sender_options: value.sender_options,
sender: value.sender,
state: value.state,
state_path: value.state_path,
}
}
}
19 changes: 10 additions & 9 deletions cw-orch-daemon/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use std::{
fmt::Debug,
io::Write,
str::{from_utf8, FromStr},
sync::Arc,
time::Duration,
};

Expand Down Expand Up @@ -67,7 +66,7 @@ pub struct DaemonAsync {
/// Sender to send transactions to the chain
pub sender: Wallet,
/// State of the daemon
pub state: Arc<DaemonState>,
pub state: DaemonState,
}

impl DaemonAsync {
Expand All @@ -78,12 +77,12 @@ impl DaemonAsync {

/// Get the channel configured for this DaemonAsync.
pub fn channel(&self) -> Channel {
self.state.grpc_channel.clone()
self.sender.grpc_channel.clone()
}
}

impl ChainState for DaemonAsync {
type Out = Arc<DaemonState>;
type Out = DaemonState;

fn state(&self) -> Self::Out {
self.state.clone()
Expand All @@ -100,11 +99,13 @@ impl DaemonAsync {
/// Returns a new [`DaemonAsyncBuilder`] with the current configuration.
/// Does not consume the original [`DaemonAsync`].
pub fn rebuild(&self) -> DaemonAsyncBuilder {
let mut builder = Self::builder();
let mut builder = DaemonAsyncBuilder {
state: Some(self.state()),
..Default::default()
};
builder
.chain(self.state().chain_data.clone())
.sender((*self.sender).clone())
.deployment_id(&self.state().deployment_id);
.chain(self.sender.chain_info.clone())
.sender((*self.sender).clone());
builder
}

Expand Down Expand Up @@ -281,7 +282,7 @@ impl DaemonAsync {
_uploadable: &T,
) -> Result<CosmTxResponse, DaemonError> {
let sender = &self.sender;
let wasm_path = <T as Uploadable>::wasm(&self.state.chain_data);
let wasm_path = <T as Uploadable>::wasm(&self.sender.chain_info);

log::debug!(target: &transaction_target(), "Uploading file at {:?}", wasm_path);

Expand Down
6 changes: 4 additions & 2 deletions cw-orch-daemon/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,14 @@ pub enum DaemonError {
InsufficientFee(String),
#[error("Not enough balance, expected {expected}, found {current}")]
NotEnoughBalance { expected: Coin, current: Coin },
#[error("Can't set the daemon state, it's read-only")]
StateReadOnly,
#[error("Can't set the daemon state, it's read-only {0}")]
StateReadOnly(String),
#[error("You need to pass a runtime to the querier object to do synchronous queries. Use daemon.querier instead")]
QuerierNeedRuntime,
#[error(transparent)]
Instantiate2Error(#[from] Instantiate2AddressError),
#[error("State file {0} already locked, use another state file or clone daemon which holds the lock")]
StateAlreadyLocked(String),
}

impl DaemonError {
Expand Down
59 changes: 0 additions & 59 deletions cw-orch-daemon/src/json_file.rs

This file was deleted.

104 changes: 104 additions & 0 deletions cw-orch-daemon/src/json_lock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use crate::DaemonError;
use file_lock::{FileLock, FileOptions};
use serde_json::{from_reader, json, Value};
use std::{fs::File, io::Seek};

/// State file reader and writer
/// Mainly used by [`crate::Daemon`] and [`crate::DaemonAsync`], but could also be used for tests or custom edits of the state
#[derive(Debug)]
pub struct JsonLockedState {
lock: FileLock,
json: Value,
path: String,
}

impl JsonLockedState {
/// Lock a state files
/// Other process won't be able to lock it
pub fn new(path: &str) -> Self {
// open file pointer set read/write permissions to true
// create it if it does not exists
// don't truncate it

let options = FileOptions::new()
.create(true)
.read(true)
.write(true)
.truncate(false);

// Lock file, non blocking so it errors in case someone else already holding lock of it
let lock: FileLock = FileLock::lock(path, false, options)
.unwrap_or_else(|_| panic!("Was not able to receive {path} state lock"));

// return empty json object if file is empty
// return file content if not
let json: Value = if lock.file.metadata().unwrap().len().eq(&0) {
json!({})
} else {
from_reader(&lock.file).unwrap()
};

let filename = path.to_owned();

JsonLockedState {
lock,
json,
path: filename,
}
}

/// Prepare json for further writes
pub fn prepare(&mut self, chain_id: &str, network_id: &str, deploy_id: &str) {
let json = &mut self.json;
// check and add network_id path if it's missing
if json.get(network_id).is_none() {
json[network_id] = json!({});
}

// add deployment_id to chain_id path
if json[network_id].get(chain_id).is_none() {
json[network_id][chain_id] = json!({
deploy_id: {},
"code_ids": {}
});
}
}

pub fn state(&self) -> Value {
self.json.clone()
}

/// Get a value for read
pub fn get(&self, chain_id: &str, network_id: &str) -> &Value {
&self.json[network_id][chain_id]
}

/// Give a value to write
pub fn get_mut(&mut self, chain_id: &str, network_id: &str) -> &mut Value {
self.json[network_id].get_mut(chain_id).unwrap()
}

/// Force write to a file
pub fn force_write(&mut self) {
self.lock.file.rewind().unwrap();
serde_json::to_writer_pretty(&self.lock.file, &self.json).unwrap();
}

pub fn path(&self) -> &str {
&self.path
}
}

// Write json when dropping
impl Drop for JsonLockedState {
fn drop(&mut self) {
self.force_write()
}
}

pub fn read(filename: &String) -> Result<Value, DaemonError> {
let file =
File::open(filename).unwrap_or_else(|_| panic!("File should be present at {}", filename));
let json: serde_json::Value = from_reader(file)?;
Ok(json)
}
Loading