Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix undefined behavior on simultaneous write in daemon state file #326

Merged
merged 42 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
d6776da
undefined behavior on simultaneous read or write
Buckram123 Feb 12, 2024
24950a3
use handles
Buckram123 Feb 12, 2024
b40fcf5
let it finish
Buckram123 Feb 12, 2024
22d5f3f
POC
Buckram123 Feb 13, 2024
d743384
add force write
Buckram123 Feb 13, 2024
56da24f
drop guard if not used
Buckram123 Feb 14, 2024
0f301a6
should not be able to edit read-only
Buckram123 Feb 14, 2024
acd8d75
Merge remote-tracking branch 'origin/main' into daemon-state-ub
Buckram123 Apr 19, 2024
687c079
clean ups and more tests
Buckram123 Apr 19, 2024
fa74aaa
update comment on Global Write State
Buckram123 Apr 22, 2024
7d4e19f
update comment for global write state
Buckram123 Apr 22, 2024
0a856c7
Merge branch 'main' into daemon-state-ub
Buckram123 Apr 22, 2024
a2d860c
remove var to fix tests
Buckram123 Apr 22, 2024
8bdd5b9
Merge remote-tracking branch 'origin/main' into daemon-state-ub
Buckram123 May 3, 2024
a6b881f
Allow cloning locked state
Buckram123 May 6, 2024
eb23921
minimize mutex lock impact progress
Buckram123 May 6, 2024
ee5b4c2
move channel to sender
Buckram123 May 7, 2024
2c9b040
add some tests
Buckram123 May 7, 2024
b8259e8
remove unused code
Buckram123 May 7, 2024
f793b18
Merge remote-tracking branch 'origin/main' into daemon-state-ub
Buckram123 May 7, 2024
fafd27a
fix tests
Buckram123 May 7, 2024
d4f8e46
add error handling
Buckram123 May 7, 2024
d8d44ca
add drop tests
Buckram123 May 7, 2024
79a36b7
fix rest of the tests
Buckram123 May 7, 2024
d84d515
apply Robin review
Buckram123 May 9, 2024
e923332
restore cw-orch-core
Buckram123 May 9, 2024
9cdce4f
restore tube
Buckram123 May 9, 2024
ee25335
Update cw-orch-daemon/src/state.rs
Buckram123 May 15, 2024
2370365
Update cw-orch-daemon/src/state.rs
Buckram123 May 15, 2024
baed9b1
Update outdated comment
Buckram123 Jun 4, 2024
85de796
remove outdated comments
Buckram123 Jun 4, 2024
4eda129
Merge remote-tracking branch 'origin/main' into daemon-state-ub
Buckram123 Jun 4, 2024
c20f685
post-merge fixes
Buckram123 Jun 4, 2024
e3d489e
update changelog
Buckram123 Jun 4, 2024
4d84f51
Nicoco nits
Buckram123 Jun 5, 2024
9d259ed
private `state_path` and fix grpc connection
Buckram123 Jun 5, 2024
b9f54a1
need to unset state env
Buckram123 Jun 5, 2024
eadc089
fix write, for cases where new file smaller than original(for example…
Buckram123 Jun 5, 2024
0ff395f
add flush where it's needed
Buckram123 Jun 5, 2024
a795bbe
ignore toxic test
Buckram123 Jun 5, 2024
dfa14fa
Remove rt in clone testing mock state
Kayanski Jun 6, 2024
7eae16a
Changelog
Kayanski Jun 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cw-orch-daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ async-recursion = "1.0.5"
flate2 = { version = "1.0.26" }
lazy_static = "1.4.0"

# Lock daemon
file-lock = { version = "2.1.10" }
once_cell = { version = "1.19.0" }

[dev-dependencies]
cw-orch-daemon = { path = "." }
uid = "0.1.7"
Expand All @@ -87,3 +91,6 @@ serial_test = "3.0.0"
ethers-signers = { version = "2.0.7" }
ethers-core = { version = "2.0.7" }
tokio-test = "0.4.3"

# File lock test
nix = { version = "0.28.0", features = ["process"] }
119 changes: 76 additions & 43 deletions cw-orch-daemon/src/json_file.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,87 @@
use crate::DaemonError;
use file_lock::{FileLock, FileOptions};
use serde_json::{from_reader, json, Value};
use std::{
fs::{File, OpenOptions},
path::PathBuf,
str::FromStr,
};

pub fn write(filename: &String, chain_id: &String, network_id: &String, deploy_id: &String) {
// open file pointer set read/write permissions to true
// create it if it does not exists
// dont truncate it
// Create the directory if they do not exist
let file_buf = PathBuf::from_str(filename).unwrap();
if let Some(parent) = file_buf.parent() {
let _ = std::fs::create_dir_all(parent);
use std::{fs::File, io::Seek};

/// State file reader and writer
/// Mainly used by [`crate::Daemon`] and [`crate::DaemonAsync`], but could also be used for tests or custom edits of the state
pub struct JsonFileState {
lock: FileLock,
json: Value,
}

impl JsonFileState {
/// Lock a state files
/// Other process won't be able to lock it
pub fn new(filename: &str) -> Self {
// open file pointer set read/write permissions to true
// create it if it does not exists
// dont truncate it

let options = FileOptions::new()
.create(true)
.read(true)
.write(true)
.truncate(false);

// Lock file, non blocking so it errors in case someone else already holding lock of it
let lock: FileLock = FileLock::lock(filename, false, options)
.unwrap_or_else(|_| panic!("Was not able to receive {filename} state lock"));

// return empty json object if file is empty
// return file content if not
let json: Value = if lock.file.metadata().unwrap().len().eq(&0) {
json!({})
} else {
from_reader(&lock.file).unwrap()
};

JsonFileState { lock, json }
}

/// Prepare json for further writes
pub fn prepare(&mut self, chain_id: &str, network_id: &str, deploy_id: &str) {
let json = &mut self.json;
// check and add network_id path if it's missing
if json.get(network_id).is_none() {
json[network_id] = json!({});
}

// add deployment_id to chain_id path
if json[network_id].get(chain_id).is_none() {
json[network_id][chain_id] = json!({
deploy_id: {},
"code_ids": {}
});
}
}

pub fn state(&self) -> Value {
self.json.clone()
}

let file = OpenOptions::new()
.create(true)
.read(true)
.write(true)
.truncate(false)
.open(filename.clone())
.unwrap();

// return empty json object if file is empty
// return file content if not
let mut json: Value = if file.metadata().unwrap().len().eq(&0) {
json!({})
} else {
from_reader(file).unwrap()
};

// check and add network_id path if it's missing
if json.get(network_id).is_none() {
json[network_id] = json!({});
/// Get a value for read
Buckram123 marked this conversation as resolved.
Show resolved Hide resolved
pub fn get(&self, chain_id: &str, network_id: &str) -> &Value {
&self.json[network_id][chain_id]
}

// add deployment_id to chain_id path
if json[network_id].get(chain_id).is_none() {
json[network_id][chain_id] = json!({
deploy_id: {},
"code_ids": {}
});
/// Give a value to write
pub fn get_mut(&mut self, chain_id: &str, network_id: &str) -> &mut Value {
self.json[network_id].get_mut(chain_id).unwrap()
}

// write JSON data
// use File::create so we dont append data to the file
// but rather write all (because we have read the data before)
serde_json::to_writer_pretty(File::create(filename).unwrap(), &json).unwrap();
/// Force write to a file
pub fn force_write(&mut self) {
self.lock.file.rewind().unwrap();
serde_json::to_writer_pretty(&self.lock.file, &self.json).unwrap();
}
}

// Write json when dropping
impl Drop for JsonFileState {
fn drop(&mut self) {
self.force_write()
}
}

pub fn read(filename: &String) -> Result<Value, DaemonError> {
Expand Down
2 changes: 1 addition & 1 deletion cw-orch-daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub mod builder;
pub mod channel;
pub mod core;
pub mod error;
pub(crate) mod json_file;
pub mod json_file;
/// Proto types for different blockchains
pub mod proto;
pub mod sender;
Expand Down
119 changes: 100 additions & 19 deletions cw-orch-daemon/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::error::DaemonError;
use crate::{channel::GrpcChannel, networks::ChainKind};
use crate::{channel::GrpcChannel, json_file::JsonFileState, networks::ChainKind};

use cosmwasm_std::Addr;
use cw_orch_core::{
Expand All @@ -9,25 +9,51 @@ use cw_orch_core::{
CwEnvError, CwOrchEnvVars,
};
use ibc_chain_registry::chain::ChainData;
use once_cell::sync::Lazy;
use serde::Serialize;
use serde_json::{json, Value};
use std::{collections::HashMap, fs::File, path::Path};
use std::{collections::HashMap, path::Path, sync::Mutex};
use tonic::transport::Channel;

pub(crate) static GLOBAL_WRITE_STATE: Lazy<Mutex<HashMap<String, (u64, JsonFileState)>>> =
Buckram123 marked this conversation as resolved.
Show resolved Hide resolved
Lazy::new(|| Mutex::new(HashMap::new()));

/// Stores the chain information and deployment state.
/// Uses a simple JSON file to store the deployment information locally.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct DaemonState {
/// this is passed via env var STATE_FILE
pub json_file_path: String,
json_file_path: String,
/// Deployment identifier
pub deployment_id: String,
/// gRPC channel
pub grpc_channel: Channel,
/// Information about the chain
pub chain_data: ChainData,
/// Flag to set the daemon state readonly and not pollute the env file
pub read_only: bool,
read_only: bool,
Buckram123 marked this conversation as resolved.
Show resolved Hide resolved
}

// On clone increase lock count so we track how many daemon states using the file
impl Clone for DaemonState {
fn clone(&self) -> Self {
let new_self = Self {
json_file_path: self.json_file_path.clone(),
deployment_id: self.deployment_id.clone(),
grpc_channel: self.grpc_channel.clone(),
chain_data: self.chain_data.clone(),
read_only: self.read_only,
};

// Increase DaemonStates count for this file
if !self.read_only {
let mut lock = GLOBAL_WRITE_STATE.lock().unwrap();
let (count, _) = lock.get_mut(&self.json_file_path).unwrap();
*count += 1;
}

new_self
}
}

impl DaemonState {
Expand Down Expand Up @@ -85,7 +111,7 @@ impl DaemonState {

// build daemon state
let state = DaemonState {
json_file_path,
json_file_path: json_file_path.clone(),
deployment_id,
grpc_channel,
chain_data,
Expand All @@ -99,10 +125,23 @@ impl DaemonState {
state.json_file_path
);

// write json state file
crate::json_file::write(
&state.json_file_path,
&state.chain_data.chain_id.to_string(),
let mut lock = GLOBAL_WRITE_STATE.lock().unwrap();
// Lock file if first time write
let file_state = match lock.entry(json_file_path.clone()) {
// Increase count if already locked this file
std::collections::hash_map::Entry::Occupied(o) => {
let (count, lock) = o.into_mut();
*count += 1;
lock
}
// Insert as 1 count of DaemonStates pointing to this file if it's first open
std::collections::hash_map::Entry::Vacant(v) => {
let (_, lock) = v.insert((1, JsonFileState::new(&json_file_path)));
lock
}
};
file_state.prepare(
state.chain_data.chain_id.as_str(),
&state.chain_data.chain_name,
&state.deployment_id,
);
Expand Down Expand Up @@ -146,15 +185,30 @@ impl DaemonState {

Ok(state_file_path)
}
/// Get the state filepath and read it as json
fn read_state(&self) -> Result<serde_json::Value, DaemonError> {
crate::json_file::read(&self.json_file_path)

/// Get the chain state as json
fn chain_state(&self) -> Result<serde_json::Value, DaemonError> {
// Check if already open in write mode {
let lock = GLOBAL_WRITE_STATE.lock().unwrap();
Buckram123 marked this conversation as resolved.
Show resolved Hide resolved
if let Some((_, j)) = lock.get(&self.json_file_path) {
Ok(j.get(
&self.chain_data.chain_name,
self.chain_data.chain_id.as_str(),
)
.clone())
} else {
// drop guard if not found, since reading may take a while
drop(lock);
// Or just read it from a file
crate::json_file::read(&self.json_file_path)
.map(|j| j[self.chain_data.chain_id.as_str()][&self.chain_data.chain_name].clone())
}
}

/// Retrieve a stateful value using the chainId and networkId
pub fn get(&self, key: &str) -> Result<Value, DaemonError> {
let json = self.read_state()?;
Ok(json[&self.chain_data.chain_name][&self.chain_data.chain_id.to_string()][key].clone())
let json = self.chain_state()?;
Ok(json[key].clone())
Buckram123 marked this conversation as resolved.
Show resolved Hide resolved
}

/// Set a stateful value using the chainId and networkId
Expand All @@ -168,16 +222,43 @@ impl DaemonState {
return Err(DaemonError::StateReadOnly);
}

let mut json = self.read_state()?;
let mut lock = GLOBAL_WRITE_STATE.lock().unwrap();

let (_, file_state) = lock.get_mut(&self.json_file_path).unwrap();
let val = file_state.get_mut(
self.chain_data.chain_id.as_str(),
&self.chain_data.chain_name,
);
val[key][contract_id] = json!(value);

json[&self.chain_data.chain_name][&self.chain_data.chain_id.to_string()][key]
[contract_id] = json!(value);
Ok(())
}

serde_json::to_writer_pretty(File::create(&self.json_file_path).unwrap(), &json)?;
/// Forcefully write current json to a file
pub fn force_write(&self) -> Result<(), DaemonError> {
let mut lock = GLOBAL_WRITE_STATE.lock().unwrap();
let (_, file_state) = lock.get_mut(&self.json_file_path).unwrap();
file_state.force_write();
Ok(())
}
}

// Manual drop implementation to write state when no daemon states uses the file
impl Drop for DaemonState {
fn drop(&mut self) {
let mut lock = GLOBAL_WRITE_STATE.lock().unwrap();

// Decrease open count
let (count, _) = lock.get_mut(&self.json_file_path).unwrap();
*count -= 1;

// If we get to zero count - write to a file
if *count == 0 {
lock.remove(&self.json_file_path);
}
}
}

impl StateInterface for DaemonState {
/// Read address for contract in deployment id from state file
fn get_address(&self, contract_id: &str) -> Result<Addr, CwEnvError> {
Expand Down
Loading
Loading