Skip to content

refactor: move kinesis to src/ #1269

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 0 additions & 85 deletions src/handlers/http/kinesis.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/handlers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ pub mod cluster;
pub mod correlation;
pub mod health_check;
pub mod ingest;
mod kinesis;
pub mod llm;
pub mod logstream;
pub mod middleware;
Expand Down
10 changes: 3 additions & 7 deletions src/handlers/http/modal/utils/ingest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,8 @@ use crate::{
format::{json, EventFormat, LogSource},
FORMAT_KEY, SOURCE_IP_KEY, USER_AGENT_KEY,
},
handlers::{
http::{
ingest::PostError,
kinesis::{flatten_kinesis_logs, Message},
},
LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY,
},
handlers::{http::ingest::PostError, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY},
kinesis::{flatten_kinesis_logs, Message},
otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
parseable::PARSEABLE,
storage::StreamType,
Expand Down Expand Up @@ -143,6 +138,7 @@ async fn push_logs(
)?
.process()?;
}

Ok(())
}

Expand Down
197 changes: 197 additions & 0 deletions src/kinesis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use base64::{engine::general_purpose::STANDARD, Engine as _};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::str;

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Message {
records: Vec<Data>,
request_id: String,
timestamp: u64,
}
#[derive(Serialize, Deserialize, Debug)]
struct Data {
data: String,
}

// Flatten Kinesis logs is used to flatten the Kinesis logs into a queryable JSON format.
// Kinesis logs are in the format
// {
// "requestId": "9b848d8a-2d89-474b-b073-04b8e5232210",
// "timestamp": 1705026780451,
// "records": [
// {
// "data": "eyJDSEFOR0UiOi0wLjQ1LCJQUklDRSI6NS4zNiwiVElDS0VSX1NZTUJPTCI6IkRFRyIsIlNFQ1RPUiI6IkVORVJHWSJ9"
// }
// ]
// }
// The data field is base64 encoded JSON (there can be multiple data fields), and there is a requestId and timestamp field.
// Kinesis logs are flattened to the following format:
// {
// "CHANGE": 3.16,
// "PRICE": 73.76,
// "SECTOR": "RETAIL",
// "TICKER_SYMBOL": "WMT",
// "p_metadata": "",
// "p_tags": "",
// "p_timestamp": "2024-01-11T09:08:34.290",
// "requestId": "b858288a-f5d8-4181-a746-3f3dd716be8a",
// "timestamp": "1704964113659"
// }
pub fn flatten_kinesis_logs(message: Message) -> Vec<Value> {
let mut vec_kinesis_json = Vec::new();

for record in message.records.iter() {
let bytes = STANDARD.decode(record.data.clone()).unwrap();
let json_string: String = String::from_utf8(bytes).unwrap();
let json: serde_json::Value = serde_json::from_str(&json_string).unwrap();
let mut kinesis_json: Map<String, Value> = match serde_json::from_value(json) {
Ok(value) => value,
Err(error) => panic!("Failed to deserialize JSON: {}", error),
};

kinesis_json.insert(
"requestId".to_owned(),
Value::String(message.request_id.clone()),
);
kinesis_json.insert(
"timestamp".to_owned(),
Value::String(message.timestamp.to_string()),
);

vec_kinesis_json.push(Value::Object(kinesis_json));
}

vec_kinesis_json
}
Comment on lines +60 to +85
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve error handling to avoid potential panics

The function uses multiple unwrap() calls which will cause the program to panic if errors occur. Consider using proper error handling with Result types instead.

