Skip to content

Commit a5b6e20

Browse files
committed
feature: implement file upload retry and configure it in builder
1 parent 1f8458f commit a5b6e20

File tree

6 files changed

+138
-18
lines changed

6 files changed

+138
-18
lines changed

mithril-aggregator/src/dependency_injection/builder.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ use crate::{
6767
entities::AggregatorEpochSettings,
6868
event_store::{EventMessage, EventStore, TransmitterService},
6969
file_uploaders::{
70-
CloudRemotePath, FileUploader, GcpBackendUploader, GcpUploader, LocalUploader,
70+
CloudRemotePath, FileUploadRetryPolicy, FileUploader, GcpBackendUploader, GcpUploader,
71+
LocalUploader,
7172
},
7273
http_server::{
7374
routes::router::{self, RouterConfig, RouterState},
@@ -1235,14 +1236,15 @@ impl DependenciesBuilder {
12351236
DependenciesBuilderError::MissingConfiguration("snapshot_bucket_name".to_string())
12361237
})?;
12371238

1238-
Ok(GcpUploader::new(
1239+
Ok(GcpUploader::with_retry_policy(
12391240
Arc::new(GcpBackendUploader::try_new(
12401241
bucket,
12411242
self.configuration.snapshot_use_cdn_domain,
12421243
logger.clone(),
12431244
)?),
12441245
remote_folder_path,
12451246
allow_overwrite,
1247+
FileUploadRetryPolicy::default(),
12461248
))
12471249
}
12481250

@@ -1279,9 +1281,10 @@ impl DependenciesBuilder {
12791281
}
12801282
})?;
12811283

