Skip to content

Commit

Permalink
Refactor log
Browse files Browse the repository at this point in the history
Signed-off-by: terassyi <iscale821@gmail.com>
  • Loading branch information
terassyi committed May 31, 2024
1 parent 48b1995 commit 346865d
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 126 deletions.
1 change: 1 addition & 0 deletions sart/src/proto/sart.v1.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HealthRequest {}
Expand Down
2 changes: 1 addition & 1 deletion sartd/src/kubernetes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ serde_json = "1.0.108"
thiserror = "1.0.53"
tokio = { version = "1.35.1", features = ["rt-multi-thread", "macros", "net"] }
tracing = "0.1.40"
sartd-trace = { path = "../trace" }
serde_yaml = "0.9.29"
tonic = "0.10.2"

sartd-cert = { path = "../cert" }
sartd-proto = { path = "../proto" }
sartd-ipam = { path = "../ipam" }
sartd-mock = { path = "../mock" }
sartd-trace = { path = "../trace" }
futures = "0.3.30"
rtnetlink = "0.14.1"
actix-web = { version = "4.4.1", features = ["rustls-0_21"] }
Expand Down
17 changes: 6 additions & 11 deletions sartd/src/kubernetes/src/agent/reconciler/bgp_advertisement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use kube::{
runtime::{controller::Action, watcher::Config, Controller},
Api, Client, ResourceExt,
};
use tracing::{field, Span};

use crate::{
agent::{bgp::speaker, error::Error},
Expand All @@ -20,7 +21,7 @@ use crate::{

use super::node_bgp::{DEFAULT_SPEAKER_TIMEOUT, ENV_HOSTNAME};

#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, fields(trace_id))]
pub async fn run(state: State, interval: u64) {
let client = Client::try_default()
.await
Expand Down Expand Up @@ -59,12 +60,6 @@ pub async fn reconciler(ba: Arc<BGPAdvertisement>, ctx: Arc<Context>) -> Result<

let bgp_advertisements = Api::<BGPAdvertisement>::namespaced(ctx.client.clone(), &ns);

tracing::info!(
name = ba.name_any(),
namespace = ns,
"Reconcile BGPAdvertisement"
);

reconcile(&bgp_advertisements, &ba, ctx).await
}

Expand All @@ -74,6 +69,9 @@ async fn reconcile(
ba: &BGPAdvertisement,
ctx: Arc<Context>,
) -> Result<Action, Error> {
let trace_id = sartd_trace::telemetry::get_trace_id();
Span::current().record("trace_id", &field::display(&trace_id));

let node_bgps = Api::<NodeBGP>::all(ctx.client.clone());
let node_name = std::env::var(ENV_HOSTNAME).map_err(Error::Var)?;

Expand Down Expand Up @@ -136,7 +134,6 @@ async fn reconcile(
})
.await
.map_err(Error::GotgPRC)?;
tracing::info!(name = ba.name_any(), namespace = ba.namespace(), status=?adv_status, response=?res,"Add path response");

*adv_status = AdvertiseStatus::Advertised;
need_update = true;
Expand All @@ -150,7 +147,6 @@ async fn reconcile(
})
.await
.map_err(Error::GotgPRC)?;
tracing::info!(name = ba.name_any(), namespace = ba.namespace(), status=?adv_status ,response=?res,"Add path response");
}
AdvertiseStatus::Withdraw => {
let res = speaker_client
Expand All @@ -160,7 +156,6 @@ async fn reconcile(
})
.await
.map_err(Error::GotgPRC)?;
tracing::info!(name = ba.name_any(), namespace = ba.namespace(), status=?adv_status, response=?res,"Delete path response");

