Skip to content

Commit

Permalink
refactor: rewrite events publishing
Browse files Browse the repository at this point in the history
  • Loading branch information
leroyguillaume committed Aug 7, 2024
1 parent 1b47b4d commit 1827458
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 135 deletions.
97 changes: 72 additions & 25 deletions src/kube/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use k8s_openapi::api::{
};
use kube::{
api::{DeleteParams, ListParams, Patch, PatchParams},
runtime::events::{EventType, Recorder, Reporter},
runtime::events::{Event, EventType, Recorder, Reporter},
Api, Client, Resource,
};
use regex::Regex;
Expand All @@ -22,7 +22,7 @@ use crate::{
};

use super::{
AppFilter, DomainUsage, KubeClient, KubeEvent, KubeEventKind, KubeEventPublisher, Result,
AppEvent, AppFilter, DomainUsage, InvitationEvent, KubeClient, KubeEventPublisher, Result,
ServicePod, ServicePodStatus, LABEL_APP, LABEL_SERVICE,
};

Expand Down Expand Up @@ -328,34 +328,34 @@ impl DefaultKubeEventPublisher {
}
}

async fn publish(event: KubeEvent, recorder: Recorder) {
if let Err(err) = recorder.publish(event.into()).await {
async fn publish(event: Event, recorder: Recorder) {
if let Err(err) = recorder.publish(event).await {
warn!("failed to publish event: {err}");
}
}
}

impl KubeEventPublisher for DefaultKubeEventPublisher {
#[instrument(skip(self, app, event))]
async fn publish_app_event(&self, app: &App, event: KubeEvent) {
async fn publish_app_event(&self, app: &App, event: AppEvent) {
debug!("publishing app event");
let recorder = Recorder::new(
self.client.clone(),
self.reporter.clone(),
app.object_ref(&()),
);
Self::publish(event, recorder).await;
Self::publish(event.into(), recorder).await;
}

#[instrument(skip(self, invit, event))]
async fn publish_invitation_event(&self, invit: &Invitation, event: KubeEvent) {
async fn publish_invitation_event(&self, invit: &Invitation, event: InvitationEvent) {
debug!("publishing invitation event");
let recorder = Recorder::new(
self.client.clone(),
self.reporter.clone(),
invit.object_ref(&()),
);
Self::publish(event, recorder).await;
Self::publish(event.into(), recorder).await;
}
}

Expand Down Expand Up @@ -385,27 +385,74 @@ impl From<regex::Error> for super::Error {
}
}

// ::kube::runtime::events::Event

impl From<KubeEvent> for ::kube::runtime::events::Event {
fn from(event: KubeEvent) -> Self {
Self {
action: event.action.into(),
note: Some(event.note),
reason: event.reason.into(),
secondary: None,
type_: event.kind.into(),
// Event

impl From<AppEvent> for Event {
fn from(event: AppEvent) -> Self {
match event {
AppEvent::Deployed => Self {
action: "Deploying".into(),
type_: EventType::Normal,
reason: "Deployed".into(),
note: Some("App deployed successfully".into()),
secondary: None,
},
AppEvent::Deploying => Self {
action: "Deploying".into(),
type_: EventType::Normal,
reason: "Deploying".into(),
note: Some("Deployment started".into()),
secondary: None,
},
AppEvent::DeploymentFailed(err) => Self {
action: "Deploying".into(),
type_: EventType::Warning,
reason: "Failed".into(),
note: Some(format!("Deployment failed: {err}")),
secondary: None,
},
AppEvent::MonitoringFailed(err) => Self {
action: "Monitoring".into(),
type_: EventType::Warning,
reason: "Failed".into(),
note: Some(format!("Monitoring failed: {err}")),
secondary: None,
},
AppEvent::Undeploying => Self {
action: "Undeploying".into(),
type_: EventType::Normal,
reason: "Undeploying".into(),
note: Some("Undeployment started".into()),
secondary: None,
},
AppEvent::UndeploymentFailed(err) => Self {
action: "Undeploying".into(),
type_: EventType::Warning,
reason: "Failed".into(),
note: Some(format!("Undeployment failed: {err}")),
secondary: None,
},
}
}
}

// EventType

impl From<KubeEventKind> for EventType {
fn from(kind: KubeEventKind) -> Self {
match kind {
KubeEventKind::Normal => Self::Normal,
KubeEventKind::Warn => Self::Warning,
impl From<InvitationEvent> for Event {
fn from(event: InvitationEvent) -> Self {
match event {
InvitationEvent::SendingFailed(err) => Self {
action: "Sending".into(),
type_: EventType::Warning,
reason: "Failed".into(),
note: Some(format!("Sending email failed: {err}")),
secondary: None,
},
InvitationEvent::Sent => Self {
action: "Sending".into(),
type_: EventType::Normal,
reason: "Sent".into(),
note: Some("Email sent".into()),
secondary: None,
},
}
}
}
Expand Down
36 changes: 20 additions & 16 deletions src/kube/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,6 @@ pub struct DomainUsage {
pub domain: String,
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct KubeEvent {
pub action: &'static str,
pub kind: KubeEventKind,
pub note: String,
pub reason: &'static str,
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum KubeEventKind {
Normal,
Warn,
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ServicePod {
pub name: String,
Expand All @@ -71,6 +57,24 @@ pub enum ServicePodStatus {
Stopped,
}

// Events

#[derive(Clone, Debug, Eq, PartialEq)]
pub enum AppEvent {
Deployed,
Deploying,
DeploymentFailed(String),
MonitoringFailed(String),
Undeploying,
UndeploymentFailed(String),
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub enum InvitationEvent {
SendingFailed(String),
Sent,
}

// Traits

pub trait KubeClient: Send + Sync {
Expand Down Expand Up @@ -145,11 +149,11 @@ pub trait KubeClient: Send + Sync {
}

pub trait KubeEventPublisher: Send + Sync {
fn publish_app_event(&self, app: &App, event: KubeEvent) -> impl Future<Output = ()> + Send;
fn publish_app_event(&self, app: &App, event: AppEvent) -> impl Future<Output = ()> + Send;

fn publish_invitation_event(
&self,
invit: &Invitation,
event: KubeEvent,
event: InvitationEvent,
) -> impl Future<Output = ()> + Send;
}
141 changes: 51 additions & 90 deletions src/op.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::BTreeMap, sync::Arc, time::Duration};

use futures::{Future, StreamExt};
use futures::StreamExt;
use kube::{
api::ObjectMeta,
runtime::{
Expand All @@ -15,7 +15,9 @@ use tracing::{debug, error, info, info_span, warn, Instrument};
use crate::{
deploy::Deployer,
domain::{App, AppStatus, Invitation, InvitationStatus, ServiceStatus},
kube::{KubeClient, KubeEvent, KubeEventKind, KubeEventPublisher, ServicePodStatus, FINALIZER},
kube::{
AppEvent, InvitationEvent, KubeClient, KubeEventPublisher, ServicePodStatus, FINALIZER,
},
mail::MailSender,
DelayArgs, SignalListener,
};
Expand Down Expand Up @@ -168,54 +170,6 @@ fn on_error<D: Deployer, K: KubeClient, M: MailSender, P: KubeEventPublisher, R>
Action::requeue(ctx.delays.retry)
}

async fn publishing_event<
E: std::error::Error + Into<Error>,
FUT: Future<Output = std::result::Result<V, E>>,
NERR: Fn(&E) -> String,
NOK: Fn(&V) -> String,
P: Fn(KubeEvent) -> PFUT,
PFUT: Future<Output = ()>,
V,
>(
fut: FUT,
action: &'static str,
ok_reason: &'static str,
note: String,
ok_note: NOK,
err_note: NERR,
publish: P,
) -> Result<V> {
let event = KubeEvent {
action,
kind: KubeEventKind::Normal,
note,
reason: action,
};
publish(event).await;
match fut.await {
Ok(val) => {
let event = KubeEvent {
action,
kind: KubeEventKind::Normal,
note: ok_note(&val),
reason: ok_reason,
};
publish(event).await;
Ok(val)
}
Err(err) => {
let event = KubeEvent {
action,
kind: KubeEventKind::Warn,
note: err_note(&err),
reason: "Failed",
};
publish(event).await;
Err(err.into())
}
}
}

async fn reconcile_app<D: Deployer, K: KubeClient, M: MailSender, P: KubeEventPublisher>(
app: Arc<App>,
ctx: Arc<OpContext<D, K, M, P>>,
Expand All @@ -228,16 +182,12 @@ async fn reconcile_app<D: Deployer, K: KubeClient, M: MailSender, P: KubeEventPu
if let Some(finalizers) = &app.metadata.finalizers {
if finalizers.iter().any(|finalizer| finalizer == FINALIZER) {
let mut app = app.as_ref().clone();
publishing_event(
ctx.deployer.undeploy(name, &app, &ctx.kube),
"Undeploying",
"Undeployed",
"Undeploying".into(),
|_| "Successfully undeployed".into(),
|err| format!("Failed to undeploy: {err}"),
|event| ctx.publisher.publish_app_event(&app, event),
)
.await?;
if let Err(err) = ctx.deployer.undeploy(name, &app, &ctx.kube).await {
ctx.publisher
.publish_app_event(&app, AppEvent::UndeploymentFailed(err.to_string()))
.await;
return Err(err.into());
}
let mut finalizers = finalizers.clone();
finalizers.retain(|finalizer| finalizer != FINALIZER);
app.metadata = ObjectMeta {
Expand All @@ -254,16 +204,25 @@ async fn reconcile_app<D: Deployer, K: KubeClient, M: MailSender, P: KubeEventPu
update_service_statuses(name, &app, &ctx.kube, &ctx.publisher).await?;
}
Some(AppStatus::WaitingForDeploy {}) | None => {
publishing_event(
ctx.deployer.deploy(name, &app, &ctx.kube),
"Deploying",
"Deployed",
"Deploying".into(),
|_| "Successfully deployed".into(),
|err| format!("Failed to deploy: {err}"),
|event| ctx.publisher.publish_app_event(&app, event),
)
.await?;
ctx.publisher
.publish_app_event(&app, AppEvent::Deploying)
.await;
match ctx.deployer.deploy(name, &app, &ctx.kube).await {
Ok(_) => {
ctx.publisher
.publish_app_event(&app, AppEvent::Deployed)
.await;
}
Err(err) => {
ctx.publisher
.publish_app_event(
&app,
AppEvent::DeploymentFailed(err.to_string()),
)
.await;
return Err(err.into());
}
}
update_service_statuses(name, &app, &ctx.kube, &ctx.publisher).await?;
}
}
Expand Down Expand Up @@ -304,16 +263,19 @@ async fn send_invitation<K: KubeClient, M: MailSender, P: KubeEventPublisher>(
kube: &K,
publisher: &P,
) -> Result {
publishing_event(
sender.send_invitation(token, invit),
"Sending",
"Sent",
format!("Sending email to {}", invit.spec.to),
|_| format!("Successfully sent to {}", invit.spec.to),
|err| format!("Failed to send mail to {}: {err}", invit.spec.to),
|event| publisher.publish_invitation_event(invit, event),
)
.await?;
match sender.send_invitation(token, invit).await {
Ok(_) => {
publisher
.publish_invitation_event(invit, InvitationEvent::Sent)
.await;
}
Err(err) => {
publisher
.publish_invitation_event(invit, InvitationEvent::SendingFailed(err.to_string()))
.await;
return Err(err.into());
}
}
let status = InvitationStatus { email_sent: true };
kube.patch_invitation_status(token, &status).await?;
Ok(())
Expand Down Expand Up @@ -347,16 +309,15 @@ async fn update_service_statuses<K: KubeClient, P: KubeEventPublisher>(
}
Ok(statuses) as Result<BTreeMap<String, ServiceStatus>>
};
let statuses = publishing_event(
monitor,
"Monitoring",
"Monitored",
"Checking probes".into(),
|_| "Probes successfully checked".into(),
|err| format!("Failed to check service probes: {err}"),
|event| publisher.publish_app_event(app, event),
)
.await?;
let statuses = match monitor.await {
Ok(statuses) => statuses,
Err(err) => {
publisher
.publish_app_event(app, AppEvent::MonitoringFailed(err.to_string()))
.await;
return Err(err);
}
};
kube.patch_app_status(name, &AppStatus::Deployed(statuses))
.await?;
Ok(())
Expand Down
Loading

0 comments on commit 1827458

Please sign in to comment.