Skip to content

Commit

Permalink
squash
Browse files Browse the repository at this point in the history
  • Loading branch information
just-in-chang committed Oct 3, 2024
1 parent 6938caa commit 44bfd22
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 202 deletions.
195 changes: 1 addition & 194 deletions ecosystem/nft-metadata-crawler/src/asset_uploader/mod.rs
Original file line number Diff line number Diff line change
@@ -1,197 +1,4 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{
asset_uploader::config::AssetUploaderConfig,
config::Server,
models::parsed_asset_uris::ParsedAssetUris,
utils::{
constants::{MAX_ASSET_UPLOAD_RETRY_SECONDS, MAX_RETRY_TIME_SECONDS},
database::upsert_uris,
},
};
use anyhow::Context;
use axum::{routing::post, Extension, Json, Router};
use backoff::{future::retry, ExponentialBackoff};
use diesel::{
r2d2::{ConnectionManager, Pool},
PgConnection,
};
use futures::{future::try_join_all, FutureExt};
use reqwest::{multipart::Form, Client};
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Duration};
use tracing::{info, warn};
use url::Url;

pub mod config;

#[derive(Clone)]
pub struct AssetUploaderContext {
pub asset_uploader_config: Arc<AssetUploaderConfig>,
pub pool: Pool<ConnectionManager<PgConnection>>,
}

// Structs below are for accessing relevant data in a typed way for Cloudflare API calls
#[derive(Debug, Deserialize)]
struct CloudflareImageUploadResponseResult {
id: String,
}

#[derive(Debug, Deserialize)]
struct CloudflareImageUploadResponse {
result: CloudflareImageUploadResponseResult,
}

#[derive(Debug, Deserialize)]
struct AssetUploaderRequest {
urls: Vec<Url>,
}

#[derive(Debug, Serialize)]
struct AssetUploaderResponse {
successes: Vec<String>,
failures: Vec<String>,
}

impl AssetUploaderContext {
pub fn new(
asset_uploader_config: AssetUploaderConfig,
pool: Pool<ConnectionManager<PgConnection>>,
) -> Self {
Self {
asset_uploader_config: Arc::new(asset_uploader_config),
pool,
}
}

/// Uploads an asset to Cloudflare and returns the CDN URL used to access it
/// The CDN URL uses the default variant specified in the config
async fn upload_asset(&self, url: Url) -> anyhow::Result<String> {
let hashed_url = sha256::digest(url.to_string());
let op = || {
async {
let client = Client::builder()
.timeout(Duration::from_secs(MAX_ASSET_UPLOAD_RETRY_SECONDS))
.build()
.context("Failed to build reqwest client")?;

let form = Form::new()
.text("id", hashed_url.clone()) // Replace with actual metadata
.text("url", url.to_string());

info!(
asset_uri = ?url,
"[Asset Uploader] Uploading asset to Cloudflare"
);

let res = client
.post(format!(
"https://api.cloudflare.com/client/v4/accounts/{}/images/v1",
self.asset_uploader_config.cloudflare_account_id
))
.header(
"Authorization",
format!("Bearer {}", self.asset_uploader_config.cloudflare_auth_key),
)
.multipart(form)
.send()
.await
.context("Failed to send request to Cloudflare API")?;

let res_text = res.text().await.context("Failed to get response text")?;
info!(response = ?res_text, "[Asset Uploader] Received response from Cloudflare");

let res = serde_json::from_str::<CloudflareImageUploadResponse>(&res_text)
.context("Failed to parse response to CloudflareImageUploadResponse")?;

Ok(format!(
"{}/{}/{}/{}",
self.asset_uploader_config.cloudflare_image_delivery_prefix,
self.asset_uploader_config.cloudflare_account_hash,
res.result.id,
self.asset_uploader_config.cloudflare_default_variant,
))
}
.boxed()
};

let backoff = ExponentialBackoff {
max_elapsed_time: Some(Duration::from_secs(MAX_RETRY_TIME_SECONDS)),
..Default::default()
};

match retry(backoff, op).await {
Ok(result) => Ok(result),
Err(e) => Err(e),
}
}

async fn handle_urls(
Extension(context): Extension<Arc<Self>>,
Json(urls): Json<AssetUploaderRequest>,
) -> Json<AssetUploaderResponse> {
// Spawn a task for each URL
let mut tasks = Vec::with_capacity(urls.urls.len());
let self_clone = context.clone();
for url in urls.urls.clone() {
let self_clone = self_clone.clone();
tasks.push(tokio::spawn(async move {
match self_clone.upload_asset(url.clone()).await {
Ok(cdn_url) => {
info!(
asset_uri = ?url,
cdn_uri = cdn_url,
"[Asset Uploader] Writing to Postgres"
);
let mut model = ParsedAssetUris::new(url.as_ref());
model.set_cdn_image_uri(Some(cdn_url.clone()));

let mut conn = self_clone.pool.get().context("Failed to get connection")?;
upsert_uris(&mut conn, &model, -1).context("Failed to upsert URIs")?;

Ok(cdn_url)
},
Err(e) => {
warn!(error = ?e, asset_uri = ?url, "[Asset Uploader] Failed to upload asset");
Err(e)
},
}
}));
}

// Wait for all tasks to finish
match try_join_all(tasks).await {
Ok(uris) => {
let mut successes = Vec::new();
let mut failures = Vec::new();

for (i, uri) in uris.iter().enumerate() {
match uri {
Ok(uri) => successes.push(uri.clone()),
Err(_) => failures.push(urls.urls[i].to_string()),
}
}

info!(successes = ?successes, failures = ?failures, "[Asset Uploader] Uploaded assets");
Json(AssetUploaderResponse {
successes,
failures,
})
},
Err(_) => Json(AssetUploaderResponse {
successes: vec![],
failures: urls.urls.iter().map(|url| url.to_string()).collect(),
}),
}
}
}

