From 19d4409c82118c397c34c6cc5ac74c10514a18f9 Mon Sep 17 00:00:00 2001
From: henry0715-dev
Date: Wed, 11 Sep 2024 14:07:05 +0900
Subject: [PATCH] Change `OpLogFilter::agent_id` to be optional

Close #724

The RocksDB keys in the `oplog` store are currently of the form
`agent_id-0-timestamp`. This change modifies them to the form
`timestamp-sequence`. (Both key layouts are sketched at the end of this
patch.)
---
 CHANGELOG.md                                   |  19 ++
 Cargo.lock                                     |   2 +-
 Cargo.toml                                     |   2 +-
 src/graphql.rs                                 | 160 +++++++++++++++
 .../client/schema/op_log_raw_events.graphql    |   2 +
 src/graphql/client/schema/schema.graphql       |   5 +-
 src/graphql/export/tests.rs                    |  26 ++-
 src/graphql/log.rs                             |  57 ++++--
 src/graphql/log/tests.rs                       | 186 +++++++++++++++---
 src/ingest.rs                                  |  55 ++++--
 src/ingest/generation.rs                       |  42 ++++
 src/ingest/implement.rs                        |   7 +
 src/ingest/tests.rs                            |   2 +-
 src/peer.rs                                    |   4 +-
 src/publish.rs                                 |   2 +-
 src/publish/tests.rs                           |   2 +-
 src/storage.rs                                 |  70 +++++++
 src/storage/migration.rs                       | 143 +++++++++++++-
 src/storage/migration/migration_structures.rs  |  23 ++-
 19 files changed, 720 insertions(+), 89 deletions(-)
 create mode 100644 src/ingest/generation.rs

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f784dc72..75148c36 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,9 +7,28 @@ Versioning](https://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
 
+### Added
+
+- Added a `sensor` field to `OpLog`.
+  - Added the `load_connection_by_prefix_timestamp_key` function and the
+    `TimestampKeyExtractor` trait to enable querying keys that are prefixed
+    with a timestamp.
+  - The `opLogRawEvents` GraphQL API no longer requires `agentId` and now
+    accepts it as an optional parameter. In addition, the response now
+    includes logs from all agents in chronological order, rather than being
+    limited to the logs of a single agent.
+
 ### Changed
 
 - Rename the `csvFormattedRawEvents` GraphQL API to `tsvFormattedRawEvents`.
+- Update the compatibility version of the QUIC communication modules.
+  - Changed `PEER_VERSION_REQ` to ">=0.24.0-alpha.1,<0.25.0".
+  - Changed `INGEST_VERSION_REQ` to ">=0.24.0-alpha.1,<0.25.0".
+  - Changed `PUBLISH_VERSION_REQ` to ">=0.24.0-alpha.1,<0.25.0".
+- Modify the migration code.
+  - Changed `COMPATIBLE_VERSION_REQ` to ">=0.24.0-alpha.1,<0.25.0".
+  - Added the `migrate_0_23_0_to_0_24_0_op_log` migration function, which
+    rewrites the key and value of existing `OpLog` entries.
 
## [0.23.0] - 2024-11-21 diff --git a/Cargo.lock b/Cargo.lock index 58df88d7..f3ae222a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1024,7 +1024,7 @@ dependencies = [ [[package]] name = "giganto" -version = "0.23.0" +version = "0.24.0-alpha.1" dependencies = [ "anyhow", "async-graphql", diff --git a/Cargo.toml b/Cargo.toml index e81e3cd7..07a15c0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "giganto" -version = "0.23.0" +version = "0.24.0-alpha.1" edition = "2021" [lib] diff --git a/src/graphql.rs b/src/graphql.rs index 1bbdc29d..ae14e191 100644 --- a/src/graphql.rs +++ b/src/graphql.rs @@ -362,6 +362,125 @@ where Ok((records, has_previous, has_next)) } +#[allow(clippy::too_many_lines)] +fn get_connection_by_prefix_timestamp_key( + store: &RawEventStore<'_, T>, + filter: &(impl RawEventFilter + TimestampKeyExtractor), + after: Option, + before: Option, + first: Option, + last: Option, +) -> Result> +where + T: DeserializeOwned + EventFilter, +{ + let (records, has_previous, has_next) = if let Some(before) = before { + if after.is_some() { + return Err("cannot use both `after` and `before`".into()); + } + if first.is_some() { + return Err("'before' and 'first' cannot be specified simultaneously".into()); + } + + let last = last.unwrap_or(MAXIMUM_PAGE_SIZE).min(MAXIMUM_PAGE_SIZE); + let cursor = base64_engine.decode(before)?; + + // generate storage search key + let key_builder = StorageKey::timestamp_builder(); + let from_key = key_builder + .clone() + .upper_open_bound_start_key(filter.get_range_start_key().1) + .build(); + let to_key = key_builder + .lower_closed_bound_start_key(filter.get_range_start_key().0) + .build(); + + if cursor.cmp(&from_key.key()) == std::cmp::Ordering::Greater { + return Err("invalid cursor".into()); + } + let mut iter = store + .boundary_iter(&cursor, &to_key.key(), Direction::Reverse) + .peekable(); + if let Some(Ok((key, _))) = iter.peek() { + if key.as_ref() == cursor { + iter.next(); + } + } + let (mut records, has_previous) = collect_records(iter, last, filter); + records.reverse(); + (records, has_previous, false) + } else if let Some(after) = after { + if before.is_some() { + return Err("cannot use both `after` and `before`".into()); + } + if last.is_some() { + return Err("'after' and 'last' cannot be specified simultaneously".into()); + } + let first = first.unwrap_or(MAXIMUM_PAGE_SIZE).min(MAXIMUM_PAGE_SIZE); + let cursor = base64_engine.decode(after)?; + + // generate storage search key + let key_builder = StorageKey::timestamp_builder(); + let from_key = key_builder + .clone() + .lower_closed_bound_start_key(filter.get_range_start_key().0) + .build(); + let to_key = key_builder + .upper_open_bound_start_key(filter.get_range_start_key().1) + .build(); + + if cursor.cmp(&from_key.key()) == std::cmp::Ordering::Less { + return Err("invalid cursor".into()); + } + let mut iter = store + .boundary_iter(&cursor, &to_key.key(), Direction::Forward) + .peekable(); + if let Some(Ok((key, _))) = iter.peek() { + if key.as_ref() == cursor { + iter.next(); + } + } + let (records, has_next) = collect_records(iter, first, filter); + (records, false, has_next) + } else if let Some(last) = last { + if first.is_some() { + return Err("first and last cannot be used together".into()); + } + let last = last.min(MAXIMUM_PAGE_SIZE); + + // generate storage search key + let key_builder = StorageKey::timestamp_builder(); + let from_key = key_builder + .clone() + .upper_closed_bound_start_key(filter.get_range_start_key().1) + .build(); + let to_key = 
key_builder + .lower_closed_bound_start_key(filter.get_range_start_key().0) + .build(); + + let iter = store.boundary_iter(&from_key.key(), &to_key.key(), Direction::Reverse); + let (mut records, has_previous) = collect_records(iter, last, filter); + records.reverse(); + (records, has_previous, false) + } else { + let first = first.unwrap_or(MAXIMUM_PAGE_SIZE).min(MAXIMUM_PAGE_SIZE); + // generate storage search key + let key_builder = StorageKey::timestamp_builder(); + let from_key = key_builder + .clone() + .lower_closed_bound_start_key(filter.get_range_start_key().0) + .build(); + let to_key = key_builder + .upper_open_bound_start_key(filter.get_range_start_key().1) + .build(); + + let iter = store.boundary_iter(&from_key.key(), &to_key.key(), Direction::Forward); + let (records, has_next) = collect_records(iter, first, filter); + (records, false, has_next) + }; + Ok((records, has_previous, has_next)) +} + fn load_connection( store: &RawEventStore<'_, T>, filter: &(impl RawEventFilter + KeyExtractor), @@ -377,6 +496,37 @@ where let (records, has_previous, has_next) = get_connection(store, filter, after, before, first, last)?; + create_connection(records, has_previous, has_next) +} + +fn load_connection_by_prefix_timestamp_key( + store: &RawEventStore<'_, T>, + filter: &(impl RawEventFilter + TimestampKeyExtractor), + after: Option, + before: Option, + first: Option, + last: Option, +) -> Result> +where + N: FromKeyValue + OutputType, + T: DeserializeOwned + EventFilter, +{ + let (records, has_previous, has_next) = + get_connection_by_prefix_timestamp_key(store, filter, after, before, first, last)?; + + create_connection(records, has_previous, has_next) +} + +#[allow(clippy::unnecessary_wraps)] +fn create_connection( + records: Vec<(Box<[u8]>, T)>, + has_previous: bool, + has_next: bool, +) -> Result> +where + N: FromKeyValue + OutputType, + T: DeserializeOwned, +{ let mut connection: Connection = Connection::new(has_previous, has_next); connection.edges = records .into_iter() @@ -437,6 +587,14 @@ where (records, has_more) } +pub fn get_timestamp_from_key_prefix(key: &[u8]) -> Result, anyhow::Error> { + if key.len() > TIMESTAMP_SIZE { + let timestamp = i64::from_be_bytes(key[0..TIMESTAMP_SIZE].try_into()?); + return Ok(Utc.timestamp_nanos(timestamp)); + } + Err(anyhow!("invalid database key length")) +} + pub fn get_timestamp_from_key(key: &[u8]) -> Result, anyhow::Error> { if key.len() > TIMESTAMP_SIZE { let nanos = i64::from_be_bytes(key[(key.len() - TIMESTAMP_SIZE)..].try_into()?); @@ -1578,6 +1736,8 @@ macro_rules! impl_from_giganto_search_filter_for_graphql_client { } pub(crate) use impl_from_giganto_search_filter_for_graphql_client; +use crate::storage::TimestampKeyExtractor; + #[cfg(test)] mod tests { use std::collections::{HashMap, HashSet}; diff --git a/src/graphql/client/schema/op_log_raw_events.graphql b/src/graphql/client/schema/op_log_raw_events.graphql index d40daf2e..add2d516 100644 --- a/src/graphql/client/schema/op_log_raw_events.graphql +++ b/src/graphql/client/schema/op_log_raw_events.graphql @@ -12,6 +12,8 @@ query OpLogRawEvents($filter: OpLogFilter!, $after: String, $before: String, $fi timestamp level contents + agentName + sensor } } } diff --git a/src/graphql/client/schema/schema.graphql b/src/graphql/client/schema/schema.graphql index 05f7ea10..b8357067 100644 --- a/src/graphql/client/schema/schema.graphql +++ b/src/graphql/client/schema/schema.graphql @@ -980,7 +980,8 @@ type NtlmRawEventEdge { input OpLogFilter { time: TimeRange - agentId: String! 
+ sensor: String + agentId: String logLevel: String contents: String } @@ -989,6 +990,8 @@ type OpLogRawEvent { timestamp: DateTime! level: String! contents: String! + agentName: String! + sensor: String! } type OpLogRawEventConnection { diff --git a/src/graphql/export/tests.rs b/src/graphql/export/tests.rs index fdb7a504..4517af12 100644 --- a/src/graphql/export/tests.rs +++ b/src/graphql/export/tests.rs @@ -1,5 +1,6 @@ use std::mem; use std::net::IpAddr; +use std::sync::{Arc, OnceLock}; use chrono::{Duration, Utc}; use giganto_client::ingest::{ @@ -12,6 +13,7 @@ use giganto_client::ingest::{ }; use crate::graphql::tests::TestSchema; +use crate::ingest::generation::SequenceGenerator; use crate::storage::RawEventStore; #[tokio::test] @@ -907,9 +909,10 @@ fn insert_time_series( async fn export_op_log() { let schema = TestSchema::new(); let store = schema.db.op_log_store().unwrap(); + let generator: OnceLock> = OnceLock::new(); - insert_op_log_raw_event(&store, "agent1", 1); - insert_op_log_raw_event(&store, "agent2", 1); + insert_op_log_raw_event(&store, "agent1", "src1", 1, &generator).await; + insert_op_log_raw_event(&store, "agent2", "src1", 1, &generator).await; // export csv file let query = r#" @@ -938,16 +941,23 @@ async fn export_op_log() { assert!(res.data.to_string().contains("op_log")); } -fn insert_op_log_raw_event(store: &RawEventStore, agent_name: &str, timestamp: i64) { +async fn insert_op_log_raw_event( + store: &RawEventStore<'_, OpLog>, + agent_name: &str, + sensor: &str, + timestamp: i64, + generator: &OnceLock>, +) { + let generator = generator.get_or_init(SequenceGenerator::init_generator); + let sequence_number = generator.generate_sequence_number(); + let mut key: Vec = Vec::new(); - let agent_id = format!("{agent_name}@src1"); - key.extend_from_slice(agent_id.as_bytes()); - key.push(0); key.extend_from_slice(×tamp.to_be_bytes()); + key.extend_from_slice(&sequence_number.to_be_bytes()); let op_log_body = OpLog { - sensor: "sensor".to_string(), - agent_name: agent_id.to_string(), + sensor: sensor.to_string(), + agent_name: agent_name.to_string(), log_level: OpLogLevel::Info, contents: "op_log".to_string(), }; diff --git a/src/graphql/log.rs b/src/graphql/log.rs index b87ba201..f56fcf51 100644 --- a/src/graphql/log.rs +++ b/src/graphql/log.rs @@ -33,13 +33,13 @@ use super::{ client::derives::{ log_raw_events, tsv_formatted_raw_events, LogRawEvents, TsvFormattedRawEvents, }, - events_vec_in_cluster, get_timestamp_from_key, handle_paged_events, - impl_from_giganto_time_range_struct_for_graphql_client, load_connection, - paged_events_in_cluster, Engine, FromKeyValue, + events_vec_in_cluster, get_timestamp_from_key, get_timestamp_from_key_prefix, + handle_paged_events, impl_from_giganto_time_range_struct_for_graphql_client, + load_connection_by_prefix_timestamp_key, paged_events_in_cluster, Engine, FromKeyValue, }; use crate::{ graphql::{RawEventFilter, TimeRange}, - storage::{Database, KeyExtractor, RawEventStore}, + storage::{Database, KeyExtractor, RawEventStore, TimestampKeyExtractor}, }; #[derive(Default)] @@ -99,22 +99,14 @@ impl RawEventFilter for LogFilter { #[derive(InputObject)] pub struct OpLogFilter { time: Option, - agent_id: String, + sensor: Option, + agent_id: Option, log_level: Option, contents: Option, } -impl KeyExtractor for OpLogFilter { - fn get_start_key(&self) -> &str { - &self.agent_id - } - - // oplog event don't use mid key - fn get_mid_key(&self) -> Option> { - None - } - - fn get_range_end_key(&self) -> (Option>, Option>) { +impl 
TimestampKeyExtractor for OpLogFilter { + fn get_range_start_key(&self) -> (Option>, Option>) { if let Some(time) = &self.time { (time.start, time.end) } else { @@ -133,8 +125,8 @@ impl RawEventFilter for OpLogFilter { log_level: Option, log_contents: Option, _text: Option, - _sensor: Option, - _agent_id: Option, + sensor: Option, + agent_id: Option, ) -> Result { if let Some(filter_level) = &self.log_level { let log_level = if let Some(log_level) = log_level { @@ -156,6 +148,26 @@ impl RawEventFilter for OpLogFilter { return Ok(false); } } + if let Some(filter_agent_id) = &self.agent_id { + let is_agent_id_mismatch = if let Some(agent_id) = agent_id { + !agent_id.contains(filter_agent_id) + } else { + false + }; + if is_agent_id_mismatch { + return Ok(false); + } + } + if let Some(filter_sensor) = &self.sensor { + let is_sensor_mismatch = if let Some(sensor) = sensor { + !sensor.contains(filter_sensor) + } else { + false + }; + if is_sensor_mismatch { + return Ok(false); + } + } Ok(true) } } @@ -181,14 +193,18 @@ struct OpLogRawEvent { timestamp: DateTime, level: String, contents: String, + agent_name: String, + sensor: String, } impl FromKeyValue for OpLogRawEvent { fn from_key_value(key: &[u8], l: OpLog) -> Result { Ok(OpLogRawEvent { - timestamp: get_timestamp_from_key(key)?, + timestamp: get_timestamp_from_key_prefix(key)?, level: format!("{:?}", l.log_level), contents: l.contents, + agent_name: l.agent_name, + sensor: l.sensor, }) } } @@ -251,14 +267,13 @@ impl LogQuery { ) -> Result> { let db = ctx.data::()?; let store = db.op_log_store()?; - query( after, before, first, last, |after, before, first, last| async move { - load_connection(&store, &filter, after, before, first, last) + load_connection_by_prefix_timestamp_key(&store, &filter, after, before, first, last) }, ) .await diff --git a/src/graphql/log/tests.rs b/src/graphql/log/tests.rs index 674eb7ac..92a320fe 100644 --- a/src/graphql/log/tests.rs +++ b/src/graphql/log/tests.rs @@ -1,3 +1,4 @@ +use std::sync::{Arc, OnceLock}; use std::{mem, net::IpAddr}; use chrono::{DateTime, TimeZone, Utc}; @@ -7,6 +8,8 @@ use giganto_client::ingest::{ }; use super::{base64_engine, Engine, LogFilter, LogRawEvent, OpLogFilter, OpLogRawEvent}; +use crate::graphql::load_connection; +use crate::ingest::generation::SequenceGenerator; use crate::{ graphql::{tests::TestSchema, TimeRange}, storage::RawEventStore, @@ -24,7 +27,7 @@ async fn load_time_range() { insert_log_raw_event(&store, "src1", 5, "kind1", b"log5"); // backward traversal in `start..end` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -51,7 +54,7 @@ async fn load_time_range() { ); // backward traversal in `start..` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -82,7 +85,7 @@ async fn load_time_range() { ); // backward traversal in `..end` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -113,7 +116,7 @@ async fn load_time_range() { ); // forward traversal in `start..end` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -140,7 +143,7 @@ async fn load_time_range() { ); // forward traversal `start..` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -171,7 +174,7 @@ async fn 
load_time_range() { ); // forward traversal `..end` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -198,7 +201,7 @@ async fn load_time_range() { ); // backward traversal in `start..end` and `before cursor` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -225,7 +228,7 @@ async fn load_time_range() { ); // backward traversal in `start..` and `before cursor` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -252,7 +255,7 @@ async fn load_time_range() { ); // backward traversal in `..end` and `before cursor` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -283,7 +286,7 @@ async fn load_time_range() { ); // forward traversal in `start..end` and `after cursor` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -310,7 +313,7 @@ async fn load_time_range() { ); // forward traversal `start..` and `after cursor` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -337,7 +340,7 @@ async fn load_time_range() { ); // forward traversal `..end` and `after cursor` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -364,7 +367,7 @@ async fn load_time_range() { ); // forward traversal `..` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -458,8 +461,8 @@ async fn oplog_empty() { async fn oplog_with_data() { let schema = TestSchema::new(); let store = schema.db.op_log_store().unwrap(); - - insert_oplog_raw_event(&store, "giganto", 1); + let generator: OnceLock> = OnceLock::new(); + insert_oplog_raw_event(&store, "giganto", "src1", 1, &generator).await; let query = r#" { @@ -483,33 +486,150 @@ async fn oplog_with_data() { async fn load_oplog() { let schema = TestSchema::new(); let store = schema.db.op_log_store().unwrap(); + let generator: OnceLock> = OnceLock::new(); + + insert_oplog_raw_event(&store, "giganto", "src1", 1, &generator).await; + insert_oplog_raw_event(&store, "giganto", "src1", 2, &generator).await; + insert_oplog_raw_event(&store, "review", "src1", 2, &generator).await; + insert_oplog_raw_event(&store, "review", "src1", 3, &generator).await; + insert_oplog_raw_event(&store, "giganto", "src1", 3, &generator).await; + insert_oplog_raw_event(&store, "giganto", "src1", 4, &generator).await; + insert_oplog_raw_event(&store, "giganto", "src1", 5, &generator).await; + insert_oplog_raw_event(&store, "review", "src1", 5, &generator).await; + insert_oplog_raw_event(&store, "aice", "src1", 5, &generator).await; + + let connection = super::load_connection_by_prefix_timestamp_key::( + &store, + &OpLogFilter { + time: Some(TimeRange { + start: Some(DateTime::from_timestamp_nanos(5)), + end: Some(DateTime::from_timestamp_nanos(7)), + }), + agent_id: None, + log_level: Some("Info".to_string()), + contents: Some("oplog".to_string()), + sensor: None, + }, + None, + None, + Some(5), + None, + ) + .unwrap(); + assert_eq!(connection.edges.len(), 3); + assert_eq!(connection.edges[0].node.level.as_str(), "Info"); + assert_eq!(connection.edges[0].node.contents.as_str(), "oplog"); +} - 
insert_oplog_raw_event(&store, "giganto", 1); - insert_oplog_raw_event(&store, "giganto", 2); - insert_oplog_raw_event(&store, "giganto", 3); - insert_oplog_raw_event(&store, "giganto", 4); - insert_oplog_raw_event(&store, "giganto", 5); +#[tokio::test] +async fn load_connection_by_prefix_timestamp_key() { + let schema = TestSchema::new(); + let store = schema.db.op_log_store().unwrap(); + let generator: OnceLock> = OnceLock::new(); + let mut key_list: Vec> = Vec::new(); + + key_list.push(insert_oplog_raw_event(&store, "piglet", "src1", 1, &generator).await); + key_list.push(insert_oplog_raw_event(&store, "piglet", "src1", 2, &generator).await); + key_list.push(insert_oplog_raw_event(&store, "review", "src1", 2, &generator).await); + key_list.push(insert_oplog_raw_event(&store, "review", "src1", 3, &generator).await); + key_list.push(insert_oplog_raw_event(&store, "piglet", "src1", 3, &generator).await); + key_list.push(insert_oplog_raw_event(&store, "piglet", "src1", 4, &generator).await); + key_list.push(insert_oplog_raw_event(&store, "piglet", "src2", 5, &generator).await); + key_list.push(insert_oplog_raw_event(&store, "piglet", "src2", 6, &generator).await); + key_list.push(insert_oplog_raw_event(&store, "review", "src1", 4, &generator).await); + key_list.push(insert_oplog_raw_event(&store, "review", "src2", 5, &generator).await); + + let connection = super::load_connection_by_prefix_timestamp_key::( + &store, + &OpLogFilter { + time: Some(TimeRange { + start: Some(DateTime::from_timestamp_nanos(1)), + end: Some(DateTime::from_timestamp_nanos(10)), + }), + agent_id: Some("review".to_string()), + log_level: Some("Info".to_string()), + contents: Some("oplog".to_string()), + sensor: Some("src1".to_string()), + }, + None, + None, + Some(10), + None, + ) + .unwrap(); + assert_eq!(connection.edges.len(), 3); + assert_eq!(connection.edges[0].node.level.as_str(), "Info"); + assert_eq!(connection.edges[0].node.contents.as_str(), "oplog"); - let connection = super::load_connection::( + let after = key_list.get(3).unwrap(); + let after = base64_engine.encode(after); + let connection = super::load_connection_by_prefix_timestamp_key::( &store, &OpLogFilter { time: Some(TimeRange { start: Some(DateTime::from_timestamp_nanos(1)), - end: Some(DateTime::from_timestamp_nanos(3)), + end: Some(DateTime::from_timestamp_nanos(10)), }), - agent_id: "giganto@src 1".to_string(), + agent_id: Some("review".to_string()), log_level: Some("Info".to_string()), contents: Some("oplog".to_string()), + sensor: Some("src1".to_string()), }, + Some(after), None, + Some(10), None, - Some(3), + ) + .unwrap(); + assert_eq!(connection.edges.len(), 1); + assert_eq!(connection.edges[0].node.level.as_str(), "Info"); + assert_eq!(connection.edges[0].node.contents.as_str(), "oplog"); + + let before = key_list.get(8).unwrap(); + let before = base64_engine.encode(before); + let connection = super::load_connection_by_prefix_timestamp_key::( + &store, + &OpLogFilter { + time: Some(TimeRange { + start: Some(DateTime::from_timestamp_nanos(1)), + end: Some(DateTime::from_timestamp_nanos(10)), + }), + agent_id: Some("review".to_string()), + log_level: Some("Info".to_string()), + contents: Some("oplog".to_string()), + sensor: Some("src1".to_string()), + }, + None, + Some(before), None, + Some(10), ) .unwrap(); assert_eq!(connection.edges.len(), 2); assert_eq!(connection.edges[0].node.level.as_str(), "Info"); assert_eq!(connection.edges[1].node.contents.as_str(), "oplog"); + + let connection = 
super::load_connection_by_prefix_timestamp_key::( + &store, + &OpLogFilter { + time: Some(TimeRange { + start: Some(DateTime::from_timestamp_nanos(1)), + end: Some(DateTime::from_timestamp_nanos(10)), + }), + agent_id: Some("piglet".to_string()), + log_level: Some("Info".to_string()), + contents: Some("oplog".to_string()), + sensor: Some("src2".to_string()), + }, + None, + None, + None, + Some(10), + ) + .unwrap(); + assert_eq!(connection.edges.len(), 2); + assert_eq!(connection.edges[0].node.level.as_str(), "Info"); + assert_eq!(connection.edges[0].node.contents.as_str(), "oplog"); } fn insert_log_raw_event( @@ -533,23 +653,31 @@ fn insert_log_raw_event( store.append(&key, &value).unwrap(); } -fn insert_oplog_raw_event(store: &RawEventStore, agent_name: &str, timestamp: i64) { +async fn insert_oplog_raw_event( + store: &RawEventStore<'_, OpLog>, + agent_name: &str, + sensor: &str, + timestamp: i64, + generator: &OnceLock>, +) -> Vec { + let generator = generator.get_or_init(SequenceGenerator::init_generator); + let sequence_number = generator.generate_sequence_number(); + let mut key: Vec = Vec::new(); let agent_id = format!("{agent_name}@src 1"); - key.extend_from_slice(agent_id.as_bytes()); - key.push(0); key.extend_from_slice(×tamp.to_be_bytes()); + key.extend_from_slice(&sequence_number.to_be_bytes()); let oplog_body = OpLog { - sensor: "sensor".to_string(), + sensor: sensor.to_string(), agent_name: agent_id.to_string(), log_level: OpLogLevel::Info, contents: "oplog".to_string(), }; let value = bincode::serialize(&oplog_body).unwrap(); - store.append(&key, &value).unwrap(); + key } #[tokio::test] diff --git a/src/ingest.rs b/src/ingest.rs index 9fc9d2fc..750ccb07 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -1,7 +1,9 @@ +pub mod generation; pub mod implement; #[cfg(test)] mod tests; +use std::sync::OnceLock; use std::{ net::SocketAddr, sync::{ @@ -39,6 +41,7 @@ use tokio::{ use tracing::{error, info}; use x509_parser::nom::AsBytes; +use crate::ingest::generation::SequenceGenerator; use crate::publish::send_direct_stream; use crate::server::{ config_server, extract_cert_from_conn, subject_from_cert_verbose, Certs, @@ -54,10 +57,12 @@ const CHANNEL_CLOSE_MESSAGE: &[u8; 12] = b"channel done"; const CHANNEL_CLOSE_TIMESTAMP: i64 = -1; const NO_TIMESTAMP: i64 = 0; const SENSOR_INTERVAL: u64 = 60 * 60 * 24; -const INGEST_VERSION_REQ: &str = ">=0.23.0,<0.24.0"; +const INGEST_VERSION_REQ: &str = ">=0.24.0-alpha.1,<0.25.0"; type SensorInfo = (String, DateTime, ConnState, bool); +static GENERATOR: OnceLock> = OnceLock::new(); + enum ConnState { Connected, Disconnected, @@ -865,7 +870,7 @@ async fn handle_data( let mut packet_size = 0_u64; #[cfg(feature = "benchmark")] let mut packet_count = 0_u64; - for (timestamp, raw_event) in recv_buf { + for (timestamp, mut raw_event) in recv_buf { last_timestamp = timestamp; if (timestamp == CHANNEL_CLOSE_TIMESTAMP) && (raw_event.as_bytes() == CHANNEL_CLOSE_MESSAGE) @@ -879,16 +884,17 @@ async fn handle_data( } continue; } - let key_builder = StorageKey::builder().start_key(&sensor); - let key_builder = match raw_event_kind { + let storage_key = match raw_event_kind { RawEventKind::Log => { let Ok(log) = bincode::deserialize::(&raw_event) else { err_msg = Some("Failed to deserialize Log".to_string()); break; }; - key_builder + StorageKey::builder() + .start_key(&sensor) .mid_key(Some(log.kind.as_bytes().to_vec())) .end_key(timestamp) + .build() } RawEventKind::PeriodicTimeSeries => { let Ok(time_series) = @@ -901,25 +907,38 @@ async fn handle_data( 
StorageKey::builder() .start_key(&time_series.id) .end_key(timestamp) + .build() } RawEventKind::OpLog => { - let Ok(op_log) = bincode::deserialize::(&raw_event) else { + let Ok(mut op_log) = bincode::deserialize::(&raw_event) else { err_msg = Some("Failed to deserialize OpLog".to_string()); break; }; - let agent_id = format!("{}@{sensor}", op_log.agent_name); - StorageKey::builder() - .start_key(&agent_id) - .end_key(timestamp) + op_log.sensor.clone_from(&sensor); + let Ok(op_log) = bincode::serialize(&op_log) else { + err_msg = Some("Failed to serialize OpLog".to_string()); + break; + }; + raw_event.clone_from(&op_log); + + let generator = + GENERATOR.get_or_init(SequenceGenerator::init_generator); + let sequence_number = generator.generate_sequence_number(); + StorageKey::timestamp_builder() + .start_key(timestamp) + .mid_key(sequence_number) + .build() } RawEventKind::Packet => { let Ok(packet) = bincode::deserialize::(&raw_event) else { err_msg = Some("Failed to deserialize Packet".to_string()); break; }; - key_builder + StorageKey::builder() + .start_key(&sensor) .mid_key(Some(timestamp.to_be_bytes().to_vec())) .end_key(packet.packet_timestamp) + .build() } RawEventKind::Statistics => { let Ok(statistics) = bincode::deserialize::(&raw_event) @@ -936,25 +955,31 @@ async fn handle_data( packet_count += t_packet_count; packet_size += t_packet_size; } - key_builder + StorageKey::builder() + .start_key(&sensor) .mid_key(Some(statistics.core.to_be_bytes().to_vec())) .end_key(timestamp) + .build() } RawEventKind::SecuLog => { let Ok(secu_log) = bincode::deserialize::(&raw_event) else { err_msg = Some("Failed to deserialize SecuLog".to_string()); break; }; - key_builder + StorageKey::builder() + .start_key(&sensor) .mid_key(Some(secu_log.kind.as_bytes().to_vec())) .end_key(timestamp) + .build() } - _ => key_builder.end_key(timestamp), + _ => StorageKey::builder() + .start_key(&sensor) + .end_key(timestamp) + .build(), }; recv_events_cnt += 1; recv_events_len += raw_event.len(); - let storage_key = key_builder.build(); store.append(&storage_key.key(), &raw_event)?; if let Some(network_key) = network_key.as_ref() { if let Err(e) = send_direct_stream( diff --git a/src/ingest/generation.rs b/src/ingest/generation.rs new file mode 100644 index 00000000..afa72223 --- /dev/null +++ b/src/ingest/generation.rs @@ -0,0 +1,42 @@ +use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; +use std::sync::Arc; + +use chrono::{Datelike, Utc}; + +pub struct SequenceGenerator { + counter: AtomicUsize, + last_reset_date: AtomicU32, +} + +impl SequenceGenerator { + pub(crate) fn new() -> Self { + Self { + counter: AtomicUsize::new(0), + last_reset_date: AtomicU32::new(SequenceGenerator::get_current_date_time()), + } + } + + pub(crate) fn generate_sequence_number(&self) -> usize { + let current_date_time = SequenceGenerator::get_current_date_time(); + let last_reset_day = self.last_reset_date.load(Ordering::Acquire); + + if last_reset_day == current_date_time { + return self.counter.fetch_add(1, Ordering::Relaxed); + } + + self.last_reset_date + .store(current_date_time, Ordering::Release); + self.counter.store(1, Ordering::Release); + 1 + } + + pub(crate) fn init_generator() -> Arc { + let generator = SequenceGenerator::new(); + Arc::new(generator) + } + + pub(crate) fn get_current_date_time() -> u32 { + let utc_now = Utc::now(); + utc_now.day() + } +} diff --git a/src/ingest/implement.rs b/src/ingest/implement.rs index 2457d153..4a4864ad 100644 --- a/src/ingest/implement.rs +++ b/src/ingest/implement.rs @@ 
-305,6 +305,13 @@ impl EventFilter for OpLog { fn log_contents(&self) -> Option { Some(self.contents.clone()) } + fn agent_id(&self) -> Option { + Some(self.agent_name.clone()) + } + + fn sensor(&self) -> Option { + Some(self.sensor.clone()) + } } impl EventFilter for PeriodicTimeSeries { diff --git a/src/ingest/tests.rs b/src/ingest/tests.rs index eafa0653..de0f6d17 100644 --- a/src/ingest/tests.rs +++ b/src/ingest/tests.rs @@ -50,7 +50,7 @@ const KEY_PATH: &str = "tests/certs/node1/key.pem"; const CA_CERT_PATH: &str = "tests/certs/ca_cert.pem"; const HOST: &str = "node1"; const TEST_PORT: u16 = 60190; -const PROTOCOL_VERSION: &str = "0.23.0"; +const PROTOCOL_VERSION: &str = "0.24.0-alpha.1"; struct TestClient { conn: Connection, diff --git a/src/peer.rs b/src/peer.rs index 3175f8bb..166d507b 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -40,7 +40,7 @@ use crate::{ IngestSensors, }; -const PEER_VERSION_REQ: &str = ">=0.23.0,<0.24.0"; +const PEER_VERSION_REQ: &str = ">=0.24.0-alpha.1,<0.25.0"; const PEER_RETRY_INTERVAL: u64 = 5; pub type Peers = Arc>>; @@ -761,7 +761,7 @@ pub mod tests { const CA_CERT_PATH: &str = "tests/certs/ca_cert.pem"; const HOST: &str = "node1"; const TEST_PORT: u16 = 60191; - const PROTOCOL_VERSION: &str = "0.23.0"; + const PROTOCOL_VERSION: &str = "0.24.0-alpha.1"; pub struct TestClient { send: SendStream, diff --git a/src/publish.rs b/src/publish.rs index baea0347..565b7784 100644 --- a/src/publish.rs +++ b/src/publish.rs @@ -60,7 +60,7 @@ use crate::server::{ use crate::storage::{Database, Direction, RawEventStore, StorageKey}; use crate::{IngestSensors, PcapSensors, StreamDirectChannels}; -const PUBLISH_VERSION_REQ: &str = ">=0.23.0,<0.24.0"; +const PUBLISH_VERSION_REQ: &str = ">=0.24.0-alpha.1,<0.25.0"; pub struct Server { server_config: ServerConfig, diff --git a/src/publish/tests.rs b/src/publish/tests.rs index d3bace52..d01c486c 100644 --- a/src/publish/tests.rs +++ b/src/publish/tests.rs @@ -48,7 +48,7 @@ fn get_token() -> &'static Mutex { } const CA_CERT_PATH: &str = "tests/certs/ca_cert.pem"; -const PROTOCOL_VERSION: &str = "0.23.0"; +const PROTOCOL_VERSION: &str = "0.24.0-alpha.1"; const NODE1_CERT_PATH: &str = "tests/certs/node1/cert.pem"; const NODE1_KEY_PATH: &str = "tests/certs/node1/key.pem"; diff --git a/src/storage.rs b/src/storage.rs index 7cc5ed9a..0198dceb 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -665,6 +665,10 @@ impl StorageKey { StorageKeyBuilder::default() } + pub fn timestamp_builder() -> StorageTimestampKeyBuilder { + StorageTimestampKeyBuilder::default() + } + pub fn key(self) -> Vec { self.0 } @@ -676,6 +680,10 @@ pub trait KeyExtractor { fn get_range_end_key(&self) -> (Option>, Option>); } +pub trait TimestampKeyExtractor { + fn get_range_start_key(&self) -> (Option>, Option>); +} + #[allow(clippy::module_name_repetitions)] #[derive(Default, Debug, Clone)] pub struct StorageKeyBuilder { @@ -748,6 +756,68 @@ impl StorageKeyBuilder { } } +#[allow(clippy::module_name_repetitions)] +#[derive(Default, Debug, Clone)] +pub struct StorageTimestampKeyBuilder { + pre_key: Vec, +} + +impl StorageTimestampKeyBuilder { + pub fn start_key(mut self, key: i64) -> Self { + self.pre_key.reserve(TIMESTAMP_SIZE); + self.pre_key.extend_from_slice(&key.to_be_bytes()); + self + } + + pub fn mid_key(mut self, key: usize) -> Self { + let mid_key = key.to_be_bytes(); + self.pre_key.reserve(mid_key.len()); + self.pre_key.extend_from_slice(&mid_key); + self + } + + pub fn lower_closed_bound_start_key(mut self, time: Option>) -> Self { + 
self.pre_key.reserve(TIMESTAMP_SIZE); + let ns = if let Some(time) = time { + time.timestamp_nanos_opt().unwrap_or(i64::MAX) + } else { + 0 + }; + self.pre_key.extend_from_slice(&ns.to_be_bytes()); + self + } + + pub fn upper_open_bound_start_key(mut self, time: Option>) -> Self { + self.pre_key.reserve(TIMESTAMP_SIZE); + let ns = if let Some(time) = time { + time.timestamp_nanos_opt().unwrap_or(i64::MAX) + } else { + i64::MAX + }; + self.pre_key.extend_from_slice(&ns.to_be_bytes()); + self + } + + pub fn upper_closed_bound_start_key(mut self, time: Option>) -> Self { + self.pre_key.reserve(TIMESTAMP_SIZE); + if let Some(time) = time { + let ns = time.timestamp_nanos_opt().unwrap_or(i64::MAX); + if let Some(ns) = ns.checked_sub(1) { + if ns >= 0 { + self.pre_key.extend_from_slice(&ns.to_be_bytes()); + return self; + } + } + } + self.pre_key.extend_from_slice(&i64::MAX.to_be_bytes()); + self + } + + pub fn build(self) -> StorageKey { + StorageKey(self.pre_key) + } +} + pub type KeyValue = (Box<[u8]>, T); pub type RawValue = (Box<[u8]>, Box<[u8]>); diff --git a/src/storage/migration.rs b/src/storage/migration.rs index c5162424..8c893103 100644 --- a/src/storage/migration.rs +++ b/src/storage/migration.rs @@ -1,6 +1,7 @@ //! Routines to check the database format version and migrate it if necessary. mod migration_structures; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::{ fs::{create_dir_all, File}, io::{Read, Write}, @@ -18,18 +19,19 @@ use self::migration_structures::{ SecuLogBeforeV23, SmtpBeforeV21, SshBeforeV21, TlsBeforeV21, }; use super::{data_dir_to_db_path, Database, RAW_DATA_COLUMN_FAMILY_NAMES}; +use crate::storage::migration::migration_structures::OpLogBeforeV24; use crate::{ graphql::TIMESTAMP_SIZE, ingest::implement::EventFilter, storage::{ rocksdb_options, Conn as ConnFromV21, DbOptions, Http as HttpFromV21, Netflow5 as Netflow5FromV23, Netflow9 as Netflow9FromV23, Ntlm as NtlmFromV21, - RawEventStore, SecuLog as SecuLogFromV23, Smtp as SmtpFromV21, Ssh as SshFromV21, - StorageKey, Tls as TlsFromV21, + OpLog as OpLogFromV24, RawEventStore, SecuLog as SecuLogFromV23, Smtp as SmtpFromV21, + Ssh as SshFromV21, StorageKey, Tls as TlsFromV21, }, }; -const COMPATIBLE_VERSION_REQ: &str = ">=0.23.0,<0.24.0"; +const COMPATIBLE_VERSION_REQ: &str = ">=0.24.0-alpha.1,<0.25.0"; /// Migrates the data directory to the up-to-date format if necessary. 
/// @@ -61,6 +63,11 @@ pub fn migrate_data_dir(data_dir: &Path, db_opts: &DbOptions) -> Result<()> { Version::parse("0.23.0").expect("valid version"), migrate_0_21_to_0_23, ), + ( + VersionReq::parse(">=0.23.0,<0.24.0").expect("valid version requirement"), + Version::parse("0.24.0").expect("valid version"), + migrate_0_23_to_0_24, + ), ]; while let Some((_req, to, m)) = migration @@ -286,6 +293,12 @@ fn migrate_0_21_to_0_23(db_path: &Path, db_opts: &DbOptions) -> Result<()> { Ok(()) } +fn migrate_0_23_to_0_24(db_path: &Path, db_opts: &DbOptions) -> Result<()> { + let db = Database::open(db_path, db_opts)?; + migrate_0_23_0_to_0_24_0_op_log(&db)?; + Ok(()) +} + fn migrate_0_21_to_0_23_netflow5(db: &Database) -> Result<()> { let store = db.netflow5_store()?; for raw_event in store.iter_forward() { @@ -325,6 +338,46 @@ fn migrate_0_21_to_0_23_secu_log(db: &Database) -> Result<()> { Ok(()) } +fn migrate_0_23_0_to_0_24_0_op_log(db: &Database) -> Result<()> { + info!("start migration for oplog"); + let store = db.op_log_store()?; + let counter = AtomicUsize::new(0); + + for raw_event in store.iter_forward() { + let Ok((key, value)) = raw_event else { + continue; + }; + + let (Ok(timestamp), Ok(old)) = ( + get_timestamp_from_key(&key), + bincode::deserialize::(&value), + ) else { + continue; + }; + + if key.len() > TIMESTAMP_SIZE + 1 { + let old_start_key = String::from_utf8_lossy(&key[..(key.len() - (TIMESTAMP_SIZE + 1))]); + let split_start_key: Vec<_> = old_start_key.split('@').collect(); + let mut convert_new: OpLogFromV24 = old.into(); + let Some(sensor) = split_start_key.get(1) else { + continue; + }; + convert_new.sensor.clone_from(&(*sensor).to_string()); + let new = bincode::serialize(&convert_new)?; + + let storage_key = StorageKey::timestamp_builder() + .start_key(timestamp) + .mid_key(counter.fetch_add(1, Ordering::Relaxed)) + .build(); + + store.append(&storage_key.key(), &new)?; + store.delete(&key)?; + } + } + info!("oplog migration complete"); + Ok(()) +} + // Since rocksdb does not provide column familiy renaming interface, we need to copy the data from // the old column family to the new one, and then drop the old column family. 
fn rename_sources_to_sensors(db_path: &Path, db_opts: &DbOptions) -> Result<()> { @@ -379,24 +432,40 @@ fn get_timestamp_from_key(key: &[u8]) -> Result { #[cfg(test)] mod tests { + use std::fs; + use std::fs::File; + use std::io::Write; use std::net::IpAddr; + use std::path::PathBuf; use chrono::Utc; + use giganto_client::ingest::log::OpLogLevel; use rocksdb::{Options, WriteBatch, DB}; use semver::{Version, VersionReq}; + use tempfile::TempDir; use super::COMPATIBLE_VERSION_REQ; + use crate::storage::migration::migration_structures::OpLogBeforeV24; use crate::storage::{ - data_dir_to_db_path, + data_dir_to_db_path, migrate_data_dir, migration::migration_structures::{ ConnBeforeV21, HttpFromV12BeforeV21, Netflow5BeforeV23, Netflow9BeforeV23, NtlmBeforeV21, SecuLogBeforeV23, SmtpBeforeV21, SshBeforeV21, TlsBeforeV21, }, Conn as ConnFromV21, Database, DbOptions, Http as HttpFromV21, Netflow5 as Netflow5FromV23, - Netflow9 as Netflow9FromV23, Ntlm as NtlmFromV21, SecuLog as SecuLogFromV23, - Smtp as SmtpFromV21, Ssh as SshFromV21, StorageKey, Tls as TlsFromV21, + Netflow9 as Netflow9FromV23, Ntlm as NtlmFromV21, OpLog as OpLogFromV24, + SecuLog as SecuLogFromV23, Smtp as SmtpFromV21, Ssh as SshFromV21, StorageKey, + Tls as TlsFromV21, }; + fn mock_version_file(dir: &TempDir, version_content: &str) -> PathBuf { + let version_path = dir.path().join("VERSION"); + let mut file = File::create(&version_path).expect("Failed to create VERSION file"); + file.write_all(version_content.as_bytes()) + .expect("Failed to write version"); + version_path + } + #[test] fn version() { let compatible = VersionReq::parse(COMPATIBLE_VERSION_REQ).expect("valid semver"); @@ -1113,4 +1182,66 @@ mod tests { assert_eq!(result_value_1, b"test_value_1"); assert_eq!(result_value_2, b"test_value_2"); } + + #[test] + fn migrate_0_23_to_0_24_0_oplog() { + const TEST_TIMESTAMP: i64 = 1000; + + let db_dir = tempfile::tempdir().unwrap(); + let db = Database::open(db_dir.path(), &DbOptions::default()).unwrap(); + let op_log_store = db.op_log_store().unwrap(); + + let old_op_log = OpLogBeforeV24 { + agent_name: "local".to_string(), + log_level: OpLogLevel::Info, + contents: "test".to_string(), + }; + + let serialized_old_op_log = bincode::serialize(&old_op_log).unwrap(); + let op_log_old_key = StorageKey::builder() + .start_key("local@sr1") + .end_key(TEST_TIMESTAMP) + .build() + .key(); + + op_log_store + .append(&op_log_old_key, &serialized_old_op_log) + .unwrap(); + + super::migrate_0_23_0_to_0_24_0_op_log(&db).unwrap(); + + let count = op_log_store.iter_forward().count(); + assert_eq!(count, 1); + + for log in op_log_store.iter_forward() { + let Ok((_key, value)) = log else { + continue; + }; + + let Ok(oplog) = bincode::deserialize::(&value) else { + continue; + }; + + assert_eq!(oplog.sensor, "sr1".to_string()); + assert_eq!(oplog.agent_name, "local".to_string()); + } + } + + #[test] + fn migrate_data_dir_version_test() { + let version_dir = tempfile::tempdir().unwrap(); + + mock_version_file(&version_dir, "0.13.0"); + + let db_options = DbOptions::new(8000, 512, 8, 2); + + let result = migrate_data_dir(version_dir.path(), &db_options); + assert!(result.is_ok()); + + if let Ok(updated_version) = fs::read_to_string(version_dir.path().join("VERSION")) { + let current = Version::parse(env!("CARGO_PKG_VERSION")).expect("valid semver"); + let diff = Version::parse(&updated_version).expect("valid semver"); + assert_eq!(current, diff) + } + } } diff --git a/src/storage/migration/migration_structures.rs 
b/src/storage/migration/migration_structures.rs
index 01c9e2a9..9c9c442b 100644
--- a/src/storage/migration/migration_structures.rs
+++ b/src/storage/migration/migration_structures.rs
@@ -1,13 +1,14 @@
 use std::net::IpAddr;
+use giganto_client::ingest::log::OpLogLevel;
 use serde::{Deserialize, Serialize};
 use crate::{
     ingest::implement::EventFilter,
     storage::{
         Conn as ConnFromV21, Http as HttpFromV21, Netflow5 as Netflow5FromV23,
-        Netflow9 as Netflow9FromV23, Ntlm as NtlmFromV21, SecuLog as SecuLogFromV23,
-        Smtp as SmtpFromV21, Ssh as SshFromV21, Tls as TlsFromV21,
+        Netflow9 as Netflow9FromV23, Ntlm as NtlmFromV21, OpLog as OpLogFromV24,
+        SecuLog as SecuLogFromV23, Smtp as SmtpFromV21, Ssh as SshFromV21, Tls as TlsFromV21,
     },
 };
 #[derive(Debug, Deserialize, Serialize, Eq, PartialEq)]
@@ -487,3 +488,21 @@ impl From<SecuLogBeforeV23> for SecuLogFromV23 {
         }
     }
 }
+
+#[derive(Deserialize, Serialize)]
+pub struct OpLogBeforeV24 {
+    pub agent_name: String,
+    pub log_level: OpLogLevel,
+    pub contents: String,
+}
+
+impl From<OpLogBeforeV24> for OpLogFromV24 {
+    fn from(input: OpLogBeforeV24) -> Self {
+        Self {
+            sensor: String::new(),
+            agent_name: input.agent_name,
+            log_level: input.log_level,
+            contents: input.contents,
+        }
+    }
+}
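
For reference, the key change described in the commit message (from
`agent_id-0-timestamp` to `timestamp-sequence`) can be sketched as a small
standalone Rust program. The helper names below (`old_op_log_key`,
`new_op_log_key`) are illustrative only and are not part of this patch; they
mirror how the tests and `handle_data` assemble the old and new keys.

// Minimal sketch of the two `oplog` key layouts; names are hypothetical.

/// Pre-0.24 layout: `agent_id` bytes, a 0x00 separator, then the timestamp.
fn old_op_log_key(agent_id: &str, timestamp: i64) -> Vec<u8> {
    let mut key = Vec::with_capacity(agent_id.len() + 1 + 8);
    key.extend_from_slice(agent_id.as_bytes());
    key.push(0);
    key.extend_from_slice(&timestamp.to_be_bytes());
    key
}

/// 0.24 layout: big-endian timestamp prefix followed by a sequence number
/// used as a tie-breaker within the same timestamp.
fn new_op_log_key(timestamp: i64, sequence: usize) -> Vec<u8> {
    let mut key = Vec::with_capacity(8 + std::mem::size_of::<usize>());
    key.extend_from_slice(&timestamp.to_be_bytes());
    key.extend_from_slice(&sequence.to_be_bytes());
    key
}

fn main() {
    // Old keys group by agent first, so logs from different agents never interleave.
    let old_a = old_op_log_key("agent1@src1", 2);
    let old_b = old_op_log_key("agent2@src1", 1);
    assert!(old_a < old_b); // agent1's later log still sorts before agent2's earlier one

    // New keys start with the big-endian timestamp, so a plain key scan is
    // chronological across all agents.
    let new_a = new_op_log_key(2, 0);
    let new_b = new_op_log_key(1, 1);
    assert!(new_b < new_a);
    println!("old key: {} bytes, new key: {} bytes", old_a.len(), new_a.len());
}

Because the new key begins with the big-endian timestamp, a forward scan over
the `oplog` store visits events in chronological order regardless of agent,
which is what allows `OpLogFilter::agent_id` to become optional and the
filtering by `agent_id` and `sensor` to move into `RawEventFilter::check`.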
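
The event time is recovered from the front of the new key rather than the
tail. The snippet below is a hedged, self-contained sketch that mirrors the
`get_timestamp_from_key_prefix` helper added in `src/graphql.rs`; the local
`TIMESTAMP_SIZE` constant and `timestamp_from_key_prefix` function are
stand-ins, assuming an 8-byte big-endian nanosecond prefix followed by a
`usize` sequence number.

// Stand-in sketch; assumes the `chrono` crate is available.
use chrono::{DateTime, TimeZone, Utc};

const TIMESTAMP_SIZE: usize = 8;

fn timestamp_from_key_prefix(key: &[u8]) -> Option<DateTime<Utc>> {
    if key.len() > TIMESTAMP_SIZE {
        // The first 8 bytes are the big-endian nanosecond timestamp.
        let nanos = i64::from_be_bytes(key[..TIMESTAMP_SIZE].try_into().ok()?);
        return Some(Utc.timestamp_nanos(nanos));
    }
    None
}

fn main() {
    let mut key = 1_694_407_625_000_000_000_i64.to_be_bytes().to_vec();
    key.extend_from_slice(&42_usize.to_be_bytes()); // sequence-number suffix
    println!("{:?}", timestamp_from_key_prefix(&key));
}

The migration in `migrate_0_23_0_to_0_24_0_op_log` relies on this same
layout: for each old entry it extracts the sensor from the `agent@sensor`
start key, rebuilds the key with `StorageKey::timestamp_builder()` and a
fresh sequence number, appends the updated value, and deletes the old
`agent_id`-prefixed entry.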