Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update global feed cache #112

Merged
merged 4 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 28 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,33 +39,39 @@ flowchart TD
end
```

### Monitoring
### Video Processing Pipeline (NSFW detection)

```mermaid
flowchart TD
IndividualCansiter1[(Individual User <br>Canister 1)]
IndividualCansiter2[(Individual User <br>Canister 2)]
UserIndex[(User Index)]
PlatformOrchestrator[Platform <br>Orchestrator]
OffChainAgent[OffChain Agent]
Prom[Prometheus]
Grafana[Grafana]

OffChainAgent --[1.1]--> PlatformOrchestrator
OffChainAgent --[1.2]--> UserIndex

Prom -- 1(http_sd_config <br> periodically fetch canisters list) --> OffChainAgent
Prom -- 2 (/metrics) --> IndividualCansiter1
Prom -- 2 (/metrics) --> IndividualCansiter2

subgraph OnChain
PlatformOrchestrator
UserIndex
IndividualCansiter1
IndividualCansiter2
Frontend[Frontend SSR]
CFStream[Cloudflare<br> Stream]
GCSVideos[GCS Videos bucket]
GCSFrames[GCS Frames bucket]
NSFWServer[NSFW Server]
BQEmbedding[BQ Embedding table]
BQNSFW[BQ NSFW table]
Upstash[Upstash]

Frontend --[1]--> CFStream
Frontend --[2.1]--> OffChainAgent
OffChainAgent --[2.x.1]--> Upstash
Upstash --[2.x.2]--> OffChainAgent
OffChainAgent --[2.2 (from Q1)]--> GCSVideos
OffChainAgent --[2.3 (from Q2)]--> GCSFrames
OffChainAgent --[2.4 (from Q3)]--> NSFWServer
BQEmbedding --[3.1]--> GCSVideos
NSFWServer --[2.4.1]--> GCSFrames
OffChainAgent --[2.5]--> BQNSFW

subgraph GCS
GCSVideos
GCSFrames
end

subgraph GoogleCloud[DigitalOcean]
Prom --> Grafana
subgraph BigQuery
BQEmbedding
BQNSFW
end

