Skip to content

Commit

Permalink
Add optional storage state command encryption (#1059)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz authored Jul 14, 2024
1 parent fd0e80a commit c45cbbd
Show file tree
Hide file tree
Showing 11 changed files with 194 additions and 104 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ size = "4 GB"

# Encryption configuration
[system.encryption]
# Determines whether server-side data encryption is enabled (boolean).
# Determines whether server-side data encryption for the messages payloads and state commands is enabled (boolean).
# `true` enables encryption for stored data using AES-256-GCM.
# `false` means data is stored without encryption.
enabled = false
Expand Down
90 changes: 51 additions & 39 deletions integration/tests/state/file.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::state::StateSetup;
use bytes::Bytes;
use iggy::bytes_serializable::BytesSerializable;
use iggy::streams::create_stream::CreateStream;
use iggy::users::create_user::CreateUser;
use server::state::command::EntryCommand;
use server::state::entry::StateEntry;
use server::state::State;

#[tokio::test]
Expand All @@ -26,27 +29,37 @@ async fn should_apply_single_entry() {
status: Default::default(),
permissions: None,
});
let command_clone = EntryCommand::CreateUser(CreateUser {
let command_bytes = command.to_bytes();

state.apply(user_id, command).await.unwrap();

let mut entries = state.load_entries().await.unwrap();
assert_eq!(entries.len(), 1);
let entry = entries.remove(0);
assert_entry(entry, 0, setup.version(), user_id, command_bytes);
}

#[tokio::test]
async fn should_apply_encrypted_entry() {
let setup = StateSetup::init_with_encryptor().await;
let state = setup.state();
state.init().await.unwrap();

let user_id = 1;
let command = EntryCommand::CreateUser(CreateUser {
username: "test".to_string(),
password: "secret".to_string(),
status: Default::default(),
permissions: None,
});
let command_bytes = command.to_bytes();

state.apply(user_id, command).await.unwrap();

let mut entries = state.load_entries().await.unwrap();
assert_eq!(entries.len(), 1);
let entry = entries.remove(0);
assert_eq!(entry.index, 0);
assert_eq!(entry.term, 0);
assert_eq!(entry.version, setup.version());
assert_eq!(entry.flags, 0);
assert!(entry.checksum > 0);
assert!(entry.timestamp.as_micros() > 0);
assert_eq!(entry.user_id, user_id);
assert_eq!(entry.command, command_clone);
assert!(entry.context.is_empty());
assert_entry(entry, 0, setup.version(), user_id, command_bytes);
}

#[tokio::test]
Expand All @@ -67,12 +80,7 @@ async fn should_apply_multiple_entries() {
status: Default::default(),
permissions: None,
});
let create_user_clone = EntryCommand::CreateUser(CreateUser {
username: "test".to_string(),
password: "secret".to_string(),
status: Default::default(),
permissions: None,
});
let create_user_bytes = create_user.to_bytes();

state.apply(first_user_id, create_user).await.unwrap();

Expand All @@ -84,10 +92,7 @@ async fn should_apply_multiple_entries() {
stream_id: Some(1),
name: "test".to_string(),
});
let create_stream_clone = EntryCommand::CreateStream(CreateStream {
stream_id: Some(1),
name: "test".to_string(),
});
let create_stream_bytes = create_stream.to_bytes();

state.apply(second_user_id, create_stream).await.unwrap();

Expand All @@ -98,25 +103,32 @@ async fn should_apply_multiple_entries() {
assert_eq!(entries.len(), 2);

let create_user_entry = entries.remove(0);
assert_eq!(create_user_entry.index, 0);
assert_eq!(create_user_entry.term, 0);
assert_eq!(create_user_entry.version, setup.version());
assert_eq!(create_user_entry.flags, 0);
assert!(create_user_entry.checksum > 0);
assert!(create_user_entry.timestamp.as_micros() > 0);
assert_eq!(create_user_entry.user_id, 1);
assert!(create_user_entry.context.is_empty());
assert_eq!(create_user_entry.command, create_user_clone);
assert_entry(
create_user_entry,
0,
setup.version(),
first_user_id,
create_user_bytes,
);

let create_stream_entry = entries.remove(0);
assert_eq!(create_stream_entry.index, 1);
assert_eq!(create_stream_entry.term, 0);
assert_eq!(create_stream_entry.version, setup.version());
assert_eq!(create_stream_entry.flags, 0);
assert!(create_stream_entry.checksum > 0);
assert!(create_stream_entry.timestamp.as_micros() > 0);
assert!(create_stream_entry.timestamp.as_micros() > create_user_entry.timestamp.as_micros());
assert_eq!(create_stream_entry.user_id, 2);
assert!(create_stream_entry.context.is_empty());
assert_eq!(create_stream_clone, create_stream_entry.command);
assert_entry(
create_stream_entry,
1,
setup.version(),
second_user_id,
create_stream_bytes,
);
}

