Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds auto-retry logic to watchtower-plugin #168

Merged
merged 4 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion teos/src/api/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ async fn get_subscription_info(

fn router(
grpc_conn: PublicTowerServicesClient<Channel>,
) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
let register = warp::post()
.and(warp::path("register"))
.and(warp::body::content_length_limit(REGISTER_BODY_LEN).and(warp::body::json()))
Expand Down
6 changes: 1 addition & 5 deletions teos/src/api/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,7 @@ impl PrivateTowerServices for Arc<InternalAPI> {
Some(info) => Ok(Response::new(msgs::GetUserResponse {
available_slots: info.available_slots,
subscription_expiry: info.subscription_expiry,
appointments: info
.appointments
.iter()
.map(|(uuid, _)| uuid.to_vec())
.collect(),
appointments: info.appointments.keys().map(|uuid| uuid.to_vec()).collect(),
})),
None => Err(Status::new(Code::NotFound, "User not found")),
}
Expand Down
2 changes: 1 addition & 1 deletion teos/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub fn data_dir_absolute_path(data_dir: String) -> PathBuf {
}

pub fn from_file<T: Default + serde::de::DeserializeOwned>(path: PathBuf) -> T {
match std::fs::read(&path) {
match std::fs::read(path) {
Ok(file_content) => toml::from_slice::<T>(&file_content).map_or_else(
|e| {
eprintln!("Couldn't parse config file: {}", e);
Expand Down
2 changes: 1 addition & 1 deletion teos/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1663,7 +1663,7 @@ mod tests {
watcher.last_known_block_height.load(Ordering::Relaxed),
chain.get_block_count()
);
watcher.block_connected(&chain.generate(None), chain.get_block_count() as u32);
watcher.block_connected(&chain.generate(None), chain.get_block_count());
assert_eq!(
watcher.last_known_block_height.load(Ordering::Relaxed),
chain.get_block_count()
Expand Down
14 changes: 14 additions & 0 deletions watchtower-plugin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,20 @@ impl TowerSummary {
self.status = status;
self
}

/// Updates the main information about the summary while preserving the appointment maps.
///
/// Overwrites the tower's network address and the subscription data
/// (slots, start, expiry); the appointment collections held by the
/// summary are deliberately left untouched.
///
/// NOTE(review): the method name is misspelled ("udpate" instead of
/// "update"). It is kept as-is here because renaming would break existing
/// callers — consider renaming with a deprecation path in a follow-up.
pub fn udpate(
&mut self,
net_addr: String,
available_slots: u32,
subscription_start: u32,
subscription_expiry: u32,
) {
self.net_addr = net_addr;
self.available_slots = available_slots;
self.subscription_start = subscription_start;
self.subscription_expiry = subscription_expiry;
}
}

impl From<TowerInfo> for TowerSummary {
Expand Down
123 changes: 82 additions & 41 deletions watchtower-plugin/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::convert::TryFrom;
use std::env;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, MutexGuard};

use home::home_dir;
use serde_json::json;
Expand All @@ -21,7 +21,7 @@ use watchtower_plugin::net::http::{
self, post_request, process_post_response, AddAppointmentError, ApiResponse, RequestError,
};
use watchtower_plugin::retrier::RetryManager;
use watchtower_plugin::wt_client::WTClient;
use watchtower_plugin::wt_client::{RevocationData, WTClient};
use watchtower_plugin::TowerStatus;

fn to_cln_error(e: RequestError) -> Error {
Expand All @@ -34,6 +34,27 @@ fn to_cln_error(e: RequestError) -> Error {
e
}

/// Sends fresh data to a retrier as long as it does not exist yet, or it
/// exists and is running.
///
/// If a retrier for `tower_id` is present but idle, the data is NOT sent:
/// an idle retrier will be woken up on its own schedule and already holds
/// the pending data it needs.
fn send_to_retrier(state: &MutexGuard<WTClient>, tower_id: TowerId, locator: Locator) {
    // No retrier yet -> send (one will be spawned). Retrier present -> send
    // only while it is actively running (entries in the map are either
    // running or idle, never failed).
    let retrier_accepts_data = state
        .get_retrier_status(&tower_id)
        .map_or(true, |status| status.is_running());

    if retrier_accepts_data {
        state
            .unreachable_towers
            .send((tower_id, RevocationData::Fresh(locator)))
            .unwrap();
    } else {
        log::debug!(
            "Not sending data to idle retrier ({}, {})",
            tower_id,
            locator
        )
    }
}

/// Registers the client to a given tower.
///
/// Accepted tower_id formats:
Expand Down Expand Up @@ -286,38 +307,52 @@ async fn get_tower_info(

/// Triggers a manual retry of a tower, tries to send all pending appointments to it.
///
/// Only works if the tower is unreachable or there's been a subscription error.
/// Only works if the tower is unreachable or there's been a subscription error (and the tower is not already being retried).
async fn retry_tower(
plugin: Plugin<Arc<Mutex<WTClient>>>,
v: serde_json::Value,
) -> Result<serde_json::Value, Error> {
let tower_id = TowerId::try_from(v).map_err(|e| anyhow!(e))?;
let state = plugin.state().lock().unwrap();
if let Some(status) = state.get_tower_status(&tower_id) {
if status.is_temporary_unreachable() {
return Err(anyhow!("{} is already being retried", tower_id));
} else if !status.is_retryable() {
return Err(anyhow!(
"Tower status must be unreachable or have a subscription issue to manually retry",
));
}

for locator in state
.towers
.get(&tower_id)
.unwrap()
.pending_appointments
.iter()
{
if let Some(tower_status) = state.get_tower_status(&tower_id) {
if let Some(retrier_status) = state.retriers.get(&tower_id) {
if retrier_status.is_idle() {
// We don't send any associated data in this case given the idle retrier already has it all.
state
.unreachable_towers
.send((tower_id, RevocationData::None))
.map_err(|e| anyhow!(e))?;
} else {
// Status can only be running or idle for data in the retriers map.
return Err(anyhow!("{} is already being retried", tower_id));
}
} else if tower_status.is_retryable() {
// We do send associated data here given there is no retrier associated to this tower.
state
.unreachable_towers
.send((tower_id, *locator))
.send((
tower_id,
RevocationData::Stale(
state
.towers
.get(&tower_id)
.unwrap()
.pending_appointments
.iter()
.cloned()
.collect(),
),
))
.map_err(|e| anyhow!(e))?;
} else {
return Err(anyhow!(
"Tower status must be unreachable or have a subscription issue to manually retry",
));
}
Ok(json!(format!("Retrying {}", tower_id)))
} else {
Err(anyhow!("Unknown tower {}", tower_id))
return Err(anyhow!("Unknown tower {}", tower_id));
}
Ok(json!(format!("Retrying {}", tower_id)))
}

/// Forgets about a tower wiping out all local data associated to it.
Expand Down Expand Up @@ -410,11 +445,7 @@ async fn on_commitment_revocation(
let mut state = plugin.state().lock().unwrap();
state.set_tower_status(tower_id, TowerStatus::TemporaryUnreachable);
state.add_pending_appointment(tower_id, &appointment);

state
.unreachable_towers
.send((tower_id, appointment.locator))
.unwrap();
send_to_retrier(&state, tower_id, appointment.locator);
}
}
AddAppointmentError::ApiError(e) => match e.error_code {
Expand All @@ -427,11 +458,7 @@ async fn on_commitment_revocation(
let mut state = plugin.state().lock().unwrap();
state.set_tower_status(tower_id, TowerStatus::SubscriptionError);
state.add_pending_appointment(tower_id, &appointment);

state
.unreachable_towers
.send((tower_id, appointment.locator))
.unwrap();
send_to_retrier(&state, tower_id, appointment.locator);
}

_ => {
Expand Down Expand Up @@ -482,11 +509,8 @@ async fn on_commitment_revocation(
let mut state = plugin.state().lock().unwrap();
state.add_pending_appointment(tower_id, &appointment);

if status.is_temporary_unreachable() {
state
.unreachable_towers
.send((tower_id, appointment.locator))
.unwrap();
if !status.is_unreachable() {
send_to_retrier(&state, tower_id, appointment.locator);
}
}
}
Expand Down Expand Up @@ -517,7 +541,11 @@ async fn main() -> Result<(), Error> {
"watchtower-proxy",
Value::OptString,
"Socks v5 proxy IP address and port for the watchtower client",
))
)).option(ConfigOption::new(
"watchtower-auto-retry-delay",
Value::Integer(86400),
"the time (in seconds) that a retrier will wait before auto-retrying a failed tower. Defaults to once a day",
))
.option(ConfigOption::new(
"dev-watchtower-max-retry-interval",
Value::Integer(60),
Expand Down Expand Up @@ -593,6 +621,13 @@ async fn main() -> Result<(), Error> {
// We will never end up here, but we need to define an else. Should be fixed alongside the previous fixme.
900
};
let auto_retry_delay =
if let Value::Integer(x) = midstate.option("watchtower-auto-retry-delay").unwrap() {
x as u16
} else {
// We will never end up here, but we need to define an else. Should be fixed alongside the previous fixme.
3600
};
let max_interval_time = if let Value::Integer(x) = midstate
.option("dev-watchtower-max-retry-interval")
.unwrap()
Expand All @@ -605,9 +640,15 @@ async fn main() -> Result<(), Error> {

let plugin = midstate.start(wt_client.clone()).await?;
tokio::spawn(async move {
RetryManager::new(wt_client, rx, max_elapsed_time, max_interval_time)
.manage_retry()
.await
RetryManager::new(
wt_client,
rx,
max_elapsed_time,
auto_retry_delay,
max_interval_time,
)
.manage_retry()
.await
});
plugin.join().await
}
Loading