```
2 changes: 1 addition & 1 deletion contracts
67 changes: 67 additions & 0 deletions src/canister/mlfeed_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@ use std::sync::Arc;
use crate::{
app_state::AppState,
canister::mlfeed_cache::off_chain::{Empty, UpdateMlFeedCacheRequest},
consts::CLOUDFLARE_ML_FEED_CACHE_WORKER_URL,
AppError,
};
use axum::extract::State;
use candid::Principal;
use google_cloud_bigquery::http::{job::query::QueryRequest, tabledata::list::Value};
use http::StatusCode;
use off_chain::{off_chain_canister_server::OffChainCanister, MlFeedCacheItem};
use serde::{Deserialize, Serialize};
use yral_canisters_client::individual_user_template::Result25;

pub mod off_chain {
Expand Down Expand Up @@ -67,3 +73,64 @@ impl From<MlFeedCacheItem> for yral_canisters_client::individual_user_template::
}
}
}

/// One entry of the global ML feed cache, serialized as JSON and POSTed to the
/// Cloudflare feed-cache worker by `update_ml_feed_cache`.
#[derive(Serialize, Deserialize, Debug)]
pub struct CustomMlFeedCacheItem {
/// Post identifier, parsed from the `post_id` metadata value of a BigQuery row.
post_id: u64,
/// Canister identifier in text form, taken from the `canister_id` metadata value.
canister_id: String,
/// NOTE(review): `update_ml_feed_cache` always sends this as an empty string.
video_id: String,
/// NOTE(review): `update_ml_feed_cache` always sends this as an empty string.
creator_principal_id: String,
}

/// Rebuilds the global ML feed cache from the newest non-NSFW videos.
///
/// Queries BigQuery for the 50 most recent rows of `yral_ds.video_embeddings`
/// with `is_nsfw = false`, converts each row into a [`CustomMlFeedCacheItem`]
/// and POSTs the resulting list as JSON to
/// `{CLOUDFLARE_ML_FEED_CACHE_WORKER_URL}/feed-cache/global-feed`.
///
/// Rows whose `post_id` is missing or is not a valid `u64` are logged and
/// skipped instead of panicking the handler. A missing `canister_id` is still
/// forwarded as an empty string, as before.
///
/// # Errors
///
/// Returns an error if the BigQuery query fails. Failures while pushing to the
/// worker (transport errors or non-success HTTP statuses) are only logged; the
/// handler still returns `Ok(())` in that case.
#[cfg(not(feature = "local-bin"))]
pub async fn update_ml_feed_cache(State(state): State<Arc<AppState>>) -> Result<(), AppError> {
    let request = QueryRequest {
        query: "SELECT uri, (SELECT value FROM UNNEST(metadata) WHERE name = 'timestamp') AS timestamp, (SELECT value FROM UNNEST(metadata) WHERE name = 'canister_id') AS canister_id, (SELECT value FROM UNNEST(metadata) WHERE name = 'post_id') AS post_id, is_nsfw FROM `hot-or-not-feed-intelligence.yral_ds.video_embeddings` WHERE is_nsfw = false GROUP BY 1, 2, 3, 4, 5 ORDER BY timestamp DESC LIMIT 50".to_string(),
        ..Default::default()
    };

    let rs = state
        .bigquery_client
        .job()
        .query("hot-or-not-feed-intelligence", &request)
        .await?;

    let rows = rs.rows.unwrap_or_default();
    let mut offchain_items = Vec::with_capacity(rows.len());
    for row in rows {
        // Column order follows the SELECT list: uri, timestamp, canister_id, post_id, is_nsfw.
        // Consuming the row lets us move the strings out instead of cloning them, and
        // `next()` cannot panic on a short row the way `row.f[2]` would.
        let mut cells = row.f.into_iter().skip(2).map(|cell| cell.v);

        let canister_id = match cells.next() {
            Some(Value::String(canister_id)) => canister_id,
            _ => String::new(),
        };

        let post_id = match cells.next() {
            Some(Value::String(raw)) => match raw.parse::<u64>() {
                Ok(post_id) => post_id,
                Err(e) => {
                    println!(
                        "update_ml_feed_cache: skipping row with invalid post_id {:?}: {}",
                        raw, e
                    );
                    continue;
                }
            },
            _ => {
                println!("update_ml_feed_cache: skipping row without a post_id");
                continue;
            }
        };

        offchain_items.push(CustomMlFeedCacheItem {
            post_id,
            canister_id,
            video_id: String::new(),
            creator_principal_id: String::new(),
        });
    }

    // POST /feed-cache/global-feed: the global feed uses a fixed key, not a canister id.
    let url = format!(
        "{}/feed-cache/{}",
        CLOUDFLARE_ML_FEED_CACHE_WORKER_URL, "global-feed"
    );
    let client = reqwest::Client::new();

    // Best-effort push: report problems but do not fail the request.
    match client.post(url).json(&offchain_items).send().await {
        Ok(response) if !response.status().is_success() => println!(
            "update_ml_feed_cache: feed cache worker responded with status {}",
            response.status()
        ),
        Ok(_) => (),
        Err(e) => println!("Failed to get update_ml_feed_cache response: {}", e),
    }

    Ok(())
}

/// No-op stand-in for `update_ml_feed_cache` used when the `local-bin` feature
/// is enabled: it performs no work and always returns `Ok(())`.
#[cfg(feature = "local-bin")]
pub async fn update_ml_feed_cache(State(_state): State<Arc<AppState>>) -> Result<(), AppError> {
    // The state is extracted only to keep the handler signature identical to the
    // non-`local-bin` build; the underscore prefix silences `unused_variables`.
    Ok(())
}
3 changes: 3 additions & 0 deletions src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ pub static OFF_CHAIN_AGENT_URL: Lazy<Url> =
Lazy::new(|| Url::parse("https://icp-off-chain-agent.fly.dev/").unwrap());

/// URL (with explicit port 443) of the NSFW server.
/// NOTE(review): presumably the NSFW classification service, going by the host name; its callers are not visible in this hunk.
pub const NSFW_SERVER_URL: &str = "https://prod-yral-nsfw-classification.fly.dev:443";

/// Base URL (no trailing slash) of the Cloudflare worker that stores the ML feed cache.
/// `update_ml_feed_cache` appends `/feed-cache/global-feed` to it and POSTs the feed items there.
pub const CLOUDFLARE_ML_FEED_CACHE_WORKER_URL: &str =
"https://yral-ml-feed-cache.go-bazzinga.workers.dev";
3 changes: 2 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use axum::http::StatusCode;
use axum::routing::post;
use axum::{routing::get, Router};
use canister::mlfeed_cache::off_chain::off_chain_canister_server::OffChainCanisterServer;
use canister::mlfeed_cache::OffChainCanisterService;
use canister::mlfeed_cache::{update_ml_feed_cache, OffChainCanisterService};
use canister::upgrade_user_token_sns_canister::{
upgrade_user_token_sns_canister_for_entire_network, upgrade_user_token_sns_canister_handler,
};
Expand Down Expand Up @@ -83,6 +83,7 @@ async fn main() -> Result<()> {
get(canister::snapshot::get_snapshot_canister),
)
.route("/extract-frames", post(extract_frames_and_upload))
.route("/update-global-ml-feed-cache", get(update_ml_feed_cache))
.nest("/qstash", qstash_routes)
.with_state(shared_state.clone());

Expand Down
Loading