Add option --retry-on-all-errors
Fixes #100
pkolaczk committed Aug 11, 2024
1 parent 48f3c8e commit 0bb843f
Showing 3 changed files with 46 additions and 25 deletions.
30 changes: 23 additions & 7 deletions src/config.rs
@@ -1,7 +1,7 @@
use anyhow::anyhow;
use chrono::Utc;
use clap::builder::PossibleValue;
use clap::{Parser, ValueEnum};
use clap::{Args, Parser, ValueEnum};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -180,15 +180,31 @@ pub struct ConnectionConf {
#[clap(long("request-timeout"), default_value = "5s", value_name = "DURATION", value_parser = parse_duration::parse)]
pub request_timeout: Duration,

#[clap(flatten)]
pub retry_strategy: RetryStrategy,
}

#[derive(Args, Copy, Clone, Debug, Serialize, Deserialize)]
pub struct RetryStrategy {
/// Maximum number of times to retry a failed query
///
/// Unless the `retry-on-all-errors` flag is set, retries happen only for
/// timeout / overload errors.
#[clap(long("retries"), default_value = "3", value_name = "COUNT")]
pub retries: u64,

#[clap(
long("retry-delay"),
default_value = "100ms,5s",
value_name = "MIN[,MAX]"
)]
pub retry_interval: RetryDelay,
/// Controls the delay to apply before each retry
///
/// If the maximum delay is unspecified, the same delay is used for each retry.
/// If the maximum delay is specified, the retries use an exponential back-off delay strategy:
/// the first retry uses the minimum delay, and each subsequent retry doubles the delay
/// until it reaches the specified maximum.
#[clap(long, default_value = "100ms,5s", value_name = "MIN[,MAX]")]
pub retry_delay: RetryDelay,

/// Retries queries after all errors, even if the error is considered non-recoverable.
#[clap(long)]
pub retry_on_all_errors: bool,
}

#[derive(Clone, Copy, Default, Debug, Eq, PartialEq, Serialize, Deserialize)]
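The three flags above are wired into `RetryStrategy` through clap's derive macros and flattened into `ConnectionConf`. Below is a self-contained sketch that mirrors the same flag layout with plain types; the `Cli` struct and the plain `String` used for the delay are illustrative assumptions rather than the repository's actual code, and it presumes clap 4.x with the `derive` feature.

```rust
use clap::Parser;

/// Illustrative mirror of the new retry flags; `retry_delay` is kept as a plain
/// String here instead of the crate's `RetryDelay` type.
#[derive(Parser, Debug)]
struct Cli {
    /// Maximum number of times to retry a failed query.
    #[clap(long, default_value = "3", value_name = "COUNT")]
    retries: u64,

    /// Delay before each retry, as MIN[,MAX].
    #[clap(long, default_value = "100ms,5s", value_name = "MIN[,MAX]")]
    retry_delay: String,

    /// Retry queries after all errors, even non-recoverable ones.
    #[clap(long)]
    retry_on_all_errors: bool,
}

fn main() {
    // Roughly what an invocation with the new flag looks like on the command line.
    let cli = Cli::parse_from([
        "latte",
        "--retries", "5",
        "--retry-delay", "200ms,10s",
        "--retry-on-all-errors",
    ]);
    println!("{cli:?}");
}
```

Passing `--retry-on-all-errors` simply flips the boolean; omitting it keeps the default `false`, so only timeout and overload errors are retried.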
31 changes: 16 additions & 15 deletions src/context.rs
@@ -40,7 +40,7 @@ use tracing::error;
use try_lock::TryLock;
use uuid::{Variant, Version};

use crate::config::{ConnectionConf, RetryDelay};
use crate::config::{ConnectionConf, RetryStrategy};
use crate::latency::LatencyDistributionRecorder;
use crate::LatteError;

@@ -85,11 +85,7 @@ pub async fn connect(conf: &ConnectionConf) -> Result<Context, CassError> {
.build()
.await
.map_err(|e| CassError(CassErrorKind::FailedToConnect(conf.addresses.clone(), e)))?;
Ok(Context::new(
scylla_session,
conf.retries,
conf.retry_interval,
))
Ok(Context::new(scylla_session, conf.retry_strategy))
}

pub struct ClusterInfo {
@@ -399,8 +395,7 @@ pub struct Context {
session: Arc<scylla::Session>,
statements: HashMap<String, Arc<PreparedStatement>>,
stats: TryLock<SessionStats>,
retry_number: u64,
retry_interval: RetryDelay,
retry_strategy: RetryStrategy,
#[rune(get, set, add_assign, copy)]
pub load_cycle_count: u64,
#[rune(get)]
@@ -418,14 +413,13 @@ unsafe impl Send for Context {}
unsafe impl Sync for Context {}

impl Context {
pub fn new(session: scylla::Session, retry_number: u64, retry_interval: RetryDelay) -> Context {
pub fn new(session: scylla::Session, retry_strategy: RetryStrategy) -> Context {
Context {
start_time: TryLock::new(Instant::now()),
session: Arc::new(session),
statements: HashMap::new(),
stats: TryLock::new(SessionStats::new()),
retry_number,
retry_interval,
retry_strategy,
load_cycle_count: 0,
data: Value::Object(Shared::new(Object::new()).unwrap()),
rng: rand::thread_rng(),
@@ -521,11 +515,12 @@ impl Context {

let mut rs: Result<QueryResult, QueryError> = Err(QueryError::TimeoutError);
let mut attempts = 0;
while attempts <= self.retry_number && Self::should_retry(&rs) {
let retry_strategy = &self.retry_strategy;
while attempts <= retry_strategy.retries && Self::should_retry(&rs, retry_strategy) {
if attempts > 0 {
let current_retry_interval = get_exponential_retry_interval(
self.retry_interval.min,
self.retry_interval.max,
retry_strategy.retry_delay.min,
retry_strategy.retry_delay.max,
attempts,
);
tokio::time::sleep(current_retry_interval).await;
@@ -546,7 +541,13 @@ impl Context {
self.start_time.try_lock().unwrap().elapsed().as_secs_f64()
}

fn should_retry<R>(result: &Result<R, QueryError>) -> bool {
fn should_retry<R>(result: &Result<R, QueryError>, retry_strategy: &RetryStrategy) -> bool {
if !result.is_err() {
return false;
}
if retry_strategy.retry_on_all_errors {
return true;
}
matches!(
result,
Err(QueryError::RequestTimeout(_))
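The retry loop above passes `retry_delay.min`, `retry_delay.max`, and the attempt number to `get_exponential_retry_interval`, whose body is not part of this diff. A minimal sketch of the back-off described in the `retry-delay` doc comment could look like the following; the exact formula the crate uses is an assumption here.

```rust
use std::time::Duration;

// Sketch only: the first retry waits `min`, each later retry doubles the delay,
// and the wait is capped at `max`. The crate's real helper may differ.
fn exponential_retry_interval(min: Duration, max: Duration, attempt: u64) -> Duration {
    let exp = attempt.saturating_sub(1).min(31) as u32;
    let delay = min.saturating_mul(2u32.saturating_pow(exp));
    delay.min(max)
}

fn main() {
    let (min, max) = (Duration::from_millis(100), Duration::from_secs(5));
    for attempt in 1..=8 {
        println!("attempt {attempt}: wait {:?}", exponential_retry_interval(min, max, attempt));
    }
}
```

With the default `100ms,5s`, the waits run 100 ms, 200 ms, 400 ms, … and settle at 5 s once the cap is reached; passing only `MIN` makes every retry wait the same amount.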
10 changes: 7 additions & 3 deletions src/report.rs
@@ -565,13 +565,17 @@ impl<'a> Display for RunConfigCmp<'a> {
Quantity::from(conf.connection.request_timeout.as_secs_f64())
}),
self.line("Retries", "", |conf| {
Quantity::from(conf.connection.retries)
Quantity::from(conf.connection.retry_strategy.retries)
}),
self.line("├─ min delay", "ms", |conf| {
Quantity::from(conf.connection.retry_interval.min.as_secs_f64() * 1000.0)
Quantity::from(
conf.connection.retry_strategy.retry_delay.min.as_secs_f64() * 1000.0,
)
}),
self.line("└─ max delay", "ms", |conf| {
Quantity::from(conf.connection.retry_interval.max.as_secs_f64() * 1000.0)
Quantity::from(
conf.connection.retry_strategy.retry_delay.max.as_secs_f64() * 1000.0,
)
}),
];