1282-
Ok(vec![Arc::new(LocalUploader::new(
1284+
Ok(vec![Arc::new(LocalUploader::with_retry_policy(
12831285
ancillary_url_prefix,
12841286
&target_dir,
1287+
FileUploadRetryPolicy::default(),
12851288
logger,
12861289
)?)])
12871290
}
@@ -1317,9 +1320,10 @@ impl DependenciesBuilder {
13171320
.join(CARDANO_DB_ARTIFACTS_DIR)
13181321
.join("immutable");
13191322

1320-
Ok(vec![Arc::new(LocalUploader::new(
1323+
Ok(vec![Arc::new(LocalUploader::with_retry_policy(
13211324
immutable_url_prefix,
13221325
&target_dir,
1326+
FileUploadRetryPolicy::default(),
13231327
logger,
13241328
)?)])
13251329
}
@@ -1360,9 +1364,10 @@ impl DependenciesBuilder {
13601364
}
13611365
})?;
13621366

1363-
Ok(vec![Arc::new(LocalUploader::new(
1367+
Ok(vec![Arc::new(LocalUploader::with_retry_policy(
13641368
digests_url_prefix,
13651369
&target_dir,
1370+
FileUploadRetryPolicy::default(),
13661371
logger,
13671372
)?)])
13681373
}

mithril-aggregator/src/file_uploaders/dumb_uploader.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,35 @@
11
use anyhow::anyhow;
22
use async_trait::async_trait;
33
use mithril_common::{entities::FileUri, StdResult};
4-
use std::{path::Path, sync::RwLock};
4+
use std::{path::Path, sync::RwLock, time::Duration};
55

66
use crate::file_uploaders::FileUploader;
77

8+
use super::interface::FileUploadRetryPolicy;
9+
810
/// Dummy uploader for test purposes.
911
///
1012
/// It actually does NOT upload any file but remembers the last file it
1113
/// was asked to upload. This is intended to by used by integration tests.
1214
pub struct DumbUploader {
1315
last_uploaded: RwLock<Option<FileUri>>,
16+
retry_policy: FileUploadRetryPolicy,
1417
}
1518

1619
impl DumbUploader {
1720
/// Create a new instance.
1821
pub fn new() -> Self {
1922
Self {
2023
last_uploaded: RwLock::new(None),
24+
retry_policy: FileUploadRetryPolicy::never(),
25+
}
26+
}
27+
28+
/// Create a new instance with a custom retry policy.
29+
pub fn with_retry_policy(retry_policy: FileUploadRetryPolicy) -> Self {
30+
Self {
31+
last_uploaded: RwLock::new(None),
32+
retry_policy,
2133
}
2234
}
2335

@@ -52,6 +64,10 @@ impl FileUploader for DumbUploader {
5264

5365
Ok(location)
5466
}
67+
68+
fn retry_policy(&self) -> FileUploadRetryPolicy {
69+
self.retry_policy.clone()
70+
}
5571
}
5672

5773
#[cfg(test)]
@@ -77,4 +93,17 @@ mod tests {
7793
.expect("getting dumb uploader last value after a fake download should not fail")
7894
);
7995
}
96+
97+
#[tokio::test]
98+
async fn retry_policy_from_file_uploader_trait_should_be_implemented() {
99+
let expected_policy = FileUploadRetryPolicy {
100+
attempts: 10,
101+
delay_between_attempts: Duration::from_millis(123),
102+
};
103+
104+
let uploader: Box<dyn FileUploader> =
105+
Box::new(DumbUploader::with_retry_policy(expected_policy.clone()));
106+
107+
assert_eq!(expected_policy, uploader.retry_policy());
108+
}
80109
}

mithril-aggregator/src/file_uploaders/gcp_uploader.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ use mithril_common::{entities::FileUri, logging::LoggerExtensions, StdResult};
1717

1818
use crate::FileUploader;
1919

20+
use super::FileUploadRetryPolicy;
21+
2022
/// CloudRemotePath represents a cloud remote path
2123
#[derive(Debug, Clone, PartialEq)]
2224
pub struct CloudRemotePath(PathBuf);
@@ -198,6 +200,7 @@ pub struct GcpUploader {
198200
cloud_backend_uploader: Arc<dyn CloudBackendUploader>,
199201
remote_folder: CloudRemotePath,
200202
allow_overwrite: bool,
203+
retry_policy: FileUploadRetryPolicy,
201204
}
202205

203206
impl GcpUploader {
@@ -211,6 +214,22 @@ impl GcpUploader {
211214
cloud_backend_uploader,
212215
remote_folder,
213216
allow_overwrite,
217+
retry_policy: FileUploadRetryPolicy::never(),
218+
}
219+
}
220+
221+
/// Create a new instance with a custom retry policy.
222+
pub fn with_retry_policy(
223+
cloud_backend_uploader: Arc<dyn CloudBackendUploader>,
224+
remote_folder: CloudRemotePath,
225+
allow_overwrite: bool,
226+
retry_policy: FileUploadRetryPolicy,
227+
) -> Self {
228+
Self {
229+
cloud_backend_uploader,
230+
remote_folder,
231+
allow_overwrite,
232+
retry_policy,
214233
}
215234
}
216235
}
@@ -242,11 +261,17 @@ impl FileUploader for GcpUploader {
242261

243262
Ok(file_uri)
244263
}
264+
265+
fn retry_policy(&self) -> FileUploadRetryPolicy {
266+
self.retry_policy.clone()
267+
}
245268
}
246269

247270
#[cfg(test)]
248271
mod tests {
249-
use crate::test_tools::TestLogger;
272+
use std::time::Duration;
273+
274+
use crate::{file_uploaders::FileUploadRetryPolicy, test_tools::TestLogger};
250275

251276
use super::*;
252277

@@ -480,4 +505,21 @@ mod tests {
480505
assert_eq!(FileUri(expected_location), location);
481506
}
482507
}
508+
509+
#[tokio::test]
510+
async fn retry_policy_from_file_uploader_trait_should_be_implemented() {
511+
let expected_policy = FileUploadRetryPolicy {
512+
attempts: 10,
513+
delay_between_attempts: Duration::from_millis(123),
514+
};
515+
516+
let file_uploader: Box<dyn FileUploader> = Box::new(GcpUploader::with_retry_policy(
517+
Arc::new(MockCloudBackendUploader::new()),
518+
CloudRemotePath::new("remote_folder"),
519+
true,
520+
expected_policy.clone(),
521+
));
522+
523+
assert_eq!(expected_policy, file_uploader.retry_policy());
524+
}
483525
}

mithril-aggregator/src/file_uploaders/interface.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
use async_trait::async_trait;
22
use mithril_common::{entities::FileUri, StdResult};
3-
use std::{
4-
any::{Any, TypeId},
5-
path::Path,
6-
time::Duration,
7-
};
3+
use std::{any::Any, path::Path, time::Duration};
84

95
/// Policy for retrying file uploads.
6+
#[derive(Debug, PartialEq, Clone)]
107
pub struct FileUploadRetryPolicy {
11-
attempts: usize,
12-
delay_between_attempts: Duration,
8+
/// Number of attempts to upload a file.
9+
pub attempts: usize,
10+
/// Delay between two attempts.
11+
pub delay_between_attempts: Duration,
1312
}
1413

1514
impl FileUploadRetryPolicy {
16-
fn never() -> Self {
15+
/// Create a policy that never retries.
16+
pub fn never() -> Self {
1717
Self {
1818
attempts: 1,
1919
delay_between_attempts: Duration::from_secs(0),
@@ -22,6 +22,7 @@ impl FileUploadRetryPolicy {
2222
}
2323

2424
impl Default for FileUploadRetryPolicy {
25+
/// Create a default retry policy.
2526
fn default() -> Self {
2627
Self {
2728
attempts: 3,
@@ -38,11 +39,12 @@ pub trait FileUploader: Any + Sync + Send {
3839
/// Try to upload once.
3940
async fn upload_without_retry(&self, filepath: &Path) -> StdResult<FileUri>;
4041

42+
/// Get the retry policy for this uploader.
4143
fn retry_policy(&self) -> FileUploadRetryPolicy {
4244
FileUploadRetryPolicy::never()
4345
}
4446

45-
// Upload a file
47+
/// Upload a file with retries according to the retry policy.
4648
async fn upload(&self, filepath: &Path) -> StdResult<FileUri> {
4749
let retry_policy = self.retry_policy();
4850

mithril-aggregator/src/file_uploaders/local_uploader.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::path::{Path, PathBuf};
66
use mithril_common::StdResult;
77
use mithril_common::{entities::FileUri, logging::LoggerExtensions};
88

9-
use crate::file_uploaders::FileUploader;
9+
use crate::file_uploaders::{FileUploadRetryPolicy, FileUploader};
1010
use crate::tools::url_sanitizer::SanitizedUrlWithTrailingSlash;
1111

1212
/// LocalUploader is a file uploader working using local files
@@ -17,6 +17,7 @@ pub struct LocalUploader {
1717
/// Target folder where to store files archive
1818
target_location: PathBuf,
1919

20+
retry_policy: FileUploadRetryPolicy,
2021
logger: Logger,
2122
}
2223

@@ -26,13 +27,28 @@ impl LocalUploader {
2627
server_url_prefix: SanitizedUrlWithTrailingSlash,
2728
target_location: &Path,
2829
logger: Logger,
30+
) -> StdResult<Self> {
31+
Self::with_retry_policy(
32+
server_url_prefix,
33+
target_location,
34+
FileUploadRetryPolicy::never(),
35+
logger,
36+
)
37+
}
38+
39+
pub(crate) fn with_retry_policy(
40+
server_url_prefix: SanitizedUrlWithTrailingSlash,
41+
target_location: &Path,
42+
retry_policy: FileUploadRetryPolicy,
43+
logger: Logger,
2944
) -> StdResult<Self> {
3045
let logger = logger.new_with_component_name::<Self>();
3146
debug!(logger, "New LocalUploader created"; "server_url_prefix" => &server_url_prefix.as_str());
3247

3348
Ok(Self {
3449
server_url_prefix,
3550
target_location: target_location.to_path_buf(),
51+
retry_policy,
3652
logger,
3753
})
3854
}
@@ -56,13 +72,18 @@ impl FileUploader for LocalUploader {
5672
);
5773
Ok(FileUri(location))
5874
}
75+
76+
fn retry_policy(&self) -> FileUploadRetryPolicy {
77+
self.retry_policy.clone()
78+
}
5979
}
6080

6181
#[cfg(test)]
6282
mod tests {
6383
use std::fs::File;
6484
use std::io::Write;
6585
use std::path::{Path, PathBuf};
86+
use std::time::Duration;
6687

6788
use mithril_common::test_utils::TempDir;
6889

@@ -152,4 +173,25 @@ mod tests {
152173
.await
153174
.expect_err("Uploading a directory should fail");
154175
}
176+
177+
#[tokio::test]
178+
async fn retry_policy_from_file_uploader_trait_should_be_implemented() {
179+
let target_dir = TempDir::create("local_uploader", "test_retry_policy");
180+
let expected_policy = FileUploadRetryPolicy {
181+
attempts: 10,
182+
delay_between_attempts: Duration::from_millis(123),
183+
};
184+
185+
let uploader: Box<dyn FileUploader> = Box::new(
186+
LocalUploader::with_retry_policy(
187+
SanitizedUrlWithTrailingSlash::parse("http://test.com:8080/base-root/").unwrap(),
188+
&target_dir,
189+
expected_policy.clone(),
190+
TestLogger::stdout(),
191+
)
192+
.unwrap(),
193+
);
194+
195+
assert_eq!(expected_policy, uploader.retry_policy());
196+
}
155197
}

mithril-aggregator/src/file_uploaders/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ mod local_uploader;
66

77
pub use dumb_uploader::*;
88
pub use gcp_uploader::{CloudRemotePath, GcpBackendUploader, GcpUploader};
9-
pub use interface::FileUploader;
9+
pub use interface::{FileUploadRetryPolicy, FileUploader};
1010
pub use local_snapshot_uploader::LocalSnapshotUploader;
1111
pub use local_uploader::LocalUploader;
1212

0 commit comments

Comments
 (0)