Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ All notable changes to this project will be documented in this file.
- Support configuring the scaler reconcile interval ([#61]).
- Add simple web-based dashboard that shows the current state and query counts of all clusters.
This makes it easier to debug state transitions of clusters ([#62]).
- Add `Unhealthy` cluster state.
This state is entered once the readiness check of a cluster in the `Ready` state fails.
The cluster will remain in the `Unhealthy` state until the scaler marks that cluster as `Ready` again.
`Unhealthy` clusters won't get any new queries; if all clusters are unhealthy, new queries will be queued.
The cluster health check runs at the scaler reconcile interval, which can be configured ([#63]).

### Changed

Expand All @@ -21,6 +26,7 @@ All notable changes to this project will be documented in this file.
[#57]: https://github.com/stackabletech/trino-lb/pull/57
[#61]: https://github.com/stackabletech/trino-lb/pull/61
[#62]: https://github.com/stackabletech/trino-lb/pull/62
[#63]: https://github.com/stackabletech/trino-lb/pull/63

## [0.3.2] - 2024-08-20

Expand Down
6 changes: 6 additions & 0 deletions trino-lb-core/src/trino_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ pub enum ClusterState {
Starting,
/// Up and running, ready to get queries
Ready,
/// Up, but not ready to accept queries. It should not be started or stopped, as it's healing itself.
Unhealthy,
/// No new queries should be submitted. Once all running queries are finished and a certain time period has passed
/// go to `Terminating`
Draining {
Expand All @@ -30,6 +32,7 @@ impl Display for ClusterState {
ClusterState::Stopped => f.write_str("Stopped"),
ClusterState::Starting => f.write_str("Starting"),
ClusterState::Ready => f.write_str("Ready"),
ClusterState::Unhealthy => f.write_str("Unhealthy"),
ClusterState::Draining { .. } => f.write_str("Draining"),
ClusterState::Terminating => f.write_str("Terminating"),
ClusterState::Deactivated => f.write_str("Deactivated"),
Expand All @@ -45,6 +48,7 @@ impl ClusterState {
| ClusterState::Starting
| ClusterState::Terminating => ClusterState::Starting,
ClusterState::Ready | ClusterState::Draining { .. } => ClusterState::Ready,
ClusterState::Unhealthy => ClusterState::Unhealthy,
ClusterState::Deactivated => ClusterState::Deactivated,
}
}
Expand All @@ -55,6 +59,7 @@ impl ClusterState {
ClusterState::Unknown
// No, because it is already started
| ClusterState::Starting
| ClusterState::Unhealthy
| ClusterState::Ready
| ClusterState::Terminating
| ClusterState::Deactivated => false,
Expand All @@ -64,6 +69,7 @@ impl ClusterState {
pub fn ready_to_accept_queries(&self) -> bool {
match self {
ClusterState::Unknown
| ClusterState::Unhealthy
| ClusterState::Stopped
| ClusterState::Draining { .. }
| ClusterState::Terminating
Expand Down
42 changes: 35 additions & 7 deletions trino-lb/src/maintenance/query_count_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,7 @@ impl QueryCountFetcher {
self.clusters
.iter()
.zip(cluster_states)
.filter_map(|(cluster, state)| match state{
ClusterState::Unknown | ClusterState::Stopped | ClusterState::Starting | ClusterState::Terminating | ClusterState::Deactivated => None,
ClusterState::Ready | ClusterState::Draining{ .. } => Some(cluster),
})
.map(|cluster| self.process_cluster(cluster)),
.map(|(cluster, state)| self.process_cluster(cluster, state))
)
.await;

Expand All @@ -152,8 +148,40 @@ impl QueryCountFetcher {
}
}

#[instrument(skip(self))]
async fn process_cluster(&self, cluster: &TrinoClusterConfig) {
/// Update the query count for the given cluster.
///
/// - In case the cluster is reachable, fetch the current query count and store it.
/// - In case we know the cluster is not reachable (e.g. the cluster is turned off or currently
///   starting), store a query count of zero (0) to avoid dangling clusters with non-zero query
///   counts.
#[instrument(skip(self, cluster), fields(cluster_name = cluster.name))]
async fn process_cluster(&self, cluster: &TrinoClusterConfig, state: ClusterState) {
    // Only clusters in these states are up and reachable, so only they can be asked
    // for their current query count.
    let reachable = matches!(
        state,
        ClusterState::Ready | ClusterState::Unhealthy | ClusterState::Draining { .. }
    );

    if reachable {
        self.fetch_and_store_query_count(cluster).await;
        return;
    }

    // The cluster is known to be unreachable (Unknown, Stopped, Starting, Terminating
    // or Deactivated): reset the stored count to zero so no stale non-zero count lingers.
    if let Err(err) = self
        .persistence
        .set_cluster_query_count(&cluster.name, 0)
        .await
    {
        error!(
            cluster = cluster.name,
            ?err,
            "QueryCountFetcher: Failed to set current cluster query count to zero"
        );
    }
}

#[instrument(skip(self, cluster), fields(cluster_name = cluster.name))]
async fn fetch_and_store_query_count(&self, cluster: &TrinoClusterConfig) {
let cluster_info =
get_cluster_info(&cluster.endpoint, self.ignore_certs, &cluster.credentials).await;

Expand Down
35 changes: 26 additions & 9 deletions trino-lb/src/scaling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio::{
task::{JoinError, JoinSet},
time,
};
use tracing::{debug, error, info, instrument, Instrument, Span};
use tracing::{debug, error, info, instrument, warn, Instrument, Span};
use trino_lb_core::{
config::{Config, ScalerConfig, ScalerConfigImplementation},
trino_cluster::ClusterState,
Expand Down Expand Up @@ -266,7 +266,7 @@ impl Scaler {
Ok(())
}

#[instrument(name = "Scaler::reconcile_cluster_group", skip(self))]
#[instrument(name = "Scaler::reconcile_cluster_group", skip(self, clusters))]
pub async fn reconcile_cluster_group(
self: Arc<Self>,
cluster_group: String,
Expand Down Expand Up @@ -435,7 +435,7 @@ impl Scaler {
Ok(())
}

#[instrument(name = "Scaler::get_current_state", skip(self))]
#[instrument(name = "Scaler::get_current_state", skip(self, scaling_config))]
async fn get_current_cluster_state(
self: Arc<Self>,
cluster_name: TrinoClusterName,
Expand All @@ -461,8 +461,8 @@ impl Scaler {
// State not known in persistence, so let's determine current state
match (activated, ready) {
(true, true) => ClusterState::Ready,
// It could also be Terminating, but in that case it would need to be stored as Terminating
// in the persistence
// It could also be Terminating or Unhealthy, but in that case it would need to be stored as
// Terminating or Unhealthy in the persistence
(true, false) => ClusterState::Starting,
// This might happen for very short time periods. E.g. for the Stackable scaler, this can be
// the case when spec.clusterOperation.stopped was just set to true, but trino-operator did
Expand All @@ -480,8 +480,18 @@ impl Scaler {
}
}
ClusterState::Ready => {
// In the future we might want to check if the cluster is healthy and have a state `Unhealthy`.
ClusterState::Ready
if ready {
ClusterState::Ready
} else {
ClusterState::Unhealthy
}
}
ClusterState::Unhealthy => {
if ready {
ClusterState::Ready
} else {
ClusterState::Unhealthy
}
}
ClusterState::Draining {
last_time_seen_with_queries,
Expand Down Expand Up @@ -540,7 +550,11 @@ impl Scaler {
Ok((cluster_name, current_state))
}

#[instrument(name = "Scaler::apply_target_states", skip(self))]
#[instrument(
name = "Scaler::apply_cluster_target_state",
skip(self, cluster),
fields(%cluster.name)
)]
async fn apply_cluster_target_state(
self: Arc<Self>,
cluster: TrinoCluster,
Expand All @@ -555,7 +569,10 @@ impl Scaler {
ClusterState::Stopped | ClusterState::Terminating => {
scaler.deactivate(&cluster.name).await?;
}
ClusterState::Starting | ClusterState::Ready | ClusterState::Draining { .. } => {
ClusterState::Starting
| ClusterState::Unhealthy
| ClusterState::Ready
| ClusterState::Draining { .. } => {
scaler.activate(&cluster.name).await?;
}
ClusterState::Deactivated => {
Expand Down
Loading