Skip to content

Commit

Permalink
Some cleanup. Also (parseablehq#8)
Browse files Browse the repository at this point in the history
- Use MinIO play instance, so no need to run local MinIO for testing.
- Add post_event handler
- Rename config packages
- Organize functions a little better
  • Loading branch information
dev870 authored Oct 30, 2021
1 parent d8dcac0 commit 0564b4a
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 60 deletions.
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ edition = "2018"
actix-web = "3"
arrow = "4.4.0"
lazy_static = "1.4.0"
config = "0.11.0"
config-rs = { version = "0.11.0", package = "config" }
toml = "0.5.8"
serde_derive = "^1.0.8"
serde = "^1.0.8"
Expand Down
6 changes: 6 additions & 0 deletions server/Config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[s3]
aws_access_key_id="Q3AM3UQ867SPQQA43P2F"
aws_secret_key="zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"
aws_default_region="us-east-1"
aws_endpoint_url="https://play.minio.io/"
aws_bucket_name="67111b0f870e443ca59200b51221243b"
6 changes: 0 additions & 6 deletions server/Configs.toml

This file was deleted.

5 changes: 2 additions & 3 deletions server/src/configs.rs → server/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::env;
use config::{ConfigError, Config, File};
use config_rs::{ConfigError, Config, File};
use aws_sdk_s3::{Client,Endpoint};
use aws_sdk_s3::Config as s3_config;
use http::{Uri};


#[derive(Debug, Deserialize)]
pub struct ConfigToml {
s3: S3,
Expand All @@ -22,7 +21,7 @@ struct S3 {
impl ConfigToml {
fn new() -> Result<Self, ConfigError> {
let mut s = Config::default();
s.merge(File::with_name("Configs"))?;
s.merge(File::with_name("Config"))?;
s.try_into()
}

Expand Down
81 changes: 48 additions & 33 deletions server/src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,57 @@

use std::env;
use aws_sdk_s3::{Error};
use actix_web::{HttpRequest, HttpResponse};
use crate::config;

#[tokio::main]
pub async fn create_stream(s3_client: Option<aws_sdk_s3::Client>,stream_name: String) -> Result<(), Error> {
match s3_client {
Some(client) => {
let resp = client
.put_object()
.bucket(env::var("AWS_BUCKET_NAME").unwrap().to_string())
.key(format!("{}{}", stream_name, "/.schema"))
.send()
.await?;
println!("Upload success. Version: {:?}", resp.version_id);
Ok(())
},
_ => {
Ok(())
},
/// PUT /{stream}: create a new stream, backed by a `.schema` marker object in S3.
///
/// Returns 200 on creation, 409 if the stream already exists, and 500 when the
/// underlying S3 put fails.
pub async fn put_stream(req: HttpRequest) -> HttpResponse {
    // "stream" is guaranteed by the route pattern ("/{stream}"), so the
    // unwraps here cannot fail for a routed request.
    let stream_name: String = req.match_info().get("stream").unwrap().parse().unwrap();
    let s3_client = config::ConfigToml::s3client();
    // stream_exists/create_stream are #[tokio::main] wrappers and therefore
    // blocking, synchronous calls — no .await here.
    match stream_exists(&s3_client, &stream_name) {
        // Bug fix: report non-success outcomes with appropriate HTTP status
        // codes instead of 200 OK for every branch.
        Ok(_) => HttpResponse::Conflict().body(format!(
            "Stream {} already exists, please create a Stream with unique name",
            stream_name
        )),
        Err(_) => match create_stream(&s3_client, &stream_name) {
            Ok(_) => HttpResponse::Ok().body(format!("Created Stream {}", stream_name)),
            Err(_) => HttpResponse::InternalServerError()
                .body(format!("Failed to create Stream {}", stream_name)),
        },
    }
}

/*
#[tokio::main]
pub async fn bucket_exists(s3_client: Option<aws_sdk_s3::Client>,bucket_name: String, stream_name: String) -> bool {
match s3_client {
Some(client) => {
let resp = client
.get_object()
.bucket(bucket_name)
.key(format!("{}{}", stream_name, "/.schema"))
.send();
println!("Bucket {:?} Exists", bucket_name);
return true;
},
None => {
return false;
},
/// POST /{stream}: accept an event for an existing stream.
///
/// Returns 200 when the stream exists, 404 when it does not. Actual event
/// ingestion is not implemented yet (see the TODO list below).
pub async fn post_event(req: HttpRequest) -> HttpResponse {
    // "stream" is guaranteed by the route pattern ("/{stream}"), so the
    // unwraps here cannot fail for a routed request.
    let stream_name: String = req.match_info().get("stream").unwrap().parse().unwrap();
    let s3_client = config::ConfigToml::s3client();
    // stream_exists is a blocking #[tokio::main] wrapper — no .await here.
    match stream_exists(&s3_client, &stream_name) {
        Ok(_) => HttpResponse::Ok().body(format!("Uploading event to Stream {} ", stream_name)),
        // Bug fix: posting to a non-existent stream is a client error (404),
        // not a 200 OK.
        Err(_) => HttpResponse::NotFound().body(format!("Stream {} doesn't exist", stream_name)),
    }
    // TODO
    // 1. Check if this is the first event in the stream
    //    a. If yes, create a schema and upload the schema file to <bucket>/<stream_prefix>/.schema.
    //    b. If no, validate if the schema of new event matches existing schema. Fail with invalid schema, if no match.
    // 2. Add the event to existing Arrow RecordBatch.
    // 3. Check if event count threshold is reached, convert record batch to parquet and push to S3.
    // 4. Init new RecordBatch if previous record batch was pushed to S3.
}

// Creates a stream by uploading an empty "<stream>/.schema" marker object to
// the configured S3 bucket. Returns Err on any S3 failure.
//
// NOTE(review): #[tokio::main] turns this async fn into a *blocking* sync fn
// that spins up its own runtime on every call — confirm this does not panic
// when invoked from inside actix-web's runtime.
// NOTE(review): env::var("AWS_BUCKET_NAME").unwrap() panics if the variable
// is unset — presumably set from Config.toml at startup; verify.
#[tokio::main]
pub async fn create_stream(s3_client: &aws_sdk_s3::Client, stream_name: &String) -> Result<(), Error> {
let _resp = s3_client
.put_object()
.bucket(env::var("AWS_BUCKET_NAME").unwrap().to_string())
// Object key is "<stream_name>/.schema".
.key(format!("{}{}", stream_name, "/.schema"))
.send()
.await?;
Ok(())
}

// Probes for a stream by fetching its "<stream>/.schema" marker object from
// the configured S3 bucket: Ok(()) if the GET succeeds, Err otherwise (the
// error is also how "not found" is reported — callers treat Err as "absent").
//
// NOTE(review): #[tokio::main] makes this a blocking sync fn that creates a
// new runtime per call — confirm this is safe inside actix-web's runtime.
// NOTE(review): env::var("AWS_BUCKET_NAME").unwrap() panics if unset; verify
// it is populated from Config.toml before any request is served.
#[tokio::main]
pub async fn stream_exists(s3_client: &aws_sdk_s3::Client, stream_name: &String) -> Result<(), Error> {
let _resp = s3_client
.get_object()
.bucket(env::var("AWS_BUCKET_NAME").unwrap().to_string())
// Object key is "<stream_name>/.schema".
.key(format!("{}{}", stream_name, "/.schema"))
.send()
.await?;
Ok(())
}
*/
22 changes: 5 additions & 17 deletions server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,17 @@
use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer};
use actix_web::{web, App, HttpServer};
#[macro_use]
extern crate serde_derive;

mod configs;
mod handler;

extern crate config;

/// PUT /{stream}: create a stream via the handler module and report the outcome.
async fn put_stream(req: HttpRequest) -> HttpResponse {
    // "stream" is guaranteed by the route pattern ("/{stream}"), so the
    // unwraps here cannot fail for a routed request.
    let stream_name: String = req.match_info().get("stream").unwrap().parse().unwrap();
    let s3_client = configs::ConfigToml::s3client();
    // create_stream takes the name by value, so hand it a clone and keep the
    // original for the response messages (drops the stream_name_clone local).
    match handler::create_stream(Some(s3_client), stream_name.clone()) {
        Ok(_) => HttpResponse::Ok().body(format!("Created Stream {}", stream_name)),
        // Bug fix: surface the failure as 500 and name the stream, instead of
        // 200 OK with a nameless message (also removes the no-arg `format!`).
        Err(_) => HttpResponse::InternalServerError()
            .body(format!("Failed to create Stream {}", stream_name)),
    }
}
mod handler;
mod config;

#[actix_web::main]
async fn main() -> std::io::Result<()> {
HttpServer::new(|| {
App::new()
.route("/{stream}", web::put().to(put_stream))
.route("/{stream}", web::post().to(handler::post_event))
.route("/{stream}", web::put().to(handler::put_stream))
})
.bind("127.0.0.1:8080")?
.run()
Expand Down

0 comments on commit 0564b4a

Please sign in to comment.