impl Server for AssetUploaderContext {
fn build_router(&self) -> Router {
let self_arc = Arc::new(self.clone());
Router::new()
.route("/", post(Self::handle_urls))
.layer(Extension(self_arc.clone()))
}
}
pub mod worker;
14 changes: 14 additions & 0 deletions ecosystem/nft-metadata-crawler/src/asset_uploader/worker/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use serde::{Deserialize, Serialize};

/// Required account data and auth keys for Cloudflare
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct AssetUploaderWorkerConfig {
/// Cloudflare API key
pub cloudflare_auth_key: String,
/// Cloudflare Account ID provided at the images home page used to authenticate requests
pub cloudflare_account_id: String,
}
128 changes: 128 additions & 0 deletions ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{
asset_uploader::worker::config::AssetUploaderWorkerConfig, config::Server,
utils::constants::MAX_ASSET_UPLOAD_RETRY_SECONDS,
};
use anyhow::Context;
use axum::{
body::Body, http::StatusCode, response::IntoResponse, routing::post, Extension, Json, Router,
};
use reqwest::{multipart::Form, Client};
use serde::Deserialize;
use std::{sync::Arc, time::Duration};
use tracing::{error, info};
use url::Url;

pub mod config;

#[derive(Clone)]
pub struct AssetUploaderWorkerContext {
asset_uploader_config: Arc<AssetUploaderWorkerConfig>,
}

#[derive(Debug, Deserialize)]
struct UploadRequest {
url: Url,
}

impl AssetUploaderWorkerContext {
pub fn new(asset_uploader_config: AssetUploaderWorkerConfig) -> Self {
Self {
asset_uploader_config: Arc::new(asset_uploader_config),
}
}

/// Uploads an asset to Cloudflare and returns the response
async fn upload_asset(&self, url: &Url) -> anyhow::Result<impl IntoResponse> {
let hashed_url = sha256::digest(url.to_string());
let client = Client::builder()
.timeout(Duration::from_secs(MAX_ASSET_UPLOAD_RETRY_SECONDS))
.build()
.context("Error building reqwest client")?;
let form = Form::new()
.text("id", hashed_url.clone()) // Replace with actual metadata
.text("url", url.to_string());

info!(
asset_uri = ?url,
"[Asset Uploader] Uploading asset to Cloudflare"
);

let res = client
.post(format!(
"https://api.cloudflare.com/client/v4/accounts/{}/images/v1",
self.asset_uploader_config.cloudflare_account_id
))
.header(
"Authorization",
format!("Bearer {}", self.asset_uploader_config.cloudflare_auth_key),
)
.multipart(form)
.send()
.await
.context("Error sending request to Cloudflare")?;

reqwest_response_to_axum_response(res).await
}

async fn handle_upload(
Extension(context): Extension<Arc<AssetUploaderWorkerContext>>,
Json(request): Json<UploadRequest>,
) -> impl IntoResponse {
match context.upload_asset(&request.url).await {
Ok(res) => {
let res = res.into_response(); // TODO: How to log response body?
info!(asset_uri = ?request.url, response = ?res, "[Asset Uploader] Asset uploaded with response");
res
},
Err(e) => {
error!(asset_uri = ?request.url, error = ?e, "[Asset Uploader] Error uploading asset");
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Error uploading asset: {}", e),
)
.into_response()
},
}
}
}

