Finalize disk archiver #290
spetz committed Jul 11, 2024
1 parent a51fc33 commit a83f433
Showing 12 changed files with 125 additions and 53 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

12 changes: 6 additions & 6 deletions configs/server.toml
@@ -1,6 +1,6 @@
[data_maintenance.archiver]
# Enables or disables the archiver process.
-enabled = true
+enabled = false

# Kind of archiver to use. Available options: "disk", "s3".
kind = "disk"
@@ -20,18 +20,18 @@ region = "eu-west-1"
bucket = "iggy"

[data_maintenance.messages]
-# Enables or disables the archiver process.
-archiver_enabled = true
+# Enables or disables the archiver process for segments containing messages.
+archiver_enabled = false

# Enables or disables the expired message cleaner process.
-cleaner_enabled = true
+cleaner_enabled = false

# Interval for running the message archiver and cleaner.
interval = "1 m"

[data_maintenance.state]
-# Enables or disables the archiver process.
-archiver_enabled = true
+# Enables or disables the archiver process for state log.
+archiver_enabled = false

# Sets whether the state archiver should overwrite existing log archive or always create a new one.
overwrite = true
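
The [data_maintenance] sections above are plain TOML, so they can be loaded with any TOML-aware config reader. The sketch below is a minimal, self-contained illustration using the serde and toml crates; the struct and field names are assumptions for the example, not the server's actual config types.

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct MaintenanceConfig {
    data_maintenance: DataMaintenance,
}

#[derive(Debug, Deserialize)]
struct DataMaintenance {
    archiver: ArchiverSection,
    messages: MessagesSection,
    state: StateSection,
}

#[derive(Debug, Deserialize)]
struct ArchiverSection {
    enabled: bool,
    kind: String, // "disk" or "s3"
}

#[derive(Debug, Deserialize)]
struct MessagesSection {
    archiver_enabled: bool,
    cleaner_enabled: bool,
    interval: String, // e.g. "1 m"; the server parses this into a duration
}

#[derive(Debug, Deserialize)]
struct StateSection {
    archiver_enabled: bool,
    overwrite: bool,
}

fn main() {
    let raw = r#"
        [data_maintenance.archiver]
        enabled = false
        kind = "disk"

        [data_maintenance.messages]
        archiver_enabled = false
        cleaner_enabled = false
        interval = "1 m"

        [data_maintenance.state]
        archiver_enabled = false
        overwrite = true
    "#;
    let config: MaintenanceConfig = toml::from_str(raw).expect("maintenance config should parse");
    println!("{config:?}");
}
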
2 changes: 1 addition & 1 deletion integration/tests/streaming/system.rs
@@ -1,6 +1,6 @@
use crate::streaming::common::test_setup::TestSetup;
use iggy::identifier::Identifier;
-use server::configs::server::{ArchiverConfig, DataMaintenanceConfig, PersonalAccessTokenConfig};
+use server::configs::server::{DataMaintenanceConfig, PersonalAccessTokenConfig};
use server::streaming::session::Session;
use server::streaming::systems::system::System;
use std::net::{Ipv4Addr, SocketAddr};
2 changes: 1 addition & 1 deletion server/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.3.1"
version = "0.3.2"
edition = "2021"
build = "src/build.rs"

6 changes: 4 additions & 2 deletions server/server.http
@@ -13,7 +13,7 @@
@root_password = iggy
@user1_username = user1
@user1_password = secret
-@access_token = secret
+@access_token = eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJqdGkiOiI2YmIwYmU3YS01Mjc4LTRhNzItYmEyNC1iNjlmYjg0NzBhOWIiLCJpc3MiOiJpZ2d5LnJzIiwiYXVkIjoiaWdneS5ycyIsInN1YiI6MSwiaWF0IjoxNzIwNzI0NjQ1LCJleHAiOjE3MjA3MjgyNDUsIm5iZiI6MTcyMDcyNDY0NX0.37dSKODDzyJn7x-qGbA7PFKTjohNnglUUTex9aA83UM
@root_id = 1
@user1_id = 2
@pat_name = dev_token
@@ -227,6 +227,7 @@ Content-Type: application/json
"name": "topic1",
"compression_algorithm": "none",
"partitions_count": 3,
"max_topic_size": 0,
"message_expiry": 0
}

@@ -238,7 +239,8 @@ Content-Type: application/json
{
"name": "topic1",
"compression_algorithm": "none",
"message_expiry": 1000
"max_topic_size": 0,
"message_expiry": 0
}

###
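
For reference, the create-topic request above can also be issued from Rust. The sketch below uses reqwest and serde_json; the base URL, port, route, and token handling are assumptions mirroring the variables in server.http, not a confirmed client API, and in practice the project's own client SDK would be used instead.

use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // The token and IDs would come from the earlier login request; the URL and
    // route below are assumptions for illustration only.
    let access_token = "<jwt returned by the login endpoint>";
    let create_topic = json!({
        "name": "topic1",
        "compression_algorithm": "none",
        "partitions_count": 3,
        "max_topic_size": 0,
        "message_expiry": 0
    });

    let response = reqwest::Client::new()
        .post("http://localhost:3000/streams/1/topics")
        .bearer_auth(access_token)
        .json(&create_topic)
        .send()
        .await?;
    println!("create topic -> {}", response.status());
    Ok(())
}
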
23 changes: 15 additions & 8 deletions server/src/archiver/disk.rs
@@ -4,7 +4,7 @@ use crate::server_error::ServerError;
use async_trait::async_trait;
use std::path::Path;
use tokio::fs;
-use tracing::info;
+use tracing::{debug, info};

#[derive(Debug)]
pub struct DiskArchiver {
@@ -24,25 +24,32 @@ impl Archiver for DiskArchiver {
info!("Creating disk archiver directory: {}", self.config.path);
fs::create_dir_all(&self.config.path).await?;
}

Ok(())
}

async fn is_archived(&self, file: &str) -> Result<bool, ServerError> {
debug!("Checking if file: {file} is archived on disk.");
let path = Path::new(&self.config.path).join(file);
-Ok(path.exists())
+let is_archived = path.exists();
+debug!("File: {file} is archived: {is_archived}");
+Ok(is_archived)
}

-async fn archive(&self, files: &[&str]) -> Result<(), ServerError> {
-info!("Archiving files on disk: {:?}", files);
+async fn archive(
+&self,
+files: &[&str],
+base_directory: Option<String>,
+) -> Result<(), ServerError> {
+debug!("Archiving files on disk: {:?}", files);
for file in files {
info!("Archiving file: {file}");
debug!("Archiving file: {file}");
let source = Path::new(file);
-let destination = Path::new(&self.config.path).join(source);
+let base_directory = base_directory.as_deref().unwrap_or_default();
+let destination = Path::new(&self.config.path).join(base_directory).join(file);
let destination_path = destination.to_str().unwrap_or_default().to_owned();
fs::create_dir_all(destination.parent().unwrap()).await?;
fs::copy(source, destination).await?;
info!("Archived file: {file} at: {destination_path}");
debug!("Archived file: {file} at: {destination_path}");
}

Ok(())
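
The new base_directory parameter only affects where the copy lands: the destination is the archiver's configured path, then the optional base directory, then the original file path. A tiny self-contained sketch of that rule (the paths below are made up for illustration):

use std::path::{Path, PathBuf};

// Mirrors the join logic above: <archiver path>/<optional base directory>/<source path>.
fn archive_destination(archiver_path: &str, base_directory: Option<&str>, file: &str) -> PathBuf {
    Path::new(archiver_path)
        .join(base_directory.unwrap_or_default())
        .join(file)
}

fn main() {
    // With a base directory (e.g. a timestamped one when the state archiver is not overwriting):
    let kept = archive_destination("local_data/archive", Some("1720724645000000_state"), "local_data/state/log");
    // Without one, the previous flat layout is preserved:
    let flat = archive_destination("local_data/archive", None, "local_data/state/log");
    println!("{}", kept.display()); // local_data/archive/1720724645000000_state/local_data/state/log
    println!("{}", flat.display()); // local_data/archive/local_data/state/log
}
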
6 changes: 5 additions & 1 deletion server/src/archiver/mod.rs
@@ -33,7 +33,11 @@ impl FromStr for ArchiverKind {
pub trait Archiver: Sync + Send {
async fn init(&self) -> Result<(), ServerError>;
async fn is_archived(&self, file: &str) -> Result<bool, ServerError>;
-async fn archive(&self, files: &[&str]) -> Result<(), ServerError>;
+async fn archive(
+&self,
+files: &[&str],
+base_directory: Option<String>,
+) -> Result<(), ServerError>;
}

impl Debug for dyn Archiver {
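
Every implementor now has to accept the extra base_directory argument. A hedged sketch of a third, no-op implementation (written as if it lived inside the server crate, useful mainly in tests) shows the shape the updated trait requires:

use async_trait::async_trait;

use crate::archiver::Archiver;
use crate::server_error::ServerError;

#[derive(Debug, Default)]
pub struct NoopArchiver;

#[async_trait]
impl Archiver for NoopArchiver {
    async fn init(&self) -> Result<(), ServerError> {
        // Nothing to prepare.
        Ok(())
    }

    async fn is_archived(&self, _file: &str) -> Result<bool, ServerError> {
        // Pretend nothing has been archived yet.
        Ok(false)
    }

    async fn archive(
        &self,
        _files: &[&str],
        _base_directory: Option<String>,
    ) -> Result<(), ServerError> {
        // Accept the files and do nothing with them.
        Ok(())
    }
}
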
14 changes: 9 additions & 5 deletions server/src/archiver/s3.rs
@@ -6,12 +6,12 @@ use tracing::info;

#[derive(Debug)]
pub struct S3Archiver {
-config: S3ArchiverConfig,
+_config: S3ArchiverConfig,
}

impl S3Archiver {
pub fn new(config: S3ArchiverConfig) -> Self {
-S3Archiver { config }
+S3Archiver { _config: config }
}
}

@@ -21,14 +21,18 @@ impl Archiver for S3Archiver {
Ok(())
}

-async fn is_archived(&self, file: &str) -> Result<bool, ServerError> {
+async fn is_archived(&self, _file: &str) -> Result<bool, ServerError> {
// TODO: Implement checking if file is archived on S3
Ok(false)
}

-async fn archive(&self, files: &[&str]) -> Result<(), ServerError> {
+async fn archive(
+&self,
+_files: &[&str],
+_base_directory: Option<String>,
+) -> Result<(), ServerError> {
// TODO: Implement archiving file on S3
info!("Archiving files on S3");
info!("Archiving files on S3...");
Ok(())
}
}
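
The S3 variant is still stubbed out. One way the archive TODO could eventually be filled in is sketched below using the aws-sdk-s3 crate; the client choice, credential handling, and object-key scheme are all assumptions, not the planned implementation.

use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::Client;

// Uploads each file to the bucket, prefixing object keys with the optional
// base directory to mirror the disk archiver's layout.
async fn upload_files(
    client: &Client,
    bucket: &str,
    files: &[&str],
    base_directory: Option<String>,
) -> Result<(), aws_sdk_s3::Error> {
    let prefix = base_directory.unwrap_or_default();
    for file in files {
        let key = if prefix.is_empty() {
            file.to_string()
        } else {
            format!("{prefix}/{file}")
        };
        let body = ByteStream::from_path(file)
            .await
            .expect("file should be readable");
        client
            .put_object()
            .bucket(bucket)
            .key(key)
            .body(body)
            .send()
            .await?;
    }
    Ok(())
}
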
53 changes: 46 additions & 7 deletions server/src/channels/commands/archive_state.rs
@@ -4,17 +4,21 @@ use crate::streaming::systems::system::SharedSystem;
use async_trait::async_trait;
use flume::Sender;
use iggy::utils::duration::IggyDuration;
+use iggy::utils::timestamp::IggyTimestamp;
use tokio::time;
-use tracing::{error, info};
+use tracing::{error, info, warn};

pub struct StateArchiver {
enabled: bool,
+overwrite: bool,
interval: IggyDuration,
sender: Sender<ArchiveStateCommand>,
}

#[derive(Debug, Default, Clone)]
-pub struct ArchiveStateCommand;
+pub struct ArchiveStateCommand {
+overwrite: bool,
+}

#[derive(Debug, Default, Clone)]
pub struct ArchiveStateExecutor;
@@ -23,6 +27,7 @@ impl StateArchiver {
pub fn new(config: &StateMaintenanceConfig, sender: Sender<ArchiveStateCommand>) -> Self {
Self {
enabled: config.archiver_enabled,
+overwrite: config.overwrite,
interval: config.interval,
sender,
}
@@ -34,26 +39,48 @@ impl StateArchiver {
return;
}

+let overwrite = self.overwrite;
let interval = self.interval;
let sender = self.sender.clone();
info!("State archiver is enabled, state will be archived every: {interval}.");
tokio::spawn(async move {
let mut interval_timer = time::interval(interval.get_duration());
loop {
interval_timer.tick().await;
-sender.send(ArchiveStateCommand).unwrap_or_else(|err| {
-error!("Failed to send ArchiveStateCommand. Error: {}", err);
-});
+sender
+.send(ArchiveStateCommand { overwrite })
+.unwrap_or_else(|err| {
+error!("Failed to send ArchiveStateCommand. Error: {}", err);
+});
}
});
}
}

#[async_trait]
impl ServerCommand<ArchiveStateCommand> for ArchiveStateExecutor {
-async fn execute(&mut self, system: &SharedSystem, _command: ArchiveStateCommand) {
+async fn execute(&mut self, system: &SharedSystem, command: ArchiveStateCommand) {
let system = system.read();
+if system.archiver.is_none() {
+warn!("Archiver is disabled, state will not be archived.");
+return;
+}

+let base_directory = if command.overwrite {
+None
+} else {
+Some(format!("{}_state", IggyTimestamp::now().as_micros()))
+};
+let state_log_path = system.config.get_state_log_path();
+let state_info_path = system.config.get_state_info_path();
+info!("Archiving state...");
+let archiver = system.archiver.as_ref().unwrap();
+let files = [state_log_path.as_ref(), state_info_path.as_ref()];
+if let Err(error) = archiver.archive(&files, base_directory).await {
+error!("Failed to archive state. Error: {}", error);
+return;
+}
+info!("State archived successfully.");
}

fn start_command_sender(
@@ -62,16 +89,28 @@ impl ServerCommand<ArchiveStateCommand> for ArchiveStateExecutor {
config: &crate::configs::server::ServerConfig,
sender: Sender<ArchiveStateCommand>,
) {
+if !config.data_maintenance.archiver.enabled
+|| !config.data_maintenance.state.archiver_enabled
+{
+return;
+}
+
let state_archiver = StateArchiver::new(&config.data_maintenance.state, sender);
state_archiver.start();
}

fn start_command_consumer(
mut self,
system: SharedSystem,
-_config: &crate::configs::server::ServerConfig,
+config: &crate::configs::server::ServerConfig,
receiver: flume::Receiver<ArchiveStateCommand>,
) {
+if !config.data_maintenance.archiver.enabled
+|| !config.data_maintenance.state.archiver_enabled
+{
+return;
+}
+
tokio::spawn(async move {
let system = system.clone();
while let Ok(command) = receiver.recv_async().await {
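
The moving parts above follow one pattern: a timer task periodically sends a command over a flume channel, and a consumer task executes it. Below is a stripped-down, self-contained sketch of that pattern; names and intervals are illustrative, and the real code routes this through ServerCommand, SharedSystem, and the configured archiver.

use std::time::Duration;

use flume::{Receiver, Sender};
use tokio::time;

#[derive(Debug, Clone)]
struct ArchiveCommand {
    overwrite: bool,
}

// Periodically pushes a command onto the channel, like StateArchiver::start.
fn start_sender(sender: Sender<ArchiveCommand>, interval: Duration, overwrite: bool) {
    tokio::spawn(async move {
        let mut ticker = time::interval(interval);
        loop {
            ticker.tick().await;
            if sender.send(ArchiveCommand { overwrite }).is_err() {
                break; // Receiver dropped; stop ticking.
            }
        }
    });
}

// Consumes commands one by one, like ArchiveStateExecutor::execute.
fn start_consumer(receiver: Receiver<ArchiveCommand>) {
    tokio::spawn(async move {
        while let Ok(command) = receiver.recv_async().await {
            // The real executor hands the state log files to the archiver here,
            // using a timestamped base directory when overwrite is disabled.
            println!("archiving state (overwrite: {})", command.overwrite);
        }
    });
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = flume::unbounded();
    // The real interval comes from configuration, e.g. "1 m"; a short one keeps the demo quick.
    start_sender(sender, Duration::from_secs(1), false);
    start_consumer(receiver);
    time::sleep(Duration::from_secs(3)).await;
}
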