peers.remove(&p.name);
need_update = true;
Expand All @@ -182,7 +177,7 @@ async fn reconcile(
tracing::info!(
name = ba.name_any(),
namespace = ba.namespace(),
"Update BGPAdvertisement"
"update BGPAdvertisement"
);
return Ok(Action::requeue(Duration::from_secs(60)));
}
Expand Down
34 changes: 21 additions & 13 deletions sartd/src/kubernetes/src/agent/reconciler/bgp_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration};
use futures::StreamExt;
use k8s_openapi::api::discovery::v1::EndpointSlice;
use kube::{
api::{ListParams, Patch, PatchParams, PostParams},
api::{ListParams, PostParams, Patch, PatchParams},
runtime::{
controller::Action,
finalizer::{finalizer, Event},
Expand All @@ -12,6 +12,7 @@ use kube::{
},
Api, Client, ResourceExt,
};
use tracing::{field, Span};

use crate::{
agent::{bgp::speaker, error::Error},
Expand Down Expand Up @@ -46,7 +47,8 @@ pub async fn reconciler(bp: Arc<BGPPeer>, ctx: Arc<Context>) -> Result<Action, E

#[tracing::instrument(skip_all, fields(trace_id))]
async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Result<Action, Error> {
tracing::info!(name = bp.name_any(), "Reconcile BGPPeer");
let trace_id = sartd_trace::telemetry::get_trace_id();
Span::current().record("trace_id", &field::display(&trace_id));

let timeout = match bp.spec.speaker.timeout {
Some(t) => t,
Expand All @@ -64,7 +66,7 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
if info.asn == 0 {
tracing::warn!(
node_bgp = bp.spec.node_bgp_ref,
"Local BGP speaker is not configured"
"local BGP speaker is not configured"
);
return Ok(Action::requeue(Duration::from_secs(1)));
}
Expand All @@ -88,6 +90,11 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
let mut need_status_update = false;
match &peer.get_ref().peer {
Some(peer) => {
tracing::info!(
asn = bp.spec.asn,
addr = bp.spec.addr,
"peer already exists"
);
// update status
match new_bp.status.as_mut() {
Some(status) => match status.conditions.as_mut() {
Expand All @@ -104,7 +111,7 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
addr = bp.spec.addr,
old_state = ?cond.status,
new_state = ?new_state,
"Peer state is changed"
"peer state is changed"
);
conditions.push(BGPPeerCondition {
status: BGPPeerConditionStatus::try_from(status as i32)
Expand Down Expand Up @@ -140,7 +147,7 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
asn = bp.spec.asn,
addr = bp.spec.addr,
state = ?state,
"Peer state is initialized"
"peer state is initialized"
);
status.conditions = Some(vec![BGPPeerCondition {
status: state,
Expand All @@ -157,7 +164,7 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
asn = bp.spec.asn,
addr = bp.spec.addr,
state = ?state,
"Peer state is initialized"
"peer state is initialized"
);
new_bp.status = Some(BGPPeerStatus {
backoff: 0,
Expand All @@ -182,7 +189,7 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
name = bp.name_any(),
asn = bp.spec.asn,
addr = bp.spec.addr,
"Update BGPPeer status"
"update BGPPeer status"
);
let patch = Patch::Merge(&new_bp);
if let Err(e) = api
Expand All @@ -203,7 +210,7 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
tracing::info!(
asn = bp.spec.asn,
addr = bp.spec.addr,
"Peer doesn't exist yet"
"peer doesn't exist yet"
);

speaker_client
Expand Down Expand Up @@ -263,7 +270,7 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
name = bp.name_any(),
asn = bp.spec.asn,
addr = bp.spec.addr,
"Reflect the newly established peer to existing BGPAdvertisements"
"reflect the newly established peer to existing BGPAdvertisements"
);
let eps_api = Api::<EndpointSlice>::all(ctx.client.clone());
let mut eps_list = eps_api
Expand All @@ -290,7 +297,7 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
name = bp.name_any(),
asn = bp.spec.asn,
addr = bp.spec.addr,
"Reset BGPAdvertisements"
"reset BGPAdvertisements"
);
let ba_api = Api::<BGPAdvertisement>::all(ctx.client.clone());
let mut ba_list = ba_api
Expand Down Expand Up @@ -325,7 +332,8 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul

#[tracing::instrument(skip_all, fields(trace_id))]
async fn cleanup(_api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Result<Action, Error> {
tracing::info!(name = bp.name_any(), "Cleanup BGPPeer");
let trace_id = sartd_trace::telemetry::get_trace_id();
Span::current().record("trace_id", &field::display(&trace_id));

let timeout = match bp.spec.speaker.timeout {
Some(t) => t,
Expand All @@ -342,7 +350,7 @@ async fn cleanup(_api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Result
.await
{
Ok(_peer) => {
tracing::info!(name = bp.name_any(), addr = bp.spec.addr, "Delete peer");
tracing::info!(name = bp.name_any(), addr = bp.spec.addr, "delete peer");
speaker_client
.delete_peer(sartd_proto::sart::DeletePeerRequest {
addr: bp.spec.addr.clone(),
Expand Down Expand Up @@ -377,7 +385,7 @@ async fn cleanup(_api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Result
name = bp.name_any(),
asn = bp.spec.asn,
addr = bp.spec.addr,
"Clean up the peer from BGPAdvertisements"
"clean up the peer from BGPAdvertisements"
);
let ba_api = Api::<BGPAdvertisement>::all(ctx.client.clone());
let mut ba_list = ba_api
Expand Down
36 changes: 22 additions & 14 deletions sartd/src/kubernetes/src/agent/reconciler/node_bgp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use kube::{
},
Api, Client, ResourceExt,
};
use tracing::{field, Span};

use crate::{
agent::{bgp::speaker, error::Error},
Expand Down Expand Up @@ -43,9 +44,10 @@ pub async fn reconciler(nb: Arc<NodeBGP>, ctx: Arc<Context>) -> Result<Action, E
.map_err(|e| Error::Finalizer(Box::new(e)))
}

#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, fields(trace_id))]
async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Result<Action, Error> {
tracing::info!(name = nb.name_any(), "Reconcile NodeBGP");
let trace_id = sartd_trace::telemetry::get_trace_id();
Span::current().record("trace_id", &field::display(&trace_id));

// NodeBGP.spec.asn and routerId should be immutable

Expand Down Expand Up @@ -81,7 +83,7 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
name = nb.name_any(),
asn = nb.spec.asn,
router_id = nb.spec.router_id,
"Backoff BGP advertisement"
"backoff BGP advertisement"
);
backoff_advertisements(nb, &ctx.client.clone()).await?;
return Err(e);
Expand All @@ -107,14 +109,14 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
name = nb.name_any(),
asn = nb.spec.asn,
router_id = nb.spec.router_id,
"Backoff NodeBGP"
"backoff NodeBGP"
);
backoff_advertisements(nb, &ctx.client.clone()).await?;
tracing::warn!(
name = nb.name_any(),
asn = nb.spec.asn,
router_id = nb.spec.router_id,
"Backoff BGP advertisement"
"backoff BGP advertisement"
);
}
return Err(Error::GotgPRC(e));
Expand All @@ -129,8 +131,7 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
name = nb.name_any(),
asn = nb.spec.asn,
router_id = nb.spec.router_id,
multipath =? nb.spec.speaker.multipath,
"Configure local BGP settings"
"configure local BGP settings"
);
speaker_client
.set_as(sartd_proto::sart::SetAsRequest { asn: nb.spec.asn })
Expand Down Expand Up @@ -180,6 +181,12 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
backoff_advertisements(nb, &ctx.client.clone()).await?;
new_status.backoff += 1;
}
tracing::info!(
name = nb.name_any(),
asn = nb.spec.asn,
router_id = nb.spec.router_id,
"update NodeBGP status"
);
// update status
let mut new_nb = nb.clone();
new_nb.status = Some(new_status);
Expand Down Expand Up @@ -219,7 +226,7 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
tracing::info!(
name = nb.name_any(),
peer = bp.name_any(),
"Increment peer's backoff count"
"increment peer's backoff count"
);
bgp_peer_api
.replace_status(
Expand All @@ -244,7 +251,7 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
name = nb.name_any(),
asn = nb.spec.asn,
router_id = nb.spec.router_id,
"Local BGP settings are already configured"
"local BGP settings are already configured"
);

// patch status
Expand All @@ -254,7 +261,7 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
reason: NodeBGPConditionReason::Configured,
}
} else {
tracing::warn!("Local BGP speaker configuration and NodeBGP are mismatched");
tracing::warn!("local BGP speaker configuration and NodeBGP are mismatched");
NodeBGPCondition {
status: NodeBGPConditionStatus::Unavailable,
reason: NodeBGPConditionReason::InvalidConfiguration,
Expand Down Expand Up @@ -293,7 +300,7 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
name = nb.name_any(),
asn = nb.spec.asn,
router_id = nb.spec.router_id,
"Update NodeBGP status"
"update NodeBGP status"
);
// update status
new_nb.status = Some(new_status);
Expand Down Expand Up @@ -345,7 +352,7 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
}
if !errors.is_empty() {
for e in errors.iter() {
tracing::error!(error=?e, "Failed to reconcile BGPPeer associated with NodeBGP");
tracing::error!(error=?e, "failed to reconcile BGPPeer associated with NodeBGP");
}
// returns ok but, this should retry to reconcile
return Ok(Action::requeue(Duration::from_secs(10)));
Expand All @@ -356,9 +363,10 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
Ok(Action::requeue(Duration::from_secs(60)))
}

#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, fields(trace_id))]
async fn cleanup(_api: &Api<NodeBGP>, nb: &NodeBGP, _ctx: Arc<Context>) -> Result<Action, Error> {
tracing::info!(name = nb.name_any(), "Cleanup NodeBGP");
let trace_id = sartd_trace::telemetry::get_trace_id();
Span::current().record("trace_id", &field::display(&trace_id));

let timeout = nb.spec.speaker.timeout.unwrap_or(DEFAULT_SPEAKER_TIMEOUT);
let mut speaker_client =
Expand Down
Loading

0 comments on commit 346865d

Please sign in to comment.