Skip to content

Commit

Permalink
Merge branch 'main' into docs_and_nodejs
Browse files Browse the repository at this point in the history
  • Loading branch information
dirvine authored Dec 31, 2024
2 parents 5658aea + c4d769f commit 3374a15
Show file tree
Hide file tree
Showing 25 changed files with 213 additions and 225 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ uv.lock

/vendor/


# Node.js
node_modules/

Expand Down
1 change: 0 additions & 1 deletion ant-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ version = "0.3.1"

[features]
default = []
encrypt-records = []
local = ["libp2p/mdns"]
loud = []
open-metrics = ["libp2p/metrics", "prometheus-client", "hyper", "sysinfo"]
Expand Down
4 changes: 2 additions & 2 deletions ant-networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,14 +664,14 @@ impl SwarmDriver {
RecordKind::Chunk => RecordType::Chunk,
RecordKind::Scratchpad => RecordType::Scratchpad,
RecordKind::Pointer => RecordType::Pointer,
RecordKind::LinkedList | RecordKind::Register => {
RecordKind::GraphEntry | RecordKind::Register => {
let content_hash = XorName::from_content(&record.value);
RecordType::NonChunk(content_hash)
}
RecordKind::ChunkWithPayment
| RecordKind::RegisterWithPayment
| RecordKind::PointerWithPayment
| RecordKind::LinkedListWithPayment
| RecordKind::GraphEntryWithPayment
| RecordKind::ScratchpadWithPayment => {
error!("Record {record_key:?} with payment shall not be stored locally.");
return Err(NetworkError::InCorrectRecordHeader);
Expand Down
9 changes: 4 additions & 5 deletions ant-networking/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use ant_protocol::storage::LinkedListAddress;
use ant_protocol::storage::GraphEntryAddress;
use ant_protocol::{messages::Response, storage::RecordKind, NetworkAddress, PrettyPrintRecordKey};
use libp2p::{
kad::{self, QueryId, Record},
Expand Down Expand Up @@ -123,14 +123,13 @@ pub enum NetworkError {
#[error("Record header is incorrect")]
InCorrectRecordHeader,


// ---------- Chunk Errors
#[error("Failed to verify the ChunkProof with the provided quorum")]
FailedToVerifyChunkProof(NetworkAddress),

// ---------- LinkedList Errors
#[error("Linked list not found: {0:?}")]
NoLinkedListFoundInsideRecord(LinkedListAddress),
// ---------- Graph Errors
#[error("Graph entry not found: {0:?}")]
NoGraphEntryFoundInsideRecord(GraphEntryAddress),

// ---------- Store Error
#[error("No Store Cost Responses")]
Expand Down
10 changes: 5 additions & 5 deletions ant-networking/src/event/kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{
driver::PendingGetClosestType, get_linked_list_from_record, get_quorum_value,
driver::PendingGetClosestType, get_graph_entry_from_record, get_quorum_value,
target_arch::Instant, GetRecordCfg, GetRecordError, NetworkError, Result, SwarmDriver,
CLOSE_GROUP_SIZE,
};
use ant_protocol::{
storage::{try_serialize_record, LinkedList, RecordKind},
storage::{try_serialize_record, GraphEntry, RecordKind},
NetworkAddress, PrettyPrintRecordKey,
};
use itertools::Itertools;
Expand Down Expand Up @@ -399,7 +399,7 @@ impl SwarmDriver {
debug!("For record {pretty_key:?} task {query_id:?}, fetch completed with split record");
let mut accumulated_transactions = BTreeSet::new();
for (record, _) in result_map.values() {
match get_linked_list_from_record(record) {
match get_graph_entry_from_record(record) {
Ok(transactions) => {
accumulated_transactions.extend(transactions);
}
Expand All @@ -412,11 +412,11 @@ impl SwarmDriver {
info!("For record {pretty_key:?} task {query_id:?}, found split record for a transaction, accumulated and sending them as a single record");
let accumulated_transactions = accumulated_transactions
.into_iter()
.collect::<Vec<LinkedList>>();
.collect::<Vec<GraphEntry>>();

let bytes = try_serialize_record(
&accumulated_transactions,
RecordKind::LinkedList,
RecordKind::GraphEntry,
)?;

let new_accumulated_record = Record {
Expand Down
20 changes: 10 additions & 10 deletions ant-networking/src/linked_list.rs → ant-networking/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{driver::GetRecordCfg, Network, NetworkError, Result};
use ant_protocol::storage::{LinkedList, LinkedListAddress};
use ant_protocol::storage::{GraphEntry, GraphEntryAddress};
use ant_protocol::{
storage::{try_deserialize_record, RecordHeader, RecordKind, RetryStrategy},
NetworkAddress, PrettyPrintRecordKey,
};
use libp2p::kad::{Quorum, Record};

impl Network {
/// Gets LinkedList at LinkedListAddress from the Network.
pub async fn get_linked_list(&self, address: LinkedListAddress) -> Result<Vec<LinkedList>> {
let key = NetworkAddress::from_linked_list_address(address).to_record_key();
/// Gets GraphEntry at GraphEntryAddress from the Network.
pub async fn get_graph_entry(&self, address: GraphEntryAddress) -> Result<Vec<GraphEntry>> {
let key = NetworkAddress::from_graph_entry_address(address).to_record_key();
let get_cfg = GetRecordCfg {
get_quorum: Quorum::All,
retry_strategy: Some(RetryStrategy::Quick),
Expand All @@ -31,20 +31,20 @@ impl Network {
PrettyPrintRecordKey::from(&record.key)
);

get_linked_list_from_record(&record)
get_graph_entry_from_record(&record)
}
}

pub fn get_linked_list_from_record(record: &Record) -> Result<Vec<LinkedList>> {
pub fn get_graph_entry_from_record(record: &Record) -> Result<Vec<GraphEntry>> {
let header = RecordHeader::from_record(record)?;
if let RecordKind::LinkedList = header.kind {
let transactions = try_deserialize_record::<Vec<LinkedList>>(record)?;
if let RecordKind::GraphEntry = header.kind {
let transactions = try_deserialize_record::<Vec<GraphEntry>>(record)?;
Ok(transactions)
} else {
warn!(
"RecordKind mismatch while trying to retrieve linked_list from record {:?}",
"RecordKind mismatch while trying to retrieve graph_entry from record {:?}",
PrettyPrintRecordKey::from(&record.key)
);
Err(NetworkError::RecordKindMismatch(RecordKind::LinkedList))
Err(NetworkError::RecordKindMismatch(RecordKind::GraphEntry))
}
}
16 changes: 8 additions & 8 deletions ant-networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod error;
mod event;
mod external_address;
mod fifo_register;
mod linked_list;
mod graph;
mod log_markers;
#[cfg(feature = "open-metrics")]
mod metrics;
Expand All @@ -40,7 +40,7 @@ pub use self::{
},
error::{GetRecordError, NetworkError},
event::{MsgResponder, NetworkEvent},
linked_list::get_linked_list_from_record,
graph::get_graph_entry_from_record,
record_store::NodeRecordStore,
};
#[cfg(feature = "open-metrics")]
Expand Down Expand Up @@ -75,7 +75,7 @@ use tokio::sync::{
};
use tokio::time::Duration;
use {
ant_protocol::storage::LinkedList,
ant_protocol::storage::GraphEntry,
ant_protocol::storage::{
try_deserialize_record, try_serialize_record, RecordHeader, RecordKind,
},
Expand Down Expand Up @@ -634,17 +634,17 @@ impl Network {
match kind {
RecordKind::Chunk
| RecordKind::ChunkWithPayment
| RecordKind::LinkedListWithPayment
| RecordKind::GraphEntryWithPayment
| RecordKind::RegisterWithPayment
| RecordKind::PointerWithPayment
| RecordKind::ScratchpadWithPayment => {
error!("Encountered a split record for {pretty_key:?} with unexpected RecordKind {kind:?}, skipping.");
continue;
}
RecordKind::LinkedList => {
RecordKind::GraphEntry => {
info!("For record {pretty_key:?}, we have a split record for a transaction attempt. Accumulating transactions");

match get_linked_list_from_record(record) {
match get_graph_entry_from_record(record) {
Ok(transactions) => {
accumulated_transactions.extend(transactions);
}
Expand Down Expand Up @@ -730,10 +730,10 @@ impl Network {
info!("For record {pretty_key:?} task found split record for a transaction, accumulated and sending them as a single record");
let accumulated_transactions = accumulated_transactions
.into_iter()
.collect::<Vec<LinkedList>>();
.collect::<Vec<GraphEntry>>();
let record = Record {
key: key.clone(),
value: try_serialize_record(&accumulated_transactions, RecordKind::LinkedList)
value: try_serialize_record(&accumulated_transactions, RecordKind::GraphEntry)
.map_err(|err| {
error!(
"Error while serializing the accumulated transactions for {pretty_key:?}: {err:?}"
Expand Down
97 changes: 47 additions & 50 deletions ant-networking/src/record_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,24 +434,17 @@ impl NodeRecordStore {
key: &Key,
encryption_details: &(Aes256GcmSiv, [u8; 4]),
) -> Option<Cow<'a, Record>> {
let mut record = Record {
key: key.clone(),
value: bytes,
publisher: None,
expires: None,
};

// if we're not encrypting, lets just return the record
if !cfg!(feature = "encrypt-records") {
return Some(Cow::Owned(record));
}

let (cipher, nonce_starter) = encryption_details;
let nonce = generate_nonce_for_record(nonce_starter, key);

match cipher.decrypt(&nonce, record.value.as_ref()) {
match cipher.decrypt(&nonce, bytes.as_slice()) {
Ok(value) => {
record.value = value;
let record = Record {
key: key.clone(),
value,
publisher: None,
expires: None,
};
Some(Cow::Owned(record))
}
Err(error) => {
Expand Down Expand Up @@ -630,15 +623,11 @@ impl NodeRecordStore {
}

/// Prepare record bytes for storage
/// If feats are enabled, this will eg, encrypt the record for storage
/// This will encrypt the record for storage
fn prepare_record_bytes(
record: Record,
encryption_details: (Aes256GcmSiv, [u8; 4]),
) -> Option<Vec<u8>> {
if !cfg!(feature = "encrypt-records") {
return Some(record.value);
}

let (cipher, nonce_starter) = encryption_details;
let nonce = generate_nonce_for_record(&nonce_starter, &record.key);

Expand Down Expand Up @@ -1144,8 +1133,10 @@ mod tests {
..Default::default()
};
let self_id = PeerId::random();
let (network_event_sender, _) = mpsc::channel(1);
let (swarm_cmd_sender, _) = mpsc::channel(1);

// Create channels with proper receivers
let (network_event_sender, _network_event_receiver) = mpsc::channel(1);
let (swarm_cmd_sender, mut swarm_cmd_receiver) = mpsc::channel(1);

let mut store = NodeRecordStore::with_config(
self_id,
Expand All @@ -1172,31 +1163,46 @@ mod tests {
.put_verified(record.clone(), RecordType::Chunk)
.is_ok());

// Mark as stored (simulating the CompletedWrite event)
store.mark_as_stored(record.key.clone(), RecordType::Chunk);
// Wait for the async write operation to complete
if let Some(cmd) = swarm_cmd_receiver.recv().await {
match cmd {
LocalSwarmCmd::AddLocalRecordAsStored { key, record_type } => {
store.mark_as_stored(key, record_type);
}
_ => panic!("Unexpected command received"),
}
}

// Verify the chunk is stored
let stored_record = store.get(&record.key);
assert!(stored_record.is_some(), "Chunk should be stored");
assert!(stored_record.is_some(), "Chunk should be stored initially");

// Sleep a while to let OS completes the flush to disk
sleep(Duration::from_secs(5)).await;
sleep(Duration::from_secs(1)).await;

// Restart the store with same encrypt_seed
// Create new channels for the restarted store
let (new_network_event_sender, _new_network_event_receiver) = mpsc::channel(1);
let (new_swarm_cmd_sender, _new_swarm_cmd_receiver) = mpsc::channel(1);

// Restart the store with same encrypt_seed but new channels
drop(store);
let store = NodeRecordStore::with_config(
self_id,
store_config,
network_event_sender.clone(),
swarm_cmd_sender.clone(),
new_network_event_sender,
new_swarm_cmd_sender,
);

// Sleep a lit bit to let OS completes restoring
sleep(Duration::from_secs(1)).await;

// Verify the record still exists
let stored_record = store.get(&record.key);
assert!(stored_record.is_some(), "Chunk should be stored");
assert!(
stored_record.is_some(),
"Chunk should be stored after restart with same key"
);

// Create new channels for the different seed test
let (diff_network_event_sender, _diff_network_event_receiver) = mpsc::channel(1);
let (diff_swarm_cmd_sender, _diff_swarm_cmd_receiver) = mpsc::channel(1);

// Restart the store with different encrypt_seed
let self_id_diff = PeerId::random();
Expand All @@ -1208,25 +1214,16 @@ mod tests {
let store_diff = NodeRecordStore::with_config(
self_id_diff,
store_config_diff,
network_event_sender,
swarm_cmd_sender,
diff_network_event_sender,
diff_swarm_cmd_sender,
);

// Sleep a lit bit to let OS completes restoring (if has)
sleep(Duration::from_secs(1)).await;

// Verify the record existence, shall get removed when encryption enabled
if cfg!(feature = "encrypt-records") {
assert!(
store_diff.get(&record.key).is_none(),
"Chunk should be gone"
);
} else {
assert!(
store_diff.get(&record.key).is_some(),
"Chunk shall persists without encryption"
);
}
// When encryption is enabled, the record should be gone because it can't be decrypted
// with the different encryption seed
assert!(
store_diff.get(&record.key).is_none(),
"Chunk should be gone with different encryption key"
);

Ok(())
}
Expand Down Expand Up @@ -1557,7 +1554,7 @@ mod tests {
// via NetworkEvent::CompletedWrite)
store.mark_as_stored(record_key.clone(), RecordType::Chunk);

stored_records.push(record_key);
stored_records.push(record_key.clone());
stored_records.sort_by(|a, b| {
let a = NetworkAddress::from_record_key(a);
let b = NetworkAddress::from_record_key(b);
Expand Down
14 changes: 10 additions & 4 deletions ant-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ name = "antnode"
path = "src/bin/antnode/main.rs"

[features]
default = ["metrics", "upnp", "open-metrics", "encrypt-records"]
encrypt-records = ["ant-networking/encrypt-records"]
default = ["metrics", "upnp", "open-metrics"]
extension-module = ["pyo3/extension-module"]
local = ["ant-networking/local", "ant-evm/local", "ant-bootstrap/local", "ant-logging/process-metrics"]
local = [
"ant-networking/local",
"ant-evm/local",
"ant-bootstrap/local",
"ant-logging/process-metrics",
]
loud = ["ant-networking/loud"] # loud mode: print important messages to console
metrics = []
nightly = []
Expand Down Expand Up @@ -83,7 +87,9 @@ walkdir = "~2.5.0"
xor_name = "5.0.0"

[dev-dependencies]
ant-protocol = { path = "../ant-protocol", version = "0.3.1", features = ["rpc"] }
ant-protocol = { path = "../ant-protocol", version = "0.3.1", features = [
"rpc",
] }
assert_fs = "1.0.0"
evmlib = { path = "../evmlib", version = "0.1.6" }
autonomi = { path = "../autonomi", version = "0.3.1", features = ["registers"] }
Expand Down
Loading

0 comments on commit 3374a15

Please sign in to comment.