diff --git a/Cargo.lock b/Cargo.lock index 4c01fb7b..ff2df05b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1576,6 +1576,7 @@ dependencies = [ "fred", "futures-channel", "futures-util", + "rust_decimal", "serde", "serde_json", "tokio", @@ -1938,6 +1939,7 @@ dependencies = [ "async-graphql-axum", "aws_lambda_events 0.13.0", "axum 0.7.5", + "futures-util", "http 1.0.0", "httpclient", "openssl", @@ -1948,6 +1950,7 @@ dependencies = [ "tracing", "tracing-subscriber", "types", + "wsclient", ] [[package]] @@ -2657,6 +2660,7 @@ dependencies = [ "fred", "futures", "pg", + "rust_decimal", "serde", "shutdown", "tokio", @@ -3154,20 +3158,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "postgres" -version = "0.19.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "960c214283ef8f0027974c03e9014517ced5db12f021a9abb66185a5751fab0a" -dependencies = [ - "bytes", - "fallible-iterator", - "futures-util", - "log", - "tokio", - "tokio-postgres", -] - [[package]] name = "postgres-derive" version = "0.4.5" @@ -3789,20 +3779,19 @@ dependencies = [ [[package]] name = "rust_decimal" -version = "1.34.3" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b39449a79f45e8da28c57c341891b69a183044b29518bb8f86dbac9df60bb7df" +checksum = "b082d80e3e3cc52b2ed634388d436fe1f4de6af5786cc2de9ba9737527bdf555" dependencies = [ "arrayvec", "borsh", "bytes", "num-traits", - "postgres", + "postgres-types", "rand 0.8.5", "rkyv", "serde", "serde_json", - "tokio-postgres", ] [[package]] @@ -4636,7 +4625,7 @@ dependencies = [ "futures-util", "log", "tokio", - "tungstenite", + "tungstenite 0.21.0", ] [[package]] @@ -4875,6 +4864,24 @@ dependencies = [ "utf-8", ] +[[package]] +name = "tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.0.0", + "httparse", + "log", + "rand 0.8.5", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "typenum" version = "1.16.0" @@ -5387,6 +5394,13 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "wsclient" +version = "0.1.0" +dependencies = [ + "tungstenite 0.24.0", +] + [[package]] name = "wyz" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 8de8049d..dd81d857 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "crates/service", "crates/shutdown", "crates/types", + "crates/wsclient", "services/auto-confirm", "services/balance-by-account", "services/event", diff --git a/crates/wsclient/Cargo.toml b/crates/wsclient/Cargo.toml new file mode 100644 index 00000000..f0810640 --- /dev/null +++ b/crates/wsclient/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "wsclient" +version = "0.1.0" +edition = "2021" +rust-version.workspace = true + +[dependencies] +tungstenite = "0.24.0" diff --git a/crates/wsclient/src/lib.rs b/crates/wsclient/src/lib.rs new file mode 100644 index 00000000..da89ad5a --- /dev/null +++ b/crates/wsclient/src/lib.rs @@ -0,0 +1,128 @@ +use std::net::TcpStream; +use tungstenite::{connect, stream::MaybeTlsStream, WebSocket}; + +pub struct WsClient { + uri: WsUri, +} + +impl WsClient { + pub fn new( + base_uri: String, + measure: String, + date: String, + country: Option, + region: Option, + municipality: Option, + ) -> Self { + let uri = WsUri::new(base_uri, measure, date, country, region, municipality); + WsClient { uri } + } + + pub fn connect(&self) -> WebSocket> { + let (socket, _) = connect(self.uri.query_string()).expect("failed to connect"); + socket + } +} + +// todo: merge with Params in services/measure/src/main.rs as separate crate +struct WsUri { + base_uri: String, + measure: String, + date: String, + country: Option, + region: Option, + municipality: Option, +} + +impl WsUri { + pub fn new( + base_uri: String, + measure: String, + date: String, + country: Option, + region: Option, + municipality: Option, + ) -> Self { + WsUri { + base_uri, + measure, + date, + country, + region, + municipality, + } + } + + pub fn query_string(&self) -> String { + let mut uri = format!("{}?", self.base_uri); + uri.push_str(&format!("measure={}", self.measure)); + uri.push_str(&format!("&date={}", self.date)); + if let Some(country) = &self.country { + uri.push_str(&format!("&country={}", country)); + } + if let Some(region) = &self.region { + uri.push_str(&format!("®ion={}", region)); + } + if let Some(municipality) = &self.municipality { + uri.push_str(&format!("&municipality={}", municipality)); + } + uri.replace(" ", "%20") + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_ws_uri() { + let ws_uri = WsUri::new( + "ws://localhost:10010/ws".to_string(), + "gdp".to_string(), + "2024-08-21".to_string(), + Some("United States of America".to_string()), + Some("California".to_string()), + Some("Sacramento".to_string()), + ); + + assert_eq!(ws_uri.measure, "gdp"); + assert_eq!(ws_uri.date, "2024-08-21"); + assert_eq!(ws_uri.country, Some("United States of America".to_string())); + assert_eq!(ws_uri.region, Some("California".to_string())); + assert_eq!(ws_uri.municipality, Some("Sacramento".to_string())); + } + + #[test] + fn test_ws_uri_full_query_string() { + let ws_uri = WsUri::new( + "ws://localhost:10010/ws".to_string(), + "gdp".to_string(), + "2024-08-21".to_string(), + Some("United States of America".to_string()), + Some("California".to_string()), + Some("Sacramento".to_string()), + ); + + assert_eq!( + ws_uri.query_string(), + "ws://localhost:10010/ws?measure=gdp&date=2024-08-21&country=United%20States%20of%20America®ion=California&municipality=Sacramento" + ); + } + + #[test] + fn test_ws_uri_cal_query_string() { + let ws_uri = WsUri::new( + "ws://localhost:10010/ws".to_string(), + "gdp".to_string(), + "2024-08-21".to_string(), + Some("United States of America".to_string()), + Some("California".to_string()), + None, + ); + + assert_eq!( + ws_uri.query_string(), + "ws://localhost:10010/ws?measure=gdp&date=2024-08-21&country=United%20States%20of%20America®ion=California" + ); + } +} diff --git a/project.yaml b/project.yaml index ae960c28..b05066b9 100644 --- a/project.yaml +++ b/project.yaml @@ -75,6 +75,12 @@ crates: type: lib env_var: null params: [] + wsclient: + runtime: rust1.x + min_code_cov: 0 + type: lib + env_var: null + params: [] docker: env_var: set: @@ -355,6 +361,8 @@ services: - BALANCE_BY_ACCOUNT_URL - READINESS_CHECK_PATH - GRAPHQL_PORT + - MEASURE_URL + - MEASURE_RESOURCE request-create: runtime: rust1.x min_code_cov: null @@ -613,6 +621,12 @@ services: MEASURE_PORT: ssm: null default: 10010 + MEASURE_URL: + ssm: null + default: null # script sets as $LOCAL_ADDRESS:$PORT + MEASURE_RESOURCE: + ssm: null + default: ws get: - PGHOST - PGPORT diff --git a/scripts/create-env-file.sh b/scripts/create-env-file.sh index 1cdc1256..319b4f40 100644 --- a/scripts/create-env-file.sh +++ b/scripts/create-env-file.sh @@ -63,7 +63,14 @@ source ./scripts/set-uri-vars.sh function set_default_values() { for s in "${SECRETS[@]}"; do - if [[ "$s" == *'_URL' ]]; then + # todo: hardcode protocol prefixes in apps. this ws:// exception for MEASURE_URL is a hack + if [[ "$s" == 'MEASURE_URL' ]]; then + SVC_NAME=$(printf '%s' "$s" | sed 's/_URL//') + PORT_ENV_VAR="$SVC_NAME"_PORT + PORT_VAL=$(yq "... | select(has(\"$PORT_ENV_VAR\")).$PORT_ENV_VAR.default" $PROJECT_CONF) + echo "$s=ws://$LOCAL_ADDRESS:$PORT_VAL" >> $ENV_FILE + continue + elif [[ "$s" == *'_URL' ]]; then SVC_NAME=$(printf '%s' "$s" | sed 's/_URL//') PORT_ENV_VAR="$SVC_NAME"_PORT PORT_VAL=$(yq "... | select(has(\"$PORT_ENV_VAR\")).$PORT_ENV_VAR.default" $PROJECT_CONF) diff --git a/scripts/insert-transactions.sh b/scripts/insert-transactions.sh index 34d7e6f7..37126d51 100644 --- a/scripts/insert-transactions.sh +++ b/scripts/insert-transactions.sh @@ -2,14 +2,15 @@ set -e -if [[ "$#" -ne 0 ]] && [[ "$#" -ne 1 ]]; then - echo "use: bash scripts/insert-transactions.sh --continue" +if [[ "$#" -gt 2 ]]; then + echo "use: bash scripts/insert-transactions.sh --continue --mix" exit 1 fi while [[ "$#" -gt 0 ]]; do case $1 in --continue) CONT=1 ;; + --mix) MIX=1 ;; *) echo "unknown parameter passed: $1" exit 1 @@ -48,42 +49,70 @@ echo "*** finished migrations" echo "" -function transact() { - local transaction="$1" - # create requests for each in tests/testdata/requests.json - TRANSACTION_ID=$(curl -s -H 'Content-Type: application/json' -d "$transaction" $REQUEST_CREATE_URL | yq '.transaction.id') - - # approve every other request to create mix of requests and transactions - if [[ $(("$TRANSACTION_ID" % 2)) -eq 0 ]]; then - - # get debitor approver from user (NOT rule) added transaction item - DEBITOR_APPROVER=$(echo -n "$transaction" | yq -I0 '[.transaction.transaction_items[] | select(.rule_instance_id == null)][0] | .debitor') - - # mock lambda input event - APPROVAL="{\"auth_account\":\""$DEBITOR_APPROVER"\",\"id\":\""$TRANSACTION_ID"\",\"account_name\":\""$DEBITOR_APPROVER"\",\"account_role\":\"debitor\"}" +function request() { + local request="$1" + # create transaction request + TRANSACTION_ID=$(curl -s -H 'Content-Type: application/json' -d "$request" $REQUEST_CREATE_URL | yq '.transaction.id') + # get debitor approver from user (NOT rule) added transaction item + DEBITOR_APPROVER=$(echo -n "$request" | yq -I0 '[.transaction.transaction_items[] | select(.rule_instance_id == null)][0] | .debitor') +} - # approve transaction request - curl -s -H 'Content-Type: application/json' -d "$APPROVAL" $REQUEST_APPROVE_URL >/dev/null - fi +function approve() { + # mock lambda input event + APPROVAL="{\"auth_account\":\""$DEBITOR_APPROVER"\",\"id\":\""$TRANSACTION_ID"\",\"account_name\":\""$DEBITOR_APPROVER"\",\"account_role\":\"debitor\"}" + # approve transaction request + curl -s -H 'Content-Type: application/json' -d "$APPROVAL" $REQUEST_APPROVE_URL >/dev/null + unset TRANSACTION_ID + unset DEBITOR_APPROVER } -# continue inserting random transactions from testdata file indefinitely +# insert random transactions indefinitely if [[ -n "$CONT" ]]; then - echo "*** adding a random mix of requests and transactions from $TEST_DATA_FILE indefinitely" + if [[ -n "$MIX" ]]; then + echo "*** adding a random mix of requests and transactions from $TEST_DATA_FILE indefinitely" + else + echo "*** adding transactions from $TEST_DATA_FILE indefinitely" + fi + # increase account balances to avoid insufficient balance error psql -U $PGUSER -d $PGDATABASE -h $PGHOST -c "UPDATE account_balance SET current_balance = 999999000 WHERE true;" >/dev/null + while true; do RANDOM_INDEX=$((RANDOM % TEST_DATA_FILE_LENGTH)) transaction=$(yq -I0 -o=json ".[$RANDOM_INDEX]" $TEST_DATA_FILE) - transact "$transaction" + request "$transaction" + # skip approving odd transactions when mixing + if [[ -n "$MIX" ]] && [[ $(("$TRANSACTION_ID" % 2)) -ne 0 ]]; then + continue + else + approve + fi + # sleep for 100ms to avoid burdening cpu, e.g. 1000/4.5 minutes vs 1000/2.5 minutes without sleep + sleep 0.1 done -else - echo "*** adding a mix of requests and transactions from $TEST_DATA_FILE" +else # insert all transactions from testdata file + if [[ -n "$MIX" ]]; then + echo "*** adding a mix of requests and transactions from $TEST_DATA_FILE" + else + echo "*** adding transactions from $TEST_DATA_FILE" + fi + # insert all transactions from testdata file yq -I0 -o=json '.[]' $TEST_DATA_FILE | while IFS='\n' read transaction; do - transact "$transaction" - printf '%s' '.' + request "$transaction" + # skip approving odd transactions when mixing + if [[ -n "$MIX" ]] && [[ $(("$TRANSACTION_ID" % 2)) -ne 0 ]]; then + continue + else + approve + fi done + echo "" - echo "*** $(yq -o=json 'length' $TEST_DATA_FILE) requests and transactions (mixed) added from $TEST_DATA_FILE" + + if [[ -n "$MIX" ]]; then + echo "*** $TEST_DATA_FILE_LENGTH requests and transactions (mixed) added from $TEST_DATA_FILE" + else + echo "*** $TEST_DATA_FILE_LENGTH transactions added from $TEST_DATA_FILE" + fi fi diff --git a/services/event/Cargo.toml b/services/event/Cargo.toml index fd9ab080..eed1d6b9 100644 --- a/services/event/Cargo.toml +++ b/services/event/Cargo.toml @@ -12,3 +12,4 @@ futures-util = { version = "0.3.30", features = ["sink"] } serde = { version = "1.0.152", features = ["derive"] } serde_json = "1.0.93" fred = { version = "9.0.3", features = ["i-scripts"] } +rust_decimal = "1.36.0" diff --git a/services/event/README.md b/services/event/README.md index ce05949a..4ca8cda1 100644 --- a/services/event/README.md +++ b/services/event/README.md @@ -5,4 +5,4 @@ ### event -receives events such as `gdp` from postgres and increments keys in redis for the measure service \ No newline at end of file +listens to and receives events such as `gdp` from postgres and increments keys in redis for the measure service \ No newline at end of file diff --git a/services/event/src/events.rs b/services/event/src/events.rs index 810c2597..43e89a31 100644 --- a/services/event/src/events.rs +++ b/services/event/src/events.rs @@ -1,5 +1,7 @@ use crate::RedisClient; use fred::prelude::LuaInterface; +use rust_decimal::prelude::*; +use rust_decimal::Decimal; use std::collections::HashMap; // create events for measure api @@ -40,7 +42,7 @@ const GDP_TTL: i64 = 60 * 60 * 24 * 3; // secs * mins * hours * days = 3 pub async fn redis_incrby_gdp<'a>(client: &RedisClient, gdp_map: Gdp<'a>) { for (key, value) in gdp_map.0.iter() { let k = key.to_string(); - let v = value.parse::().unwrap(); + let v = trim_string_decimal(&value.parse::().unwrap()); let _: () = client .eval( INCRBY_GDP, @@ -52,6 +54,16 @@ pub async fn redis_incrby_gdp<'a>(client: &RedisClient, gdp_map: Gdp<'a>) { } } +fn trim_string_decimal(s: &str) -> String { + if s.parse::().is_ok() { + let d = Decimal::from_str(s).unwrap(); + // set precision to 3 decimal places + d.round_dp(3).to_string() + } else { + s.to_string() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/services/event/src/main.rs b/services/event/src/main.rs index f352f6cd..f70b7945 100644 --- a/services/event/src/main.rs +++ b/services/event/src/main.rs @@ -47,57 +47,65 @@ struct Event { #[tokio::main] async fn main() { let pg_uri = pg_conn_uri(); - - let (client, mut connection) = tokio_postgres::connect(pg_uri.as_str(), NoTls) - .await - .unwrap(); - - let (tx, mut rx) = mpsc::unbounded(); - - let stream = - stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!("{}", e)); - - let connection = stream.forward(tx).map(|r| r.unwrap()); - - tokio::spawn(connection); - - client.batch_execute("LISTEN event;").await.unwrap(); // add more listeners in multiline string - let redis_uri = redis_conn_uri(); let redis_config = RedisConfig::from_url(&redis_uri).unwrap(); let redis_client = Builder::from_config(redis_config).build().unwrap(); redis_client.init().await.unwrap(); loop { - // block until message received - let message = match rx.next().await { - Some(message) => message, - None => continue, + let (client, mut connection) = match tokio_postgres::connect(pg_uri.as_str(), NoTls).await { + Ok(conn) => conn, + Err(_e) => { + // println!("failed to connect to postgres: {}", e); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + continue; + } }; - match message { - AsyncMessage::Notification(n) => { - // search for json_build_object in schema - // for matching postgres Event structure - match serde_json::from_str::(n.payload()) { - Ok(event) => { - match event.name.as_str() { - "gdp" => { - // parse event value as Gdp map - let gdp_map = events::Gdp::new(event.value.as_str()); - events::redis_incrby_gdp(&redis_client, gdp_map).await; - } - _ => { - println!("unknown event: {}", event.name); - } + let (tx, mut rx) = mpsc::unbounded(); + + let stream = + stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!("{}", e)); + let connection = stream.forward(tx).map(|r| r.unwrap()); + let handler = tokio::spawn(connection); + + if let Err(e) = client.batch_execute("LISTEN event;").await { + println!("failed to execute LISTEN command: {}", e); + handler.abort(); + continue; + } + + loop { + let message = match rx.next().await { + Some(message) => message, + None => { + // println!("connection terminated. attempting to reconnect..."); + handler.abort(); + break; + } + }; + + match message { + AsyncMessage::Notification(n) => match serde_json::from_str::(n.payload()) { + Ok(event) => match event.name.as_str() { + "gdp" => { + let gdp_map = events::Gdp::new(event.value.as_str()); + events::redis_incrby_gdp(&redis_client, gdp_map).await; } - } + _ => { + println!("unknown event: {}", event.name); + } + }, Err(e) => { - println!("{}", e); + println!("failed to parse event: {}", e); } + }, + _ => { + println!("unhandled message: {:?}", message); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + continue; } } - _ => continue, // todo: handle error - }; + } } } diff --git a/services/graphql/Cargo.toml b/services/graphql/Cargo.toml index b93e6951..48908a86 100644 --- a/services/graphql/Cargo.toml +++ b/services/graphql/Cargo.toml @@ -23,6 +23,8 @@ aws_lambda_events = { version = "0.13.0", default-features = false, features = [ tower-http = { version = "0.5.1", features = ["cors"] } http = "1.0.0" shutdown = { path = "../../crates/shutdown" } +futures-util = "0.3.30" +wsclient = { path = "../../crates/wsclient" } [target.x86_64-unknown-linux-musl.dependencies] # https://github.com/cross-rs/cross/wiki/Recipes#vendored diff --git a/services/graphql/README.md b/services/graphql/README.md index 8e60f390..c79cabcd 100644 --- a/services/graphql/README.md +++ b/services/graphql/README.md @@ -7,6 +7,24 @@ structures and routes requests to systemaccounting services +\* postman does not support exporting subscription configuration from its graphql feature. until requests are ported to an alternative api testing client with an export feature, subscription testing is availble on http://localhost:10000/ + +subscription query: +```gql +subscription QueryGdp($date: String!, $country: String, $region: String, $municipality: String) { + queryGdp(date: $date, country: $country, region: $region, municipality: $municipality) +} +``` +query variables: +```json +{ + "date": "2024-08-28", + "country": "United States of America", + "region": "California", + "municipality": "Sacramento" +} +``` + #### build & deploy FAST * `make deploy ENV=dev` to build and deploy lambda diff --git a/services/graphql/src/main.rs b/services/graphql/src/main.rs index 4d87bbd7..6bab31e7 100644 --- a/services/graphql/src/main.rs +++ b/services/graphql/src/main.rs @@ -4,25 +4,27 @@ use ::types::{ transaction::Transaction, transaction_item::{TransactionItem, TransactionItems}, }; +use ::wsclient::WsClient; use async_graphql::*; -use async_graphql_axum::{GraphQLRequest, GraphQLResponse}; +use async_graphql_axum::{GraphQL, GraphQLSubscription}; use aws_lambda_events::event::apigw::ApiGatewayV2httpRequestContext; use axum::{ - extract::State, http::{HeaderMap, StatusCode}, response::{self, IntoResponse}, - routing::{get, post}, + routing::get, Router, }; +use futures_util::{stream, stream::Stream}; use httpclient::HttpClient as Client; use serde_json::json; use shutdown::shutdown_signal; -use std::{env, net::ToSocketAddrs, result::Result}; +use std::{env, net::ToSocketAddrs, result::Result, task::Poll}; use tokio::net::TcpListener; use tower_http::cors::CorsLayer; const READINESS_CHECK_PATH: &str = "READINESS_CHECK_PATH"; const GRAPHQL_RESOURCE: &str = "query"; +const SUBSCRIPTION_RESOURCE: &str = "ws"; struct Query; @@ -180,6 +182,35 @@ impl Mutation { } } +struct Subscription; + +#[Subscription] +impl Subscription { + async fn query_gdp( + &self, + _ctx: &Context<'_>, + #[graphql(name = "date")] date: String, + #[graphql(name = "country")] country: Option, + #[graphql(name = "region")] region: Option, + #[graphql(name = "municipality")] municipality: Option, + ) -> impl Stream { + let base_uri = env::var("MEASURE_URL").unwrap(); + let resource = env::var("MEASURE_RESOURCE").unwrap(); + let uri = format!("{}/{}", base_uri, resource); + let ws_client = WsClient::new(uri, "gdp".to_string(), date, country, region, municipality); + let mut socket = ws_client.connect(); + + // send socket messages to stream + stream::poll_fn(move |_cx| match socket.read() { + Ok(msg) => { + let gdp = msg.into_text().unwrap().parse::().unwrap(); + Poll::Ready(Some(gdp)) + } + Err(_e) => Poll::Ready(None), + }) + } +} + fn account_auth(account_name: String, auth_account: String) -> String { json!({ "account_name": account_name, @@ -230,20 +261,11 @@ async fn graphiql() -> impl IntoResponse { response::Html( http::GraphiQLSource::build() .endpoint(format!("/{}", GRAPHQL_RESOURCE).as_str()) + .subscription_endpoint(format!("/{}", SUBSCRIPTION_RESOURCE).as_str()) .finish(), ) } -async fn graphql_handler( - State(schema): State>, - headers: HeaderMap, - req: GraphQLRequest, -) -> GraphQLResponse { - let mut req = req.into_inner(); - req = req.data(headers.clone()); - schema.execute(req).await.into() -} - #[tokio::main] async fn main() { tracing_subscriber::fmt().with_ansi(false).init(); @@ -251,20 +273,23 @@ async fn main() { let readiness_check_path = env::var(READINESS_CHECK_PATH) .unwrap_or_else(|_| panic!("{READINESS_CHECK_PATH} variable assignment")); - let schema = Schema::build(Query, Mutation, EmptySubscription).finish(); + let schema = Schema::build(Query, Mutation, Subscription).finish(); let app = Router::new() .route("/", get(graphiql)) - .route( + .route_service( format!("/{}", GRAPHQL_RESOURCE).as_str(), - post(graphql_handler), + GraphQL::new(schema.clone()), ) .route( readiness_check_path.as_str(), // absolute path so format not used get(|| async { StatusCode::OK }), ) - .layer(CorsLayer::permissive()) - .with_state(schema); + .route_service( + format!("/{}", SUBSCRIPTION_RESOURCE).as_str(), + GraphQLSubscription::new(schema), + ) + .layer(CorsLayer::permissive()); let hostname_or_ip = env::var("HOSTNAME_OR_IP").unwrap_or("0.0.0.0".to_string()); diff --git a/services/measure/Cargo.toml b/services/measure/Cargo.toml index 093fd96b..0b8ed497 100644 --- a/services/measure/Cargo.toml +++ b/services/measure/Cargo.toml @@ -12,5 +12,6 @@ tokio = { version = "1.35.1", features = ["macros", "rt-multi-thread"] } shutdown = { path = "../../crates/shutdown" } futures = "0.3.30" serde = { version = "1.0.208", features = ["serde_derive"] } -fred = {version = "9.1.1", default-features = false, features = ["i-pubsub"] } -pg = { path = "../../crates/pg" } \ No newline at end of file +fred = { version = "9.1.1", default-features = false, features = ["i-pubsub"] } +pg = { path = "../../crates/pg" } +rust_decimal = "1.34.3" diff --git a/services/measure/README.md b/services/measure/README.md new file mode 100644 index 00000000..6f66d2b9 --- /dev/null +++ b/services/measure/README.md @@ -0,0 +1,8 @@ +

+ systemaccounting +

+ + +### measure + +subscribes to redis channels and sends events to websockets consumed by graphql \ No newline at end of file diff --git a/services/measure/src/main.rs b/services/measure/src/main.rs index 3155223a..9abd2fce 100644 --- a/services/measure/src/main.rs +++ b/services/measure/src/main.rs @@ -13,6 +13,8 @@ use axum::{ use fred::prelude::*; use futures::{sink::SinkExt, stream::StreamExt}; use pg::postgres::{ConnectionPool, DatabaseConnection, DB}; +use rust_decimal::prelude::*; +use rust_decimal::Decimal; use serde::Deserialize; use shutdown::shutdown_signal; use std::{env, net::SocketAddr}; @@ -37,18 +39,25 @@ fn redis_conn_uri() -> String { struct Params { measure: String, date: String, - country: String, - region: String, - // sub_region: String, - municipality: String, + country: Option, + region: Option, + // sub_region: Option, + municipality: Option, } impl Params { fn redis_gdp_key(&self) -> String { - format!( - "{}:{}:{}:{}:{}", // 2024-08-20:gdp:usa:cal:sac - self.date, self.measure, self.country, self.region, self.municipality - ) + let mut key = format!("{}:{}", self.date, self.measure,); + if self.country.is_some() { + key.push_str(&format!(":{}", self.country.as_ref().unwrap())); + } + if self.region.is_some() { + key.push_str(&format!(":{}", self.region.as_ref().unwrap())); + } + if self.municipality.is_some() { + key.push_str(&format!(":{}", self.municipality.as_ref().unwrap())); + } + key } } @@ -115,21 +124,25 @@ async fn handle_socket(socket: WebSocket, _who: SocketAddr, pool: ConnectionPool async fn proxy_redis_subscription(redis_client: RedisClient, socket: WebSocket) { let (mut ws_tx, mut ws_rx) = socket.split(); let mut redis_stream = redis_client.message_rx(); - while let Ok(message) = redis_stream.recv().await { - let message = message.value.as_string().unwrap(); - ws_tx.send(Message::Text(message)).await.unwrap(); + loop { tokio::select! { ws_msg = ws_rx.next() => { if let Some(Ok(Message::Close(_))) = ws_msg { break; } + if ws_msg.is_none() { + break; + } }, redis_msg = redis_stream.recv() => { match redis_msg { Ok(message) => { let message = message.value.as_string().unwrap(); - ws_tx.send(Message::Text(message)).await.unwrap(); + let item = Message::Text(trim_string_decimal(message.as_str())); + if ws_tx.send(item).await.is_err() { + break; + } } Err(e) => { tracing::error!("error receiving message: {}", e); @@ -139,19 +152,34 @@ async fn proxy_redis_subscription(redis_client: RedisClient, socket: WebSocket) } } } + + if let Err(e) = ws_tx.close().await { + tracing::error!("error closing websocket: {}", e); + } } async fn redis_names(pg_conn: &DatabaseConnection, ws_params: Params) -> Params { - let country = query_key(pg_conn, ws_params.country).await; - let region = query_key(pg_conn, ws_params.region).await; - let municipality = query_key(pg_conn, ws_params.municipality).await; - Params { + let mut keys: Params = Params { measure: ws_params.measure, date: ws_params.date, - country, - region, - municipality, + country: None, + region: None, + municipality: None, + }; + if ws_params.country.is_some() { + keys.country = Some(query_key(pg_conn, ws_params.country.unwrap()).await); + } else { + return keys; + } + if ws_params.region.is_some() { + keys.region = Some(query_key(pg_conn, ws_params.region.unwrap()).await); + } else { + return keys; + } + if ws_params.municipality.is_some() { + keys.municipality = Some(query_key(pg_conn, ws_params.municipality.unwrap()).await); } + keys } async fn query_key(pg_conn: &DatabaseConnection, place: String) -> String { @@ -163,6 +191,16 @@ async fn query_key(pg_conn: &DatabaseConnection, place: String) -> String { .get(0) } +fn trim_string_decimal(s: &str) -> String { + if s.parse::().is_ok() { + let d = Decimal::from_str(s).unwrap(); + // set precision to 3 decimal places + d.round_dp(3).to_string() + } else { + s.to_string() + } +} + #[cfg(test)] mod tests { use super::*; @@ -172,9 +210,9 @@ mod tests { let params = Params { measure: "gdp".to_string(), date: "2024-08-20".to_string(), - country: "usa".to_string(), - region: "cal".to_string(), - municipality: "sac".to_string(), + country: Some("usa".to_string()), + region: Some("cal".to_string()), + municipality: Some("sac".to_string()), }; assert_eq!( params.redis_gdp_key(), diff --git a/services/request-create/src/main.rs b/services/request-create/src/main.rs index 06eefea1..77aa1767 100644 --- a/services/request-create/src/main.rs +++ b/services/request-create/src/main.rs @@ -88,7 +88,7 @@ async fn test_values(req: IntraTransaction) -> Result { - println!("client request equal to rule response") + // println!("client request equal to rule response") } Err(e) => { return Err(Box::new(e));