This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

update serialize model
benjamin-747 committed Jul 10, 2024
1 parent e516e30 commit e1607ce
Showing 7 changed files with 24 additions and 52 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -30,8 +30,8 @@ tar = { workspace = true }
flate2 = { workspace = true }
tracing-subscriber = { workspace = true }
rdkafka = { workspace = true, features = ["cmake-build"] }
bincode = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }

[workspace.dependencies]
anyhow = "1.0.81"
@@ -53,4 +53,4 @@ tar = "0.4.41"
flate2 = "1.0.30"
git2 = "0.19.0"
rdkafka = "0.36.2"
bincode = "1.3.3"
serde_json = "1.0.120"
19 changes: 9 additions & 10 deletions src/crate_to_repo.rs
@@ -18,7 +18,7 @@ use walkdir::WalkDir;
use entity::{db_enums::RepoSyncStatus, repo_sync_status};

use crate::{
kafka::{self, RepoMessage},
kafka::{self},
util,
};

@@ -260,20 +260,19 @@ pub async fn convert_crate_to_repo(workspace: PathBuf) {
if push_res.status.success() {
record.status = Set(RepoSyncStatus::Succeed);
record.err_message = Set(None);
let kafka_payload: RepoMessage = record.clone().into();
kafka::producer::send_message(
producer,
&env::var("KAFKA_TOPIC").unwrap(),
bincode::serialize(&kafka_payload).unwrap(),
)
.await;
} else {
record.status = Set(RepoSyncStatus::Failed);
record.err_message = Set(Some(String::from_utf8_lossy(&push_res.stderr).to_string()));
}
record.updated_at = Set(chrono::Utc::now().naive_utc());
record.save(conn).await.unwrap();

let res = record.save(conn).await.unwrap();
let kafka_payload: repo_sync_status::Model = res.try_into().unwrap();
kafka::producer::send_message(
producer,
&env::var("KAFKA_TOPIC").unwrap(),
serde_json::to_string(&kafka_payload).unwrap(),
)
.await;
println!("Push res: {}", String::from_utf8_lossy(&push_res.stdout));
println!("Push err: {}", String::from_utf8_lossy(&push_res.stderr));
}
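
With RepoMessage gone, the producer now publishes the saved repo_sync_status::Model itself, and it does so after every push attempt rather than only on success, so the status and err_message fields carry the outcome. A consumer-side sketch, assuming the consumer links against the same entity crate:

```rust
use entity::repo_sync_status;

/// Decode one payload produced by the send path above.
/// The message body is plain JSON, so serde_json::from_str is sufficient.
fn decode_sync_status(payload: &str) -> Result<repo_sync_status::Model, serde_json::Error> {
    serde_json::from_str(payload)
}
```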
18 changes: 9 additions & 9 deletions src/handle_repo.rs
@@ -4,6 +4,7 @@ use std::process::exit;
use std::process::Command;

use entity::db_enums::RepoSyncStatus;
use entity::repo_sync_status;
use regex::Regex;
use sea_orm::ActiveModelTrait;
use sea_orm::Set;
@@ -12,7 +12,6 @@ use url::Url;
use walkdir::WalkDir;

use crate::kafka;
use crate::kafka::RepoMessage;
use crate::util;

pub async fn add_and_push_to_remote(workspace: PathBuf) {
@@ -85,21 +85,21 @@ pub async fn add_and_push_to_remote(workspace: PathBuf) {
if push_res.status.success() {
record.status = Set(RepoSyncStatus::Succeed);
record.err_message = Set(None);
let kafka_payload: RepoMessage = record.clone().into();
kafka::producer::send_message(
&producer,
&env::var("KAFKA_TOPIC").unwrap(),
bincode::serialize(&kafka_payload).unwrap(),
)
.await;
} else {
record.status = Set(RepoSyncStatus::Failed);
record.err_message =
Set(Some(String::from_utf8_lossy(&push_res.stderr).to_string()));
}
record.updated_at = Set(chrono::Utc::now().naive_utc());
record.save(&conn).await.unwrap();
let res = record.save(&conn).await.unwrap();

let kafka_payload: repo_sync_status::Model = res.try_into().unwrap();
kafka::producer::send_message(
&producer,
&env::var("KAFKA_TOPIC").unwrap(),
serde_json::to_string(&kafka_payload).unwrap(),
)
.await;
println!("Push res: {}", String::from_utf8_lossy(&push_res.stdout));
println!("Push err: {}", String::from_utf8_lossy(&push_res.stderr));
}
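
handle_repo.rs gets the same treatment: save the record, turn the returned ActiveModel back into a plain Model (the res.try_into().unwrap() call), and publish the JSON. A sketch of that save-then-encode step in isolation, assuming a SeaORM connection and a populated ActiveModel:

```rust
use entity::repo_sync_status;
use sea_orm::{ActiveModelTrait, DatabaseConnection, TryIntoModel};

// Sketch only: mirrors the flow above with the conversion spelled out.
async fn save_and_encode(
    record: repo_sync_status::ActiveModel,
    conn: &DatabaseConnection,
) -> String {
    let saved = record.save(conn).await.unwrap();
    // The diff's `res.try_into().unwrap()`: an ActiveModel -> Model
    // conversion, equivalent to SeaORM's try_into_model().
    let model: repo_sync_status::Model = saved.try_into_model().unwrap();
    serde_json::to_string(&model).unwrap()
}
```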
28 changes: 0 additions & 28 deletions src/kafka/mod.rs
@@ -1,38 +1,10 @@
use std::env;

use entity::{
db_enums::{CrateType, RepoSyncStatus},
repo_sync_status,
};
use rdkafka::producer::FutureProducer;
use serde::{Deserialize, Serialize};

pub mod producer;

pub fn get_producer() -> FutureProducer {
let brokers = env::var("KAFKA_BROKER").unwrap();
producer::new_producer(&brokers)
}

#[derive(Serialize, Deserialize)]
pub struct RepoMessage {
pub crate_name: String,
pub github_url: Option<String>,
pub mega_url: String,
pub crate_type: CrateType,
pub status: RepoSyncStatus,
pub err_message: Option<String>,
}

impl From<repo_sync_status::ActiveModel> for RepoMessage {
fn from(value: repo_sync_status::ActiveModel) -> Self {
Self {
crate_name: value.crate_name.unwrap(),
github_url: value.github_url.unwrap(),
mega_url: value.mega_url.unwrap(),
crate_type: value.crate_type.unwrap(),
status: value.status.unwrap(),
err_message: value.err_message.unwrap(),
}
}
}
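
With the RepoMessage struct and its From<repo_sync_status::ActiveModel> impl removed, nothing entity-specific remains in the kafka module. Judging from the lines this hunk keeps, the file presumably reduces to roughly:

```rust
use std::env;

use rdkafka::producer::FutureProducer;

pub mod producer;

pub fn get_producer() -> FutureProducer {
    let brokers = env::var("KAFKA_BROKER").unwrap();
    producer::new_producer(&brokers)
}
```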
3 changes: 2 additions & 1 deletion src/kafka/producer.rs
@@ -12,9 +12,10 @@ pub fn new_producer(brokers: &str) -> FutureProducer {
.expect("Producer creation error")
}

pub async fn send_message(producer: &FutureProducer, topic_name: &str, kafka_payload: Vec<u8>) {
pub async fn send_message(producer: &FutureProducer, topic_name: &str, kafka_payload: String) {
let producer = producer.clone();
let topic_name = topic_name.to_owned();
let kafka_payload = kafka_payload.to_owned();
tokio::spawn(async move {
let produce_future = producer.send(
FutureRecord::to(&topic_name)
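
The only change to the producer is the payload type, Vec<u8> to String; rdkafka's FutureRecord payload is generic over ToBytes, which covers strings as well as byte vectors, so the record construction below this hunk should not need further changes (the added kafka_payload.to_owned() clones an already-owned String and is not strictly necessary). A hypothetical call site mirroring the two updated callers:

```rust
use std::env;

// Hypothetical usage, assuming KAFKA_TOPIC is set as in the callers above.
async fn publish(model: &entity::repo_sync_status::Model) {
    let producer = crate::kafka::get_producer();
    let payload = serde_json::to_string(model).unwrap();
    crate::kafka::producer::send_message(&producer, &env::var("KAFKA_TOPIC").unwrap(), payload).await;
}
```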
1 change: 0 additions & 1 deletion storage/entity/Cargo.toml
@@ -13,4 +13,3 @@ path = "src/lib.rs"
serde = { workspace = true, features = ["derive"] }
chrono = { workspace = true }
sea-orm = { workspace = true, features = ["sqlx-postgres"] }
bincode = { workspace = true }
3 changes: 2 additions & 1 deletion storage/entity/src/repo_sync_status.rs
@@ -1,10 +1,11 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.15

use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

use crate::db_enums::{CrateType, RepoSyncStatus};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "repo_sync_status")]
pub struct Model {
#[sea_orm(primary_key)]
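
Deriving Serialize and Deserialize on the generated entity Model is what allows the call sites above to hand it straight to serde_json. It also means downstream consumers do not strictly need the entity crate: serde ignores unknown JSON fields by default, so a service can deserialize only the columns it cares about. A hypothetical partial view, with field names taken from the removed RepoMessage struct:

```rust
use serde::Deserialize;

// Hypothetical consumer-side projection of the published Model; any extra
// columns in the JSON (ids, timestamps, enum fields) are simply ignored.
#[derive(Debug, Deserialize)]
struct SyncStatusView {
    crate_name: String,
    mega_url: String,
    err_message: Option<String>,
}

fn parse(payload: &str) -> serde_json::Result<SyncStatusView> {
    serde_json::from_str(payload)
}
```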