fn assert_entry(entry: StateEntry, index: u64, version: u32, user_id: u32, command: Bytes) {
assert_eq!(entry.index, index);
assert_eq!(entry.term, 0);
assert_eq!(entry.version, version);
assert_eq!(entry.flags, 0);
assert!(entry.checksum > 0);
assert!(entry.timestamp.as_micros() > 0);
assert_eq!(entry.user_id, user_id);
assert_eq!(entry.command, command);
assert!(entry.context.is_empty());
}
15 changes: 14 additions & 1 deletion integration/tests/state/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use iggy::utils::crypto::{Aes256GcmEncryptor, Encryptor};
use server::state::file::FileState;
use server::streaming::persistence::persister::FilePersister;
use server::versioning::SemanticVersion;
Expand All @@ -17,13 +18,25 @@ pub struct StateSetup {

impl StateSetup {
pub async fn init() -> StateSetup {
StateSetup::create(None).await
}

pub async fn init_with_encryptor() -> StateSetup {
StateSetup::create(Some(&[1; 32])).await
}

pub async fn create(encryption_key: Option<&[u8]>) -> StateSetup {
let directory_path = format!("state_{}", Uuid::new_v4().to_u128_le());
let log_path = format!("{}/log", directory_path);
create_dir(&directory_path).await.unwrap();

let version = SemanticVersion::from_str("1.2.3").unwrap();
let persister = FilePersister {};
let state = FileState::new(&log_path, &version, Arc::new(persister));
let encryptor: Option<Arc<dyn Encryptor>> = match encryption_key {
Some(key) => Some(Arc::new(Aes256GcmEncryptor::new(key).unwrap())),
None => None,
};
let state = FileState::new(&log_path, &version, Arc::new(persister), encryptor);

Self {
directory_path,
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.3.3"
version = "0.3.4"
edition = "2021"
build = "src/build.rs"

Expand Down
28 changes: 14 additions & 14 deletions server/src/state/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct StateEntry {
pub user_id: u32,
pub checksum: u32,
pub context: Bytes,
pub command: EntryCommand,
pub command: Bytes,
}

impl StateEntry {
Expand All @@ -42,8 +42,9 @@ impl StateEntry {
flags: u64,
timestamp: IggyTimestamp,
user_id: u32,
checksum: u32,
context: Bytes,
command: EntryCommand,
command: Bytes,
) -> Self {
Self {
index,
Expand All @@ -53,16 +54,18 @@ impl StateEntry {
flags,
timestamp,
user_id,
checksum: Self::calculate_checksum(
index, term, leader_id, version, flags, timestamp, user_id, &context, &command,
),
checksum,
context,
command,
}
}

pub fn command(&self) -> Result<EntryCommand, IggyError> {
EntryCommand::from_bytes(self.command.clone())
}

#[allow(clippy::too_many_arguments)]
fn calculate_checksum(
pub fn calculate_checksum(
index: u64,
term: u64,
leader_id: u32,
Expand All @@ -71,9 +74,8 @@ impl StateEntry {
timestamp: IggyTimestamp,
user_id: u32,
context: &Bytes,
command: &EntryCommand,
command: &Bytes,
) -> u32 {
let command = command.to_bytes();
let mut bytes =
BytesMut::with_capacity(8 + 8 + 4 + 4 + 8 + 8 + 4 + 4 + context.len() + command.len());
bytes.put_u64_le(index);
Expand All @@ -94,7 +96,7 @@ impl Display for StateEntry {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"StateEntry {{ index: {}, term: {}, leader ID: {}, version: {}, flags: {}, timestamp: {}, user ID: {}, checksum: {}, command: {:?} }}",
"StateEntry {{ index: {}, term: {}, leader ID: {}, version: {}, flags: {}, timestamp: {}, user ID: {}, checksum: {} }}",
self.index,
self.term,
self.leader_id,
Expand All @@ -103,16 +105,14 @@ impl Display for StateEntry {
self.timestamp,
self.user_id,
self.checksum,
self.command
)
}
}

impl BytesSerializable for StateEntry {
fn to_bytes(&self) -> Bytes {
let command = self.command.to_bytes();
let mut bytes = BytesMut::with_capacity(
8 + 8 + 4 + 4 + 8 + 8 + 4 + 4 + 4 + self.context.len() + command.len(),
8 + 8 + 4 + 4 + 8 + 8 + 4 + 4 + 4 + self.context.len() + self.command.len(),
);
bytes.put_u64_le(self.index);
bytes.put_u64_le(self.term);
Expand All @@ -124,7 +124,7 @@ impl BytesSerializable for StateEntry {
bytes.put_u32_le(self.checksum);
bytes.put_u32_le(self.context.len() as u32);
bytes.put_slice(&self.context);
bytes.extend(command);
bytes.extend(&self.command);
bytes.freeze()
}

Expand All @@ -142,7 +142,7 @@ impl BytesSerializable for StateEntry {
let checksum = bytes.slice(44..48).get_u32_le();
let context_length = bytes.slice(48..52).get_u32_le() as usize;
let context = bytes.slice(52..52 + context_length);
let command = EntryCommand::from_bytes(bytes.slice(52 + context_length..))?;
let command = bytes.slice(52 + context_length..);

Ok(StateEntry {
index,
Expand Down
Loading

0 comments on commit c45cbbd

Please sign in to comment.