From 346865db3764dc395feaa4dd4870396cc3ba10e2 Mon Sep 17 00:00:00 2001
From: terassyi
Date: Wed, 29 May 2024 23:51:28 +0900
Subject: [PATCH] Refactor log

Signed-off-by: terassyi

---
 sart/src/proto/sart.v1.rs                     |  1 +
 sartd/src/kubernetes/Cargo.toml               |  2 +-
 .../src/agent/reconciler/bgp_advertisement.rs | 17 +++----
 .../src/agent/reconciler/bgp_peer.rs          | 34 ++++++++------
 .../src/agent/reconciler/node_bgp.rs          | 36 +++++++++------
 .../controller/reconciler/address_block.rs    | 21 +++++----
 .../src/controller/reconciler/address_pool.rs | 12 +++--
 .../reconciler/bgp_advertisement.rs           | 22 +++------
 .../src/controller/reconciler/cluster_bgp.rs  | 27 +++++------
 .../reconciler/endpointslice_watcher.rs       | 20 ++++----
 .../src/controller/reconciler/node_watcher.rs | 11 ++++-
 .../controller/reconciler/service_watcher.rs  | 46 ++++++++-----------
 sartd/src/proto/src/sart.v1.rs                |  1 -
 13 files changed, 124 insertions(+), 126 deletions(-)

diff --git a/sart/src/proto/sart.v1.rs b/sart/src/proto/sart.v1.rs
index 03510c9..498500d 100644
--- a/sart/src/proto/sart.v1.rs
+++ b/sart/src/proto/sart.v1.rs
@@ -1,3 +1,4 @@
+// This file is @generated by prost-build.
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct HealthRequest {}
diff --git a/sartd/src/kubernetes/Cargo.toml b/sartd/src/kubernetes/Cargo.toml
index 925400b..e95e76d 100644
--- a/sartd/src/kubernetes/Cargo.toml
+++ b/sartd/src/kubernetes/Cargo.toml
@@ -21,7 +21,6 @@ serde_json = "1.0.108"
 thiserror = "1.0.53"
 tokio = { version = "1.35.1", features = ["rt-multi-thread", "macros", "net"] }
 tracing = "0.1.40"
-sartd-trace = { path = "../trace" }
 serde_yaml = "0.9.29"
 tonic = "0.10.2"
 
@@ -29,6 +28,7 @@ sartd-cert = { path = "../cert" }
 sartd-proto = { path = "../proto" }
 sartd-ipam = { path = "../ipam" }
 sartd-mock = { path = "../mock" }
+sartd-trace = { path = "../trace" }
 futures = "0.3.30"
 rtnetlink = "0.14.1"
 actix-web = { version = "4.4.1", features = ["rustls-0_21"] }
diff --git a/sartd/src/kubernetes/src/agent/reconciler/bgp_advertisement.rs b/sartd/src/kubernetes/src/agent/reconciler/bgp_advertisement.rs
index 96c6520..7182d89 100644
--- a/sartd/src/kubernetes/src/agent/reconciler/bgp_advertisement.rs
+++ b/sartd/src/kubernetes/src/agent/reconciler/bgp_advertisement.rs
@@ -6,6 +6,7 @@ use kube::{
     runtime::{controller::Action, watcher::Config, Controller},
     Api, Client, ResourceExt,
 };
+use tracing::{field, Span};
 
 use crate::{
     agent::{bgp::speaker, error::Error},
@@ -20,7 +21,7 @@ use crate::{
 
 use super::node_bgp::{DEFAULT_SPEAKER_TIMEOUT, ENV_HOSTNAME};
 
-#[tracing::instrument(skip_all)]
+#[tracing::instrument(skip_all, fields(trace_id))]
 pub async fn run(state: State, interval: u64) {
     let client = Client::try_default()
         .await
@@ -59,12 +60,6 @@ pub async fn reconciler(ba: Arc<BGPAdvertisement>, ctx: Arc<Context>) -> Result<
 
     let bgp_advertisements = Api::<BGPAdvertisement>::namespaced(ctx.client.clone(), &ns);
 
-    tracing::info!(
-        name = ba.name_any(),
-        namespace = ns,
-        "Reconcile BGPAdvertisement"
-    );
-
     reconcile(&bgp_advertisements, &ba, ctx).await
 }
 
@@ -74,6 +69,9 @@ async fn reconcile(
     ba: &BGPAdvertisement,
     ctx: Arc<Context>,
 ) -> Result<Action, Error> {
+    let trace_id = sartd_trace::telemetry::get_trace_id();
+    Span::current().record("trace_id", &field::display(&trace_id));
+
     let node_bgps = Api::<NodeBGP>::all(ctx.client.clone());
 
     let node_name = std::env::var(ENV_HOSTNAME).map_err(Error::Var)?;
@@ -136,7 +134,6 @@ async fn reconcile(
                     })
                     .await
                     .map_err(Error::GotgPRC)?;
-                tracing::info!(name = ba.name_any(), namespace = ba.namespace(), status=?adv_status, response=?res,"Add path response");
 
                 *adv_status = AdvertiseStatus::Advertised;
                 need_update = true;
@@ -150,7 +147,6 @@ async fn reconcile(
                     })
                     .await
                     .map_err(Error::GotgPRC)?;
-                tracing::info!(name = ba.name_any(), namespace = ba.namespace(), status=?adv_status ,response=?res,"Add path response");
             }
             AdvertiseStatus::Withdraw => {
                 let res = speaker_client
@@ -160,7 +156,6 @@ async fn reconcile(
                     })
                     .await
                     .map_err(Error::GotgPRC)?;
-                tracing::info!(name = ba.name_any(), namespace = ba.namespace(), status=?adv_status, response=?res,"Delete path response");
 
                 peers.remove(&p.name);
                 need_update = true;
@@ -182,7 +177,7 @@ async fn reconcile(
         tracing::info!(
             name = ba.name_any(),
             namespace = ba.namespace(),
-            "Update BGPAdvertisement"
+            "update BGPAdvertisement"
         );
         return Ok(Action::requeue(Duration::from_secs(60)));
     }
diff --git a/sartd/src/kubernetes/src/agent/reconciler/bgp_peer.rs b/sartd/src/kubernetes/src/agent/reconciler/bgp_peer.rs
index 6f3daee..145d8dd 100644
--- a/sartd/src/kubernetes/src/agent/reconciler/bgp_peer.rs
+++ b/sartd/src/kubernetes/src/agent/reconciler/bgp_peer.rs
@@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration};
 use futures::StreamExt;
 use k8s_openapi::api::discovery::v1::EndpointSlice;
 use kube::{
-    api::{ListParams, Patch, PatchParams, PostParams},
+    api::{ListParams, PostParams, Patch, PatchParams},
     runtime::{
         controller::Action,
         finalizer::{finalizer, Event},
@@ -12,6 +12,7 @@ use kube::{
     },
     Api, Client, ResourceExt,
 };
+use tracing::{field, Span};
 
 use crate::{
     agent::{bgp::speaker, error::Error},
@@ -46,7 +47,8 @@ pub async fn reconciler(bp: Arc<BGPPeer>, ctx: Arc<Context>) -> Result<Action, E
 
 #[tracing::instrument(skip_all, fields(trace_id))]
 async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Result<Action, Error> {
-    tracing::info!(name = bp.name_any(), "Reconcile BGPPeer");
+    let trace_id = sartd_trace::telemetry::get_trace_id();
+    Span::current().record("trace_id", &field::display(&trace_id));
 
     let timeout = match bp.spec.speaker.timeout {
         Some(t) => t,
@@ -64,7 +66,7 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
     if info.asn == 0 {
         tracing::warn!(
             node_bgp = bp.spec.node_bgp_ref,
-            "Local BGP speaker is not configured"
+            "local BGP speaker is not configured"
         );
         return Ok(Action::requeue(Duration::from_secs(1)));
     }
@@ -88,6 +90,11 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
     let mut need_status_update = false;
     match &peer.get_ref().peer {
         Some(peer) => {
+            tracing::info!(
+                asn = bp.spec.asn,
+                addr = bp.spec.addr,
+                "peer already exists"
+            );
             // update status
             match new_bp.status.as_mut() {
                 Some(status) => match status.conditions.as_mut() {
@@ -104,7 +111,7 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
                             addr = bp.spec.addr,
                             old_state = ?cond.status,
                             new_state = ?new_state,
-                            "Peer state is changed"
+                            "peer state is changed"
                         );
                         conditions.push(BGPPeerCondition {
                             status: BGPPeerConditionStatus::try_from(status as i32)
@@ -140,7 +147,7 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
                             asn = bp.spec.asn,
                             addr = bp.spec.addr,
                             state = ?state,
-                            "Peer state is initialized"
+                            "peer state is initialized"
                         );
                         status.conditions = Some(vec![BGPPeerCondition {
                             status: state,
@@ -157,7 +164,7 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
                         asn = bp.spec.asn,
                         addr = bp.spec.addr,
                         state = ?state,
-                        "Peer state is initialized"
+                        "peer state is initialized"
                     );
                     new_bp.status = Some(BGPPeerStatus {
                         backoff: 0,
@@ -182,7 +189,7 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
                     name = bp.name_any(),
                     asn = bp.spec.asn,
                     addr = bp.spec.addr,
-                    "Update BGPPeer status"
+                    "update BGPPeer status"
                 );
                let patch = Patch::Merge(&new_bp);
                if let Err(e) = api
@@ -203,7 +210,7 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
             tracing::info!(
                 asn = bp.spec.asn,
                 addr = bp.spec.addr,
-                "Peer doesn't exist yet"
+                "peer doesn't exist yet"
             );
 
             speaker_client
@@ -263,7 +270,7 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
                 name = bp.name_any(),
                 asn = bp.spec.asn,
                 addr = bp.spec.addr,
-                "Reflect the newly established peer to existing BGPAdvertisements"
+                "reflect the newly established peer to existing BGPAdvertisements"
             );
             let eps_api = Api::<EndpointSlice>::all(ctx.client.clone());
             let mut eps_list = eps_api
@@ -290,7 +297,7 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
                 name = bp.name_any(),
                 asn = bp.spec.asn,
                 addr = bp.spec.addr,
-                "Reset BGPAdvertisements"
+                "reset BGPAdvertisements"
            );
             let ba_api = Api::<BGPAdvertisement>::all(ctx.client.clone());
             let mut ba_list = ba_api
@@ -325,7 +332,8 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
 
 #[tracing::instrument(skip_all, fields(trace_id))]
 async fn cleanup(_api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Result<Action, Error> {
-    tracing::info!(name = bp.name_any(), "Cleanup BGPPeer");
+    let trace_id = sartd_trace::telemetry::get_trace_id();
+    Span::current().record("trace_id", &field::display(&trace_id));
 
     let timeout = match bp.spec.speaker.timeout {
         Some(t) => t,
@@ -342,7 +350,7 @@ async fn cleanup(_api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Result
         .await
     {
         Ok(_peer) => {
-            tracing::info!(name = bp.name_any(), addr = bp.spec.addr, "Delete peer");
+            tracing::info!(name = bp.name_any(), addr = bp.spec.addr, "delete peer");
             speaker_client
                 .delete_peer(sartd_proto::sart::DeletePeerRequest {
                     addr: bp.spec.addr.clone(),
@@ -377,7 +385,7 @@ async fn cleanup(_api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Result
             name = bp.name_any(),
             asn = bp.spec.asn,
             addr = bp.spec.addr,
-            "Clean up the peer from BGPAdvertisements"
+            "clean up the peer from BGPAdvertisements"
         );
         let ba_api = Api::<BGPAdvertisement>::all(ctx.client.clone());
         let mut ba_list = ba_api
diff --git a/sartd/src/kubernetes/src/agent/reconciler/node_bgp.rs b/sartd/src/kubernetes/src/agent/reconciler/node_bgp.rs
index 1b0444c..c8387f8 100644
--- a/sartd/src/kubernetes/src/agent/reconciler/node_bgp.rs
+++ b/sartd/src/kubernetes/src/agent/reconciler/node_bgp.rs
@@ -11,6 +11,7 @@ use kube::{
     },
     Api, Client, ResourceExt,
 };
+use tracing::{field, Span};
 
 use crate::{
     agent::{bgp::speaker, error::Error},
@@ -43,9 +44,10 @@ pub async fn reconciler(nb: Arc<NodeBGP>, ctx: Arc<Context>) -> Result<Action, E
 
 #[tracing::instrument(skip_all, fields(trace_id))]
 async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Result<Action, Error> {
-    tracing::info!(name = nb.name_any(), "Reconcile NodeBGP");
+    let trace_id = sartd_trace::telemetry::get_trace_id();
+    Span::current().record("trace_id", &field::display(&trace_id));
 
     // NodeBGP.spec.asn and routerId should be immutable
@@ -81,7 +83,7 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
                 name = nb.name_any(),
                 asn = nb.spec.asn,
                 router_id = nb.spec.router_id,
-                "Backoff BGP advertisement"
+                "backoff BGP advertisement"
             );
             backoff_advertisements(nb, &ctx.client.clone()).await?;
             return Err(e);
@@ -107,14 +109,14 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
                     name = nb.name_any(),
                     asn = nb.spec.asn,
                     router_id = nb.spec.router_id,
-                    "Backoff NodeBGP"
+                    "backoff NodeBGP"
                 );
                 backoff_advertisements(nb, &ctx.client.clone()).await?;
                 tracing::warn!(
                     name = nb.name_any(),
                     asn = nb.spec.asn,
                     router_id = nb.spec.router_id,
-                    "Backoff BGP advertisement"
+                    "backoff BGP advertisement"
                 );
             }
             return Err(Error::GotgPRC(e));
@@ -129,8 +131,7 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
             name = nb.name_any(),
             asn = nb.spec.asn,
             router_id = nb.spec.router_id,
-            multipath = ?nb.spec.speaker.multipath,
-            "Configure local BGP settings"
+            "configure local BGP settings"
         );
         speaker_client
             .set_as(sartd_proto::sart::SetAsRequest { asn: nb.spec.asn })
@@ -180,6 +181,12 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
                 backoff_advertisements(nb, &ctx.client.clone()).await?;
                 new_status.backoff += 1;
             }
+            tracing::info!(
+                name = nb.name_any(),
+                asn = nb.spec.asn,
+                router_id = nb.spec.router_id,
+                "update NodeBGP status"
+            );
             // update status
             let mut new_nb = nb.clone();
             new_nb.status = Some(new_status);
@@ -219,7 +226,7 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
                     tracing::info!(
                         name = nb.name_any(),
                         peer = bp.name_any(),
-                        "Increment peer's backoff count"
+                        "increment peer's backoff count"
                     );
                     bgp_peer_api
                         .replace_status(
@@ -244,7 +251,7 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
             name = nb.name_any(),
             asn = nb.spec.asn,
             router_id = nb.spec.router_id,
-            "Local BGP settings are already configured"
+            "local BGP settings are already configured"
         );
 
         // patch status
@@ -254,7 +261,7 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
                 reason: NodeBGPConditionReason::Configured,
             }
         } else {
-            tracing::warn!("Local BGP speaker configuration and NodeBGP are mismatched");
+            tracing::warn!("local BGP speaker configuration and NodeBGP are mismatched");
             NodeBGPCondition {
                 status: NodeBGPConditionStatus::Unavailable,
                 reason: NodeBGPConditionReason::InvalidConfiguration,
@@ -293,7 +300,7 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
             name = nb.name_any(),
             asn = nb.spec.asn,
             router_id = nb.spec.router_id,
-            "Update NodeBGP status"
+            "update NodeBGP status"
         );
         // update status
         new_nb.status = Some(new_status);
@@ -345,7 +352,7 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
     }
     if !errors.is_empty() {
         for e in errors.iter() {
-            tracing::error!(error=?e, "Failed to reconcile BGPPeer associated with NodeBGP");
+            tracing::error!(error=?e, "failed to reconcile BGPPeer associated with NodeBGP");
         }
         // returns ok but, this should retry to reconcile
         return Ok(Action::requeue(Duration::from_secs(10)));
@@ -356,9 +363,10 @@ async fn reconcile(api: &Api<NodeBGP>, nb: &NodeBGP, ctx: Arc<Context>) -> Resul
 
     Ok(Action::requeue(Duration::from_secs(60)))
 }
 
-#[tracing::instrument(skip_all)]
+#[tracing::instrument(skip_all, fields(trace_id))]
 async fn cleanup(_api: &Api<NodeBGP>, nb: &NodeBGP, _ctx: Arc<Context>) -> Result<Action, Error> {
-    tracing::info!(name = nb.name_any(), "Cleanup NodeBGP");
+    let trace_id = sartd_trace::telemetry::get_trace_id();
+    Span::current().record("trace_id", &field::display(&trace_id));
 
     let timeout = nb.spec.speaker.timeout.unwrap_or(DEFAULT_SPEAKER_TIMEOUT);
     let mut speaker_client =
diff --git a/sartd/src/kubernetes/src/controller/reconciler/address_block.rs b/sartd/src/kubernetes/src/controller/reconciler/address_block.rs
index d112b62..1f066f8 100644
--- a/sartd/src/kubernetes/src/controller/reconciler/address_block.rs
+++ b/sartd/src/kubernetes/src/controller/reconciler/address_block.rs
@@ -11,6 +11,7 @@ use kube::{
 };
 
 use sartd_ipam::manager::{AllocatorSet, Block, BlockAllocator};
+use tracing::{field, Span};
 
 use crate::{
     context::{error_policy, ContextWith, Ctx, State},
@@ -46,7 +47,7 @@ pub async fn reconciler(
     .map_err(|e| Error::Finalizer(Box::new(e)))
 }
 
-#[tracing::instrument(skip_all)]
+#[tracing::instrument(skip_all, fields(trace_id))]
 async fn reconcile(
     api: &Api<AddressBlock>,
     ab: &AddressBlock,
@@ -78,7 +79,8 @@ async fn reconcile_service(
     if ab.spec.r#type.ne(&AddressType::Service) {
         return Ok(Action::await_change());
     }
-    tracing::info!(name = ab.name_any(), "Reconcile AddressBlock");
+    let trace_id = sartd_trace::telemetry::get_trace_id();
+    Span::current().record("trace_id", &field::display(&trace_id));
 
     let component = ctx.component.allocator_set.clone();
     let mut alloc_set = component.inner.lock().map_err(|_| Error::FailedToGetLock)?;
@@ -95,21 +97,21 @@ async fn reconcile_service(
                 if ab.name_any().ne(name) {
                     tracing::warn!(
                         name = ab.name_any(),
-                        "Auto assignable block already exists."
+                        "auto assignable block already exists."
                     );
                     return Err(Error::AutoAssignMustBeOne);
                 }
             }
             None => {
                 alloc_set.auto_assign = Some(ab.name_any());
-                tracing::info!(name = ab.name_any(), "Enable auto assign.");
+                tracing::info!(name = ab.name_any(), "enable auto assign.");
             }
         }
     }
     false => {
         if let Some(name) = &alloc_set.auto_assign {
             if ab.name_any().eq(name) {
-                tracing::info!(name = ab.name_any(), "Disable auto assign.");
+                tracing::info!(name = ab.name_any(), "disable auto assign.");
                 alloc_set.auto_assign = None;
             }
         }
@@ -118,7 +120,7 @@ async fn reconcile_service(
             if let Some(auto_assign_name) = &alloc_set.auto_assign {
                 // If disable auto assign
                 if !ab.spec.auto_assign && auto_assign_name.eq(&ab.name_any()) {
-                    tracing::info!(name = ab.name_any(), "Disable auto assign");
+                    tracing::info!(name = ab.name_any(), "disable auto assign");
                 }
             }
         }
@@ -128,7 +130,7 @@ async fn reconcile_service(
             if ab.spec.auto_assign {
                 match &alloc_set.auto_assign {
                     Some(_a) => {
-                        tracing::warn!(name = ab.name_any(), "Cannot override auto assign.");
+                        tracing::warn!(name = ab.name_any(), "cannot override auto assign.");
                         return Err(Error::FailedToEnableAutoAssign);
                     }
                     None => {
@@ -142,7 +144,7 @@ async fn reconcile_service(
     Ok(Action::await_change())
 }
 
-#[tracing::instrument(skip_all)]
+#[tracing::instrument(skip_all, fields(trace_id))]
 async fn cleanup(
     api: &Api<AddressBlock>,
     ab: &AddressBlock,
@@ -183,7 +185,8 @@ async fn cleanup_service(
     ab: &AddressBlock,
     ctx: Arc>,
 ) -> Result<Action, Error> {
-    tracing::info!(name = ab.name_any(), "clean up AddressBlock");
+    let trace_id = sartd_trace::telemetry::get_trace_id();
+    Span::current().record("trace_id", &field::display(&trace_id));
 
     let component = ctx.component.allocator_set.clone();
     let mut alloc_set = component.inner.lock().map_err(|_| Error::FailedToGetLock)?;
diff --git a/sartd/src/kubernetes/src/controller/reconciler/address_pool.rs b/sartd/src/kubernetes/src/controller/reconciler/address_pool.rs
index 789d58f..5ff1c4d 100644
--- a/sartd/src/kubernetes/src/controller/reconciler/address_pool.rs
+++ b/sartd/src/kubernetes/src/controller/reconciler/address_pool.rs
@@ -19,6 +19,7 @@ use kube::{
     Api, Client, ResourceExt,
 };
 use sartd_ipam::manager::{BlockAllocator, Pool};
+use tracing::{field, Span};
 
 use crate::{
     context::{error_policy, ContextWith, Ctx, State},
@@ -50,13 +51,14 @@ pub async fn reconciler(
     .map_err(|e| Error::Finalizer(Box::new(e)))
 }
 
-#[tracing::instrument(skip_all)]
+#[tracing::instrument(skip_all, fields(trace_id))]
 async fn reconcile(
     api: &Api<AddressPool>,
     ap: &AddressPool,
     ctx: Arc<ContextWith<Arc<BlockAllocator>>>,
 ) -> Result<Action, Error> {
-    tracing::info!(name = ap.name_any(), "reconcile AddressPool");
+    let trace_id = sartd_trace::telemetry::get_trace_id();
+    Span::current().record("trace_id", &field::display(&trace_id));
 
     match ap.spec.r#type {
         AddressType::Service => reconcile_service_pool(api, ap, ctx).await,
@@ -64,7 +66,7 @@ async fn reconcile(
     }
 }
 
-#[tracing::instrument(skip_all)]
+#[tracing::instrument(skip_all, fields(trace_id))]
 async fn reconcile_service_pool(
     api: &Api<AddressPool>,
     ap: &AddressPool,
@@ -81,7 +83,7 @@ async fn reconcile_service_pool(
         .map_err(Error::Kube)?
     {
         Some(ab) => {
-            tracing::warn!(name = ab.name_any(), "AddressBlock already exists");
+            tracing::warn!(name = ab.name_any(), "address block already exists");
         }
         None => {
             let ab = AddressBlock {
@@ -146,7 +148,7 @@ async fn reconcile_pod_pool(
     Ok(Action::await_change())
 }
 
-#[tracing::instrument(skip_all)]
+#[tracing::instrument(skip_all, fields(trace_id))]
 async fn cleanup(
     _api: &Api<AddressPool>,
     ap: &AddressPool,
diff --git a/sartd/src/kubernetes/src/controller/reconciler/bgp_advertisement.rs b/sartd/src/kubernetes/src/controller/reconciler/bgp_advertisement.rs
index d152e0e..84fbb35 100644
--- a/sartd/src/kubernetes/src/controller/reconciler/bgp_advertisement.rs
+++ b/sartd/src/kubernetes/src/controller/reconciler/bgp_advertisement.rs
@@ -11,6 +11,7 @@ use kube::{
     },
     Api, Client, ResourceExt,
 };
+use tracing::{field, Span};
 
 use crate::{
     context::{error_policy, Context, State},
@@ -45,13 +46,6 @@ async fn reconcile(
     ba: &BGPAdvertisement,
     _ctx: Arc<Context>,
 ) -> Result<Action, Error> {
-    let ns = get_namespace::<BGPAdvertisement>(ba).map_err(Error::KubeLibrary)?;
-    tracing::info!(
-        name = ba.name_any(),
-        namespace = ns,
-        "Reconcile BGPAdvertisement"
-    );
-
     Ok(Action::await_change())
 }
 
@@ -61,12 +55,10 @@ async fn cleanup(
     ba: &BGPAdvertisement,
     _ctx: Arc<Context>,
 ) -> Result<Action, Error> {
+    let trace_id = sartd_trace::telemetry::get_trace_id();
+    Span::current().record("trace_id", &field::display(&trace_id));
+
     let ns = get_namespace::<BGPAdvertisement>(ba).map_err(Error::KubeLibrary)?;
-    tracing::info!(
-        name = ba.name_any(),
-        namespace = ns,
-        "Cleanup BGPAdvertisement"
-    );
 
     let mut new_ba = ba.clone();
     let mut need_update = false;
@@ -79,7 +71,7 @@ async fn cleanup(
             tracing::info!(
                 name = ba.name_any(),
                 namespace = ns,
-                "Successfully delete BGPAdvertisement"
+                "successfully delete BGPAdvertisement"
             );
             return Ok(Action::await_change());
         }
@@ -93,7 +85,7 @@ async fn cleanup(
             tracing::info!(
                 name = ba.name_any(),
                 namespace = ns,
-                "Successfully delete BGPAdvertisement"
+                "successfully delete BGPAdvertisement"
             );
             return Ok(Action::await_change());
         }
@@ -110,7 +102,7 @@ async fn cleanup(
     tracing::info!(
         name = &ba.name_any(),
         namespace = ns,
-        "Submit withdraw request"
+        "submit withdraw request"
     );
diff --git a/sartd/src/kubernetes/src/controller/reconciler/cluster_bgp.rs b/sartd/src/kubernetes/src/controller/reconciler/cluster_bgp.rs
index ac806ba..2eea8e3 100644
--- a/sartd/src/kubernetes/src/controller/reconciler/cluster_bgp.rs
+++ b/sartd/src/kubernetes/src/controller/reconciler/cluster_bgp.rs
@@ -12,6 +12,7 @@ use kube::{
     },
     Api, Client, ResourceExt,
 };
+use tracing::{field, Span};
 
 use crate::{
     context::{error_policy, Context, State},
@@ -43,8 +44,11 @@ pub async fn reconciler(cb: Arc<ClusterBGP>, ctx: Arc<Context>) -> Result<Action
 
 #[tracing::instrument(skip_all, fields(trace_id))]
 async fn reconcile(cb: &ClusterBGP, ctx: Arc<Context>) -> Result<Action, Error> {
+    let trace_id = sartd_trace::telemetry::get_trace_id();
+    Span::current().record("trace_id", &field::display(&trace_id));
+
     tracing::info!(name = cb.name_any(), "reconcile ClusterBGP");
 
     let mut need_requeue = false;
@@ -64,18 +68,13 @@ async fn reconcile(cb: &ClusterBGP, ctx: Arc<Context>) -> Result<Action, Error>
         None => Vec::new(),
     };
 
-    tracing::info!(name = cb.name_any(), actual=?actual_nodes, "actual nodes");
-
     let matched_nodes = nodes.list(&list_params).await.map_err(Error::Kube)?;
     let matched_node_names = matched_nodes
         .iter()
         .map(|n| n.name_any())
         .collect::<Vec<String>>();
 
-    tracing::info!(name = cb.name_any(), matched=?matched_node_names, "matched nodes");
-
-    // let (added, remain, removed) = get_diff(&actual_nodes, &matched_node_names);
-    let (added, remain, removed) = diff::<String>(&actual_nodes, &matched_node_names);
+    let (added, remain, removed) = diff(&actual_nodes, &matched_node_names);
 
     let node_bgps = Api::<NodeBGP>::all(ctx.client.clone());
@@ -122,7 +121,6 @@ async fn reconcile(cb: &ClusterBGP, ctx: Arc<Context>) -> Result<Action, Error>
 
     let peer_templ_api = Api::<BGPPeerTemplate>::all(ctx.client.clone());
     let peers = get_peers(cb, &new_nb, &peer_templ_api).await?;
-    tracing::info!(nb=nb.name_any(), label=?new_nb.labels(),"NodeBGP label");
     match new_nb.spec.peers.as_mut() {
         Some(nb_peers) => {
             for peer in peers.iter() {
@@ -138,14 +136,14 @@ async fn reconcile(cb: &ClusterBGP, ctx: Arc<Context>) -> Result<Action, Error>
             }
         }
         if need_spec_update {
-            tracing::info!(node_bgp=nb.name_any(),asn=nb.spec.asn,router_id=?nb.spec.router_id,"Update existing NodeBGP's spec.peers");
+            tracing::info!(node_bgp=nb.name_any(),asn=nb.spec.asn,router_id=?nb.spec.router_id,"update existing NodeBGP's spec.peers");
             node_bgps
                 .replace(&nb.name_any(), &PostParams::default(), &new_nb)
                 .await
                 .map_err(Error::Kube)?;
         }
         if need_status_update {
-            tracing::info!(node_bgp=nb.name_any(),asn=nb.spec.asn,router_id=?nb.spec.router_id,"Update existing NodeBGP's status");
+            tracing::info!(node_bgp=nb.name_any(),asn=nb.spec.asn,router_id=?nb.spec.router_id,"update existing NodeBGP's status");
             node_bgps
                 .replace_status(
                     &nb.name_any(),
@@ -181,7 +179,7 @@ async fn reconcile(cb: &ClusterBGP, ctx: Arc<Context>) -> Result<Action, Error>
 
             nb.spec.peers = Some(peers);
 
-            tracing::info!(node_bgp=node.name_any(),asn=asn,router_id=?router_id,"Create new NodeBGP resource");
+            tracing::info!(node_bgp=node.name_any(),asn=asn,router_id=?router_id,"create new NodeBGP resource");
             node_bgps
                 .create(&PostParams::default(), &nb)
                 .await
@@ -220,7 +218,7 @@ async fn reconcile(cb: &ClusterBGP, ctx: Arc<Context>) -> Result<Action, Error>
         }
 
         if need_spec_update {
-            tracing::info!(node_bgp=nb.name_any(),asn=nb.spec.asn,router_id=?nb.spec.router_id,"Update existing NodeBGP's spec.peers");
+            tracing::info!(node_bgp=nb.name_any(),asn=nb.spec.asn,router_id=?nb.spec.router_id,"update existing NodeBGP's spec.peers");
             node_bgps
                 .replace(&nb.name_any(), &PostParams::default(), &new_nb)
                 .await
@@ -261,7 +259,6 @@ async fn reconcile(cb: &ClusterBGP, ctx: Arc<Context>) -> Result<Action, Error>
         }
     };
 
-    tracing::info!(name = cb.name_any(), "Update ClusterBGP Status");
     cluster_bgp_api
         .replace_status(
             &cb.name_any(),
@@ -280,10 +277,8 @@ async fn reconcile(cb: &ClusterBGP, ctx: Arc<Context>) -> Result<Action, Error>
 }
 
 // cleanup() is called when a resource is deleted
-#[tracing::instrument(skip_all)]
+#[tracing::instrument(skip_all, fields(trace_id))]
 async fn cleanup(cb: &ClusterBGP, _ctx: Arc<Context>) -> Result<Action, Error> {
-    tracing::info!(name = cb.name_any(), "clean up ClusterBGP");
-
     Ok(Action::await_change())
 }
diff --git a/sartd/src/kubernetes/src/controller/reconciler/endpointslice_watcher.rs b/sartd/src/kubernetes/src/controller/reconciler/endpointslice_watcher.rs
index b834228..411b5e9 100644
--- a/sartd/src/kubernetes/src/controller/reconciler/endpointslice_watcher.rs
+++ b/sartd/src/kubernetes/src/controller/reconciler/endpointslice_watcher.rs
@@ -19,6 +19,7 @@ use kube::{
     },
     Api, Client, ResourceExt,
 };
+use tracing::{field, Span};
 
 use crate::{
     context::{error_policy, Context, Ctx, State},
@@ -66,11 +67,14 @@ pub async fn reconciler(eps: Arc<EndpointSlice>, ctx: Arc<Context>) -> Result<Ac
 
 #[tracing::instrument(skip_all, fields(trace_id))]
 async fn reconcile(eps: &EndpointSlice, ctx: Arc<Context>) -> Result<Action, Error> {
+    let trace_id = sartd_trace::telemetry::get_trace_id();
+    Span::current().record("trace_id", &field::display(&trace_id));
+
     let ns = get_namespace::<EndpointSlice>(eps).map_err(Error::KubeLibrary)?;
     tracing::info!(
         name = eps.name_any(),
         namespace = ns,
-        "Reconcile Endpointslice"
+        "reconcile Endpointslice"
     );
 
     let svc_name = match get_svc_name_from_eps(eps) {
@@ -83,7 +87,7 @@ async fn reconcile(eps: &EndpointSlice, ctx: Arc<Context>) -> Result<Action, Err
             tracing::warn!(
                 name = svc_name,
-                "The Service resource associated with EndpointSlice is not found"
+                "the Service resource associated with EndpointSlice is not found"
             );
             return Ok(Action::await_change());
         }
@@ -99,7 +103,7 @@ async fn reconcile(eps: &EndpointSlice, ctx: Arc<Context>) -> Result<A
 = BTreeMap::new();
@@ -220,7 +224,7 @@ async fn reconcile(eps: &EndpointSlice, ctx: Arc<Context>) -> Result<A
) -> Result<A
 
 #[tracing::instrument(skip_all, fields(trace_id))]
 async fn cleanup(
     eps: &EndpointSlice,
     _ctx: Arc<Context>,
 ) -> Result<Action, Error> {
     let ns = get_namespace::<EndpointSlice>(eps).map_err(Error::KubeLibrary)?;
-    tracing::info!(
-        name = eps.name_any(),
-        namespace = ns,
-        "Cleanup Endpointslice"
-    );
-
     Ok(Action::await_change())
 }
diff --git a/sartd/src/kubernetes/src/controller/reconciler/node_watcher.rs b/sartd/src/kubernetes/src/controller/reconciler/node_watcher.rs
index 00e9fb7..56bfca7 100644
--- a/sartd/src/kubernetes/src/controller/reconciler/node_watcher.rs
+++ b/sartd/src/kubernetes/src/controller/reconciler/node_watcher.rs
@@ -12,6 +12,7 @@ use kube::{
     },
     Api, Client, ResourceExt,
 };
+use tracing::{field, Span};
 
 use crate::{
     context::{error_policy, Context, State},
@@ -39,7 +40,8 @@ pub async fn reconciler(node: Arc<Node>, ctx: Arc<Context>) -> Result<Action, Er
 
 #[tracing::instrument(skip_all, fields(trace_id))]
 async fn reconcile(_api: &Api<Node>, node: &Node, ctx: Arc<Context>) -> Result<Action, Error> {
-    tracing::info!(name = node.name_any(), "Reconcile Node");
+    let trace_id = sartd_trace::telemetry::get_trace_id();
+    Span::current().record("trace_id", &field::display(&trace_id));
 
     // sync node labels
     let node_bgp_api = Api::<NodeBGP>::all(ctx.client.clone());
@@ -71,7 +73,7 @@ async fn reconcile(_api: &Api<Node>, node: &Node, ctx: Arc<Context>) -> Result<A
, node: &Node, ctx: Arc) -> Result<A
 
 #[tracing::instrument(skip_all, fields(trace_id))]
 async fn cleanup(_api: &Api<Node>, node: &Node, ctx: Arc<Context>) -> Result<Action, Error> {
+    let trace_id = sartd_trace::telemetry::get_trace_id();
+    Span::current().record("trace_id", &field::display(&trace_id));
+
+    let node_bgp_api = Api::<NodeBGP>::all(ctx.client.clone());
+    tracing::info!(node = node.name_any(), "delete the NodeBGP");
+
     node_bgp_api
         .delete(&node.name_any(), &DeleteParams::default())
         .await
diff --git a/sartd/src/kubernetes/src/controller/reconciler/service_watcher.rs b/sartd/src/kubernetes/src/controller/reconciler/service_watcher.rs
index 0e4235d..bb52cd3 100644
--- a/sartd/src/kubernetes/src/controller/reconciler/service_watcher.rs
+++ b/sartd/src/kubernetes/src/controller/reconciler/service_watcher.rs
@@ -15,6 +15,7 @@ use kube::{
     },
     Api, Client, ResourceExt,
 };
+use tracing::{field, Span};
 
 use crate::{
     context::{error_policy, ContextWith, Ctx, State},
@@ -57,9 +58,11 @@ async fn reconcile(
     svc: &Service,
     ctx: Arc<ContextWith<Arc<AllocatorSet>>>,
 ) -> Result<Action, Error> {
+    let trace_id = sartd_trace::telemetry::get_trace_id();
+    Span::current().record("trace_id", &field::display(&trace_id));
     let ns = get_namespace::<Service>(svc).map_err(Error::KubeLibrary)?;
 
-    tracing::info!(name = svc.name_any(), namespace = ns, "Reconcile Service");
+    tracing::info!(name = svc.name_any(), namespace = ns, "reconcile Service");
 
     if !is_loadbalancer(svc) {
         // If Service is not LoadBalancer and it has allocated load balancer addresses, release these.
@@ -70,7 +73,6 @@ async fn reconcile(
         // we check its annotation.
         // And if it has, we have to release its addresses.
         let releasable_addrs = get_releasable_addrs(svc);
-        tracing::info!(name = svc.name_any(), namespace = ns, releasable=?releasable_addrs, "not LoadBalancer");
         if let Some(addrs) = releasable_addrs {
             let released = {
                 let c = ctx.component.clone();
@@ -219,7 +221,11 @@ async fn reconcile(
     if actual_addrs.eq(&remained) {
         return Ok(Action::await_change());
     }
-    tracing::info!(name = svc.name_any(), namespace = ns, remained=?remained, released=?removed, "Update allocation.");
+    tracing::info!(
+        name = svc.name_any(),
+        namespace = ns,
+        "update the allocation"
+    );
 
     let new_svc = update_svc_lb_addrs(svc, &remained);
     api.replace_status(
@@ -234,7 +240,7 @@ async fn reconcile(
         name = svc.name_any(),
         namespace = ns,
         lb_addrs=?remained,
-        "Update service status by the allocation lb address"
+        "update service status by the allocation lb address"
     );
 
     let new_allocated_addrs = get_allocated_lb_addrs(&new_svc)
@@ -276,8 +282,11 @@ async fn cleanup(
     svc: &Service,
     ctx: Arc<ContextWith<Arc<AllocatorSet>>>,
 ) -> Result<Action, Error> {
+    let trace_id = sartd_trace::telemetry::get_trace_id();
+    Span::current().record("trace_id", &field::display(&trace_id));
+
     let ns = get_namespace::<Service>(svc).map_err(Error::KubeLibrary)?;
-    tracing::info!(name = svc.name_any(), namespace = ns, "Cleanup Service");
+    tracing::info!(name = svc.name_any(), namespace = ns, "cleanup Service");
 
     let allocated_addrs = match get_allocated_lb_addrs(svc) {
         Some(a) => a,
@@ -303,7 +312,7 @@ async fn cleanup(
         name = svc.name_any(),
         namespace = ns,
         lb_addrs=?released,
-        "Update service status by the release lb address"
+        "update service status by the release lb address"
     );
 
     Ok(Action::await_change())
@@ -316,7 +325,7 @@ pub async fn run(state: State, interval: u64, allocator_set: Arc<AllocatorSet>)
 
     let services = Api::<Service>::all(client.clone());
 
-    tracing::info!("Start Service watcher");
+    tracing::info!("start Service watcher");
 
     Controller::new(services, Config::default().any_semantic())
         .shutdown_on_signal()
@@ -514,7 +523,7 @@ fn release_lb_addrs(
                 name = svc.name_any(),
                 namespace = ns,
                 desired_pool = pool,
-                "Desired AddressBlock doesn't exist"
+                "desired AddressBlock doesn't exist"
             );
             continue;
         }
@@ -531,7 +540,7 @@ fn release_lb_addrs(
                     namespace = ns,
                     pool = block.pool_name,
                     address = a.to_string(),
-                    "Relaese address from address pool"
+                    "release the address from address pool"
                 )
             }
             Err(e) => {
@@ -615,25 +624,6 @@ fn clear_svc_lb_addrs(svc: &Service, released: &[IpAddr]) -> Service {
     new_svc
 }
 
-// fn get_diff(prev: &[String], now: &[String]) -> (Vec<String>, Vec<String>, Vec<String>) {
-//     let removed = prev
-//         .iter()
-//         .filter(|p| !now.contains(p))
-//         .cloned()
-//         .collect::<Vec<String>>();
-//     let added = now
-//         .iter()
-//         .filter(|n| !prev.contains(n) && !removed.contains(n))
-//         .cloned()
-//         .collect::<Vec<String>>();
-//     let shared = prev
-//         .iter()
-//         .filter(|p| now.contains(p))
-//         .cloned()
-//         .collect::<Vec<String>>();
-//     (added, shared, removed)
-// }
-
 fn merge_marked_allocation(
     actual_allocation: HashMap>,
     desired_allocation: HashMap>,
diff --git a/sartd/src/proto/src/sart.v1.rs b/sartd/src/proto/src/sart.v1.rs
index aec10d7..c219187 100644
--- a/sartd/src/proto/src/sart.v1.rs
+++ b/sartd/src/proto/src/sart.v1.rs
@@ -1,4 +1,3 @@
-// This file is @generated by prost-build.
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct HealthRequest {}
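
The pattern this patch applies to every reconciler — declaring an empty trace_id field on the span with #[tracing::instrument(skip_all, fields(trace_id))], then filling it at the top of the function body with Span::current().record(...) — can be exercised in isolation with the sketch below. It is a minimal, hypothetical approximation rather than the repository's code: the local get_trace_id() stub stands in for sartd_trace::telemetry::get_trace_id() (which derives the ID from the active OpenTelemetry context), and tokio plus tracing-subscriber are assumed as dependencies only to make the program runnable.

    use tracing::{field, Span};

    // Hypothetical stand-in for sartd_trace::telemetry::get_trace_id(); the
    // real helper looks up the ID of the current OpenTelemetry trace.
    fn get_trace_id() -> String {
        "4bf92f3577b34da6a3ce929d0e0e4736".to_string()
    }

    // `skip_all` keeps the function arguments out of the span; a bare
    // `trace_id` inside `fields(...)` declares the field as empty so that it
    // can be recorded later.
    #[tracing::instrument(skip_all, fields(trace_id))]
    async fn reconcile(name: &str) {
        // Record the trace ID on the current span. Every event emitted while
        // this span is active now carries the trace_id field.
        let trace_id = get_trace_id();
        Span::current().record("trace_id", &field::display(&trace_id));

        tracing::info!(name, "update resource");
    }

    #[tokio::main]
    async fn main() {
        tracing_subscriber::fmt().init();
        reconcile("bgppeer-sample").await;
    }

Recording the ID once on the span, rather than repeating it in every log call, is what lets the hunks above delete so many per-call "Reconcile Foo" info lines without losing the ability to correlate the remaining events.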