-pub fn flatten_kinesis_logs(message: Message) -> Vec<Value> {
+pub fn flatten_kinesis_logs(message: Message) -> Result<Vec<Value>, Box<dyn std::error::Error>> {
     let mut vec_kinesis_json = Vec::new();
 
     for record in message.records.iter() {
-        let bytes = STANDARD.decode(record.data.clone()).unwrap();
-        let json_string: String = String::from_utf8(bytes).unwrap();
-        let json: serde_json::Value = serde_json::from_str(&json_string).unwrap();
-        let mut kinesis_json: Map<String, Value> = match serde_json::from_value(json) {
-            Ok(value) => value,
-            Err(error) => panic!("Failed to deserialize JSON: {}", error),
-        };
+        let bytes = STANDARD.decode(&record.data)?;
+        let json_string = String::from_utf8(bytes)?;
+        let json: serde_json::Value = serde_json::from_str(&json_string)?;
+        let mut kinesis_json: Map<String, Value> = serde_json::from_value(json)?;
 
         kinesis_json.insert(
             "requestId".to_owned(),
-            Value::String(message.request_id.clone()),
+            Value::String(message.request_id.to_owned()),
         );
         kinesis_json.insert(
             "timestamp".to_owned(),
             Value::String(message.timestamp.to_string()),
         );
 
         vec_kinesis_json.push(Value::Object(kinesis_json));
     }
 
-    vec_kinesis_json
+    Ok(vec_kinesis_json)
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub fn flatten_kinesis_logs(message: Message) -> Vec<Value> {
let mut vec_kinesis_json = Vec::new();
for record in message.records.iter() {
let bytes = STANDARD.decode(record.data.clone()).unwrap();
let json_string: String = String::from_utf8(bytes).unwrap();
let json: serde_json::Value = serde_json::from_str(&json_string).unwrap();
let mut kinesis_json: Map<String, Value> = match serde_json::from_value(json) {
Ok(value) => value,
Err(error) => panic!("Failed to deserialize JSON: {}", error),
};
kinesis_json.insert(
"requestId".to_owned(),
Value::String(message.request_id.clone()),
);
kinesis_json.insert(
"timestamp".to_owned(),
Value::String(message.timestamp.to_string()),
);
vec_kinesis_json.push(Value::Object(kinesis_json));
}
vec_kinesis_json
}
pub fn flatten_kinesis_logs(message: Message) -> Result<Vec<Value>, Box<dyn std::error::Error>> {
let mut vec_kinesis_json = Vec::new();
for record in message.records.iter() {
let bytes = STANDARD.decode(&record.data)?;
let json_string = String::from_utf8(bytes)?;
let json: serde_json::Value = serde_json::from_str(&json_string)?;
let mut kinesis_json: Map<String, Value> = serde_json::from_value(json)?;
kinesis_json.insert(
"requestId".to_owned(),
Value::String(message.request_id.to_owned()),
);
kinesis_json.insert(
"timestamp".to_owned(),
Value::String(message.timestamp.to_string()),
);
vec_kinesis_json.push(Value::Object(kinesis_json));
}
Ok(vec_kinesis_json)
}


#[cfg(test)]
mod tests {
use serde_json::{json, Value};

use super::{flatten_kinesis_logs, Message};

#[test]
fn flatten_kinesis_logs_decodes_base64_data() {
let message: Message = serde_json::from_value(json!( {
"requestId": "9b848d8a-2d89-474b-b073-04b8e5232210".to_string(),
"timestamp": 1705026780451_i64,
"records": [
{
"data": "eyJDSEFOR0UiOi0wLjQ1LCJQUklDRSI6NS4zNiwiVElDS0VSX1NZTUJPTCI6IkRFRyIsIlNFQ1RPUiI6IkVORVJHWSJ9".to_string(),
},
{
"data": "eyJDSEFOR0UiOjMuMTYsIlBSSUNFIjo3My43NiwiVElDS0VSX1NZTUJPTCI6IldNVCIsIlNFQ1RPUiI6IlJFVEFJTCJ9".to_string(),
},
],
})).unwrap();

let result = flatten_kinesis_logs(message);
assert_eq!(result.len(), 2);

let Value::Object(map) = &result[0] else {
panic!("Expected first result to be a JSON object");
};
assert_eq!(map.get("CHANGE").unwrap().as_f64().unwrap(), -0.45);
assert_eq!(map.get("PRICE").unwrap().as_f64().unwrap(), 5.36);
assert_eq!(map.get("TICKER_SYMBOL").unwrap().as_str().unwrap(), "DEG");
assert_eq!(map.get("SECTOR").unwrap().as_str().unwrap(), "ENERGY");
assert_eq!(
map.get("requestId").unwrap().as_str().unwrap(),
"9b848d8a-2d89-474b-b073-04b8e5232210"
);
assert_eq!(
map.get("timestamp").unwrap().as_str().unwrap(),
"1705026780451"
);

let Value::Object(map) = &result[1] else {
panic!("Expected second result to be a JSON object");
};
assert_eq!(map.get("CHANGE").unwrap().as_f64().unwrap(), 3.16);
assert_eq!(map.get("PRICE").unwrap().as_f64().unwrap(), 73.76);
assert_eq!(map.get("TICKER_SYMBOL").unwrap(), "WMT");
assert_eq!(map.get("SECTOR").unwrap(), "RETAIL");
assert_eq!(
map.get("requestId").unwrap().as_str().unwrap(),
"9b848d8a-2d89-474b-b073-04b8e5232210"
);
assert_eq!(
map.get("timestamp").unwrap().as_str().unwrap(),
"1705026780451"
);
}

#[test]
fn flatten_kinesis_logs_adds_request_id_and_timestamp() {
let message: Message = serde_json::from_value(json!( {
"requestId": "9b848d8a-2d89-474b-b073-04b8e5232210".to_string(),
"timestamp": 1705026780451_i64,
"records": [
{
"data": "eyJDSEFOR0UiOi0wLjQ1LCJQUklDRSI6NS4zNiwiVElDS0VSX1NZTUJPTCI6IkRFRyIsIlNFQ1RPUiI6IkVORVJHWSJ9".to_string(),
},
{
"data": "eyJDSEFOR0UiOjMuMTYsIlBSSUNFIjo3My43NiwiVElDS0VSX1NZTUJPTCI6IldNVCIsIlNFQ1RPUiI6IlJFVEFJTCJ9".to_string(),
},
],
})).unwrap();

let result = flatten_kinesis_logs(message);
assert_eq!(result.len(), 2);

let event = result[0].as_object().unwrap();
assert_eq!(
event.get("requestId").unwrap().as_str().unwrap(),
"9b848d8a-2d89-474b-b073-04b8e5232210"
);
assert_eq!(
event.get("timestamp").unwrap().as_str().unwrap(),
"1705026780451"
);

let event = result[1].as_object().unwrap();
assert_eq!(
event.get("requestId").unwrap().as_str().unwrap(),
"9b848d8a-2d89-474b-b073-04b8e5232210"
);
assert_eq!(
event.get("timestamp").unwrap().as_str().unwrap(),
"1705026780451"
);
}

#[test]
#[should_panic(expected = "InvalidByte(7, 95)")]
fn malformed_json_after_base64_decoding() {
let message: Message = serde_json::from_value(json!({
"requestId": "9b848d8a-2d89-474b-b073-04b8e5232210".to_string(),
"timestamp": 1705026780451_i64,
"records": [ {
"data": "invalid_base64_data".to_string(),
}],
}))
.unwrap();

flatten_kinesis_logs(message);
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod enterprise;
pub mod event;
pub mod handlers;
pub mod hottier;
mod kinesis;
mod livetail;
mod metadata;
pub mod metrics;
Expand Down
Loading