impl Server for AssetUploaderWorkerContext {
fn build_router(&self) -> Router {
let self_arc = Arc::new(self.clone());
Router::new()
.route("/", post(Self::handle_upload))
.layer(Extension(self_arc.clone()))
}
}

/// Converts a reqwest response to an axum response
/// Only copies the response status, response body, and Content-Type header
async fn reqwest_response_to_axum_response(
response: reqwest::Response,
) -> anyhow::Result<impl IntoResponse> {
let status = response.status();
let headers = response.headers().clone();

let body_bytes = response
.bytes()
.await
.context("Error reading response body")?;

let mut response = axum::http::Response::builder().status(status.as_u16());

// Set Content-Type header if present
if let Some(content_type) = headers.get(reqwest::header::CONTENT_TYPE) {
response = response.header(
axum::http::header::CONTENT_TYPE,
content_type
.to_str()
.context("Error parsing Content-Type header")?,
);
}

let body = Body::from(body_bytes);
response.body(body).context("Error building response")
}
19 changes: 11 additions & 8 deletions ecosystem/nft-metadata-crawler/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
asset_uploader::{config::AssetUploaderConfig, AssetUploaderContext},
asset_uploader::worker::{config::AssetUploaderWorkerConfig, AssetUploaderWorkerContext},
parser::{config::ParserConfig, ParserContext},
utils::database::{establish_connection_pool, run_migrations},
};
Expand All @@ -27,7 +27,7 @@ pub trait Server: Send + Sync {
#[serde(tag = "type")]
pub enum ServerConfig {
Parser(ParserConfig),
AssetUploader(AssetUploaderConfig),
AssetUploaderWorker(AssetUploaderWorkerConfig),
}

/// Structs to hold config from YAML
Expand All @@ -43,7 +43,7 @@ pub struct NFTMetadataCrawlerConfig {
#[enum_dispatch(Server)]
pub enum ServerContext {
Parser(ParserContext),
AssetUploader(AssetUploaderContext),
AssetUploaderWorker(AssetUploaderWorkerContext),
}

impl ServerConfig {
Expand All @@ -55,9 +55,11 @@ impl ServerConfig {
ServerConfig::Parser(parser_config) => {
ServerContext::Parser(ParserContext::new(parser_config.clone(), pool).await)
},
ServerConfig::AssetUploader(asset_uploader_config) => ServerContext::AssetUploader(
AssetUploaderContext::new(asset_uploader_config.clone(), pool),
),
ServerConfig::AssetUploaderWorker(asset_uploader_config) => {
ServerContext::AssetUploaderWorker(AssetUploaderWorkerContext::new(
asset_uploader_config.clone(),
))
},
}
}
}
Expand Down Expand Up @@ -86,8 +88,9 @@ impl RunnableConfig for NFTMetadataCrawlerConfig {

fn get_server_name(&self) -> String {
match &self.server_config {
ServerConfig::Parser(_) => "parser".to_string(),
ServerConfig::AssetUploader(_) => "asset_uploader".to_string(),
ServerConfig::Parser(_) => "parser",
ServerConfig::AssetUploaderWorker(_) => "asset_uploader_worker",
}
.to_string()
}
}

0 comments on commit 44bfd22

Please sign in to comment.