Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Namespace stats #671

Merged
merged 1 commit into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions sqld/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct LibSqlDbFactory<W: WalHook + 'static> {
db_path: PathBuf,
hook: &'static WalMethodsHook<W>,
ctx_builder: Box<dyn Fn() -> W::Context + Sync + Send + 'static>,
stats: Stats,
stats: Arc<Stats>,
config_store: Arc<DatabaseConfigStore>,
extensions: Arc<[PathBuf]>,
max_response_size: u64,
Expand All @@ -49,7 +49,7 @@ where
db_path: PathBuf,
hook: &'static WalMethodsHook<W>,
ctx_builder: F,
stats: Stats,
stats: Arc<Stats>,
config_store: Arc<DatabaseConfigStore>,
extensions: Arc<[PathBuf]>,
max_response_size: u64,
Expand Down Expand Up @@ -168,7 +168,7 @@ impl LibSqlConnection {
extensions: Arc<[PathBuf]>,
wal_hook: &'static WalMethodsHook<W>,
hook_ctx: W::Context,
stats: Stats,
stats: Arc<Stats>,
config_store: Arc<DatabaseConfigStore>,
builder_config: QueryBuilderConfig,
) -> crate::Result<Self>
Expand Down Expand Up @@ -242,7 +242,7 @@ struct Connection<'a> {
timeout_deadline: Option<Instant>,
conn: sqld_libsql_bindings::Connection<'a>,
timed_out: bool,
stats: Stats,
stats: Arc<Stats>,
config_store: Arc<DatabaseConfigStore>,
builder_config: QueryBuilderConfig,
}
Expand All @@ -253,7 +253,7 @@ impl<'a> Connection<'a> {
extensions: Arc<[PathBuf]>,
wal_methods: &'static WalMethodsHook<W>,
hook_ctx: &'a mut W::Context,
stats: Stats,
stats: Arc<Stats>,
config_store: Arc<DatabaseConfigStore>,
builder_config: QueryBuilderConfig,
) -> Result<Self> {
Expand Down Expand Up @@ -612,7 +612,7 @@ mod test {
timeout_deadline: None,
conn: sqld_libsql_bindings::Connection::test(ctx),
timed_out: false,
stats: Stats::default(),
stats: Arc::new(Stats::default()),
config_store: Arc::new(DatabaseConfigStore::new_test()),
builder_config: QueryBuilderConfig::default(),
};
Expand Down
8 changes: 4 additions & 4 deletions sqld/src/connection/write_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct MakeWriteProxyConnection {
client: ProxyClient<Channel>,
db_path: PathBuf,
extensions: Arc<[PathBuf]>,
stats: Stats,
stats: Arc<Stats>,
config_store: Arc<DatabaseConfigStore>,
applied_frame_no_receiver: watch::Receiver<FrameNo>,
max_response_size: u64,
Expand All @@ -52,7 +52,7 @@ impl MakeWriteProxyConnection {
extensions: Arc<[PathBuf]>,
channel: Channel,
uri: tonic::transport::Uri,
stats: Stats,
stats: Arc<Stats>,
config_store: Arc<DatabaseConfigStore>,
applied_frame_no_receiver: watch::Receiver<FrameNo>,
max_response_size: u64,
Expand Down Expand Up @@ -110,7 +110,7 @@ pub struct WriteProxyConnection {
/// Notifier from the repliator of the currently applied frameno
applied_frame_no_receiver: watch::Receiver<FrameNo>,
builder_config: QueryBuilderConfig,
stats: Stats,
stats: Arc<Stats>,
/// bytes representing the namespace name
namespace: Bytes,
}
Expand Down Expand Up @@ -166,7 +166,7 @@ impl WriteProxyConnection {
write_proxy: ProxyClient<Channel>,
db_path: PathBuf,
extensions: Arc<[PathBuf]>,
stats: Stats,
stats: Arc<Stats>,
config_store: Arc<DatabaseConfigStore>,
applied_frame_no_receiver: watch::Receiver<FrameNo>,
builder_config: QueryBuilderConfig,
Expand Down
62 changes: 48 additions & 14 deletions sqld/src/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,62 @@
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::Weak;
use std::time::Duration;
use tokio::time::sleep;
use tokio::sync::mpsc;
use url::Url;

use crate::http::stats::StatsResponse;
use crate::http::admin::stats::StatsResponse;
use crate::stats::Stats;

pub async fn server_heartbeat(
url: String,
url: Url,
auth: Option<String>,
update_period: Duration,
stats: Stats,
mut stats_subs: mpsc::Receiver<(Bytes, Weak<Stats>)>,
) {
let mut watched = HashMap::new();
let client = reqwest::Client::new();
let mut interval = tokio::time::interval(update_period);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
sleep(update_period).await;
let body = StatsResponse::from(&stats);
let request = client.post(&url);
let request = if let Some(ref auth) = auth {
request.header("Authorization", auth.clone())
} else {
request
tokio::select! {
Some((ns, stats)) = stats_subs.recv() => {
watched.insert(ns, stats);
}
_ = interval.tick() => {
send_stats(&mut watched, &client, &url, auth.as_deref()).await;
}
};
let request = request.json(&body);
if let Err(err) = request.send().await {
tracing::warn!("Error sending heartbeat: {}", err);
}
}

async fn send_stats(
watched: &mut HashMap<Bytes, Weak<Stats>>,
client: &reqwest::Client,
url: &Url,
auth: Option<&str>,
) {
// first send all the stats...
for (ns, stats) in watched.iter() {
if let Some(stats) = stats.upgrade() {
let body = StatsResponse::from(stats.as_ref());
let mut url = url.clone();
url.path_segments_mut()
.unwrap()
.push(std::str::from_utf8(ns).unwrap());
let request = client.post(url);
let request = if let Some(ref auth) = auth {
request.header("Authorization", auth.to_string())
} else {
request
};
let request = request.json(&body);
if let Err(err) = request.send().await {
tracing::warn!("Error sending heartbeat: {}", err);
}
}
}

// ..and then remove all expired subscription
watched.retain(|_, s| s.upgrade().is_some());
}
2 changes: 1 addition & 1 deletion sqld/src/hrana/ws/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures::{SinkExt as _, StreamExt as _};
use tokio_tungstenite::tungstenite;
use tungstenite::http;

use crate::http::db_factory::namespace_from_headers;
use crate::http::user::db_factory::namespace_from_headers;
use crate::net::Conn;

use super::super::{Encoding, Version};
Expand Down
5 changes: 4 additions & 1 deletion sqld/src/admin_api.rs → sqld/src/http/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ use crate::connection::config::{DatabaseConfig, DatabaseConfigStore};
use crate::error::LoadDumpError;
use crate::namespace::{DumpStream, MakeNamespace, NamespaceStore, RestoreOption};

pub mod stats;

struct AppState<M: MakeNamespace> {
db_config_store: Arc<DatabaseConfigStore>,
namespaces: NamespaceStore<M>,
}

pub async fn run_admin_api<M, A>(
pub async fn run<M, A>(
acceptor: A,
db_config_store: Arc<DatabaseConfigStore>,
namespaces: NamespaceStore<M>,
Expand All @@ -47,6 +49,7 @@ where
post(handle_restore_namespace),
)
.route("/v1/namespaces/:namespace", delete(handle_delete_namespace))
.route("/v1/namespaces/:namespace/stats", get(stats::handle_stats))
.with_state(Arc::new(AppState {
db_config_store,
namespaces,
Expand Down
29 changes: 13 additions & 16 deletions sqld/src/http/stats.rs → sqld/src/http/admin/stats.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use hyper::{Body, Response};
use std::sync::Arc;

use serde::Serialize;

use axum::extract::{FromRef, State as AxumState};
use axum::extract::{Path, State};
use axum::Json;

use crate::{namespace::MakeNamespace, stats::Stats};
use crate::namespace::MakeNamespace;
use crate::stats::Stats;

use super::AppState;

Expand Down Expand Up @@ -32,18 +35,12 @@ impl From<Stats> for StatsResponse {
}
}

impl<F: MakeNamespace> FromRef<AppState<F>> for Stats {
fn from_ref(input: &AppState<F>) -> Self {
input.stats.clone()
}
}

pub(crate) async fn handle_stats(AxumState(stats): AxumState<Stats>) -> Response<Body> {
let resp: StatsResponse = stats.into();
pub(super) async fn handle_stats<M: MakeNamespace>(
State(app_state): State<Arc<AppState<M>>>,
Path(namespace): Path<String>,
) -> crate::Result<Json<StatsResponse>> {
let stats = app_state.namespaces.stats(namespace.into()).await?;
let resp: StatsResponse = stats.as_ref().into();

let payload = serde_json::to_vec(&resp).unwrap();
Response::builder()
.header("Content-Type", "application/json")
.body(Body::from(payload))
.unwrap()
Ok(Json(resp))
}
Loading