From ff0dcaef852233ecfa322c19669d4c9fbf1a1a3a Mon Sep 17 00:00:00 2001
From: henry0715-dev
Date: Wed, 11 Sep 2024 14:07:05 +0900
Subject: [PATCH] Change `OpLogFilter::agent_id` to be optional

Close #724

The RocksDB keys in the `oplog` column family are currently of the form
agent_id-0-timestamp. This change modifies the keys to the form
timestamp-0-sequence.
---
 CHANGELOG.md                                   |  19 ++
 Cargo.lock                                     |   4 +-
 Cargo.toml                                     |   4 +-
 src/graphql.rs                                 | 160 +++++++++++++++
 .../client/schema/op_log_raw_events.graphql    |   2 +
 src/graphql/client/schema/schema.graphql       |   5 +-
 src/graphql/export/tests.rs                    |  26 ++-
 src/graphql/log.rs                             |  57 ++++--
 src/graphql/log/tests.rs                       | 187 +++++++++++++++---
 src/ingest.rs                                  |  55 ++++--
 src/ingest/generation.rs                       |  45 +++++
 src/ingest/implement.rs                        |   7 +
 src/ingest/tests.rs                            |   3 +-
 src/peer.rs                                    |   4 +-
 src/publish.rs                                 |   2 +-
 src/publish/tests.rs                           |   2 +-
 src/storage.rs                                 |  75 +++++++
 src/storage/migration.rs                       | 134 ++++++++++++-
 src/storage/migration/migration_structures.rs  |  23 ++-
 19 files changed, 727 insertions(+), 87 deletions(-)
 create mode 100644 src/ingest/generation.rs

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 20287c74..2ebb9461 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,17 @@ Versioning](https://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
 
+### Added
+
+- Added `sensor` field to `OpLog`.
+  - Added the `load_connection_by_prefix_timestamp_key` function and the
+    `TimestampKeyExtractor` trait to enable querying of keys prefixed with a
+    `timestamp`.
+  - The `opLogRawEvents` GraphQL API no longer requires `agentId` and now
+    accepts it as an optional parameter. Additionally, the API response now
+    includes logs from all agents, displayed in chronological order, rather
+    than being limited to the logs of a single agent.
+
 ### Changed
 
 - Remote configuration is no longer stored in a temporary file, nor does it
@@ -14,6 +25,14 @@ Versioning](https://semver.org/spec/v2.0.0.html).
 - Changed GraphQL APIs `config` and `setConfig` when using local configuration.
   - `config` return error when using local config.
   - `setConfig` return `Ok(false)` when using local config.
+- Updated the compatibility version of the QUIC communication modules.
+  - Changed `PEER_VERSION_REQ` to ">=0.23.0-alpha.1,<0.24.0".
+  - Changed `INGEST_VERSION_REQ` to ">=0.23.0-alpha.1,<0.24.0".
+  - Changed `PUBLISH_VERSION_REQ` to ">=0.23.0-alpha.1,<0.24.0".
+- Fixed code related to migration.
+  - Changed `COMPATIBLE_VERSION_REQ` to ">=0.23.0-alpha.1,<0.24.0".
+  - Added the migration function `migrate_0_21_0_to_0_23_0`. This function
+    migrates the `oplog` store by rewriting the key and value of every `OpLog`
+    entry to the new format (see the illustrative key-layout sketch below).
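+  - The new key is a big-endian timestamp, a `0x00` separator, and a
+    big-endian per-day sequence number. A minimal sketch of the layout
+    (illustrative only; `oplog_key` is a hypothetical helper, the real keys
+    are produced by `StorageTimestampKeyBuilder` in this patch):
+
+    ```rust
+    // Illustrative sketch of the new `oplog` key layout.
+    fn oplog_key(timestamp: i64, sequence: usize) -> Vec<u8> {
+        let mut key = Vec::with_capacity(8 + 1 + std::mem::size_of::<usize>());
+        key.extend_from_slice(&timestamp.to_be_bytes()); // big-endian timestamp prefix
+        key.push(0); // separator
+        key.extend_from_slice(&sequence.to_be_bytes()); // per-day sequence number
+        key
+    }
+    ```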
 
 ### Removed
 
diff --git a/Cargo.lock b/Cargo.lock
index fcc91685..70ba14bd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1023,7 +1023,7 @@ dependencies = [
 
 [[package]]
 name = "giganto"
-version = "0.22.1"
+version = "0.23.0-alpha.1"
 dependencies = [
  "anyhow",
  "async-graphql",
@@ -1076,7 +1076,7 @@ dependencies = [
 [[package]]
 name = "giganto-client"
 version = "0.20.0"
-source = "git+https://github.com/aicers/giganto-client.git?tag=0.20.0#d38bc68469ef1d64e4e5d49dd0b5ca20aec4d30b"
+source = "git+https://github.com/aicers/giganto-client.git?rev=5d6148#5d6148191164d71095a280d036714c64ea71e302"
 dependencies = [
  "anyhow",
  "bincode",
diff --git a/Cargo.toml b/Cargo.toml
index 0413f884..80fb1c77 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "giganto"
-version = "0.22.1"
+version = "0.23.0-alpha.1"
 edition = "2021"
 
 [lib]
@@ -24,7 +24,7 @@ data-encoding = "2.4"
 deluxe = "0.5"
 directories = "5.0"
 futures-util = "0.3"
-giganto-client = { git = "https://github.com/aicers/giganto-client.git", tag = "0.20.0" }
+giganto-client = { git = "https://github.com/aicers/giganto-client.git", rev = "5d6148" } # TODO: switch back to a tagged giganto-client release
 graphql_client = "0.14"
 humantime = "2"
 humantime-serde = "1"
diff --git a/src/graphql.rs b/src/graphql.rs
index e6dac052..5a328eaa 100644
--- a/src/graphql.rs
+++ b/src/graphql.rs
@@ -362,6 +362,125 @@ where
     Ok((records, has_previous, has_next))
 }
 
+#[allow(clippy::too_many_lines)]
+fn get_connection_by_prefix_timestamp_key<T>(
+    store: &RawEventStore<'_, T>,
+    filter: &(impl RawEventFilter + TimestampKeyExtractor),
+    after: Option<String>,
+    before: Option<String>,
+    first: Option<usize>,
+    last: Option<usize>,
+) -> Result<(Vec<(Box<[u8]>, T)>, bool, bool)>
+where
+    T: DeserializeOwned + EventFilter,
+{
+    let (records, has_previous, has_next) = if let Some(before) = before {
+        if after.is_some() {
+            return Err("cannot use both `after` and `before`".into());
+        }
+        if first.is_some() {
+            return Err("'before' and 'first' cannot be specified simultaneously".into());
+        }
+
+        let last = last.unwrap_or(MAXIMUM_PAGE_SIZE).min(MAXIMUM_PAGE_SIZE);
+        let cursor = base64_engine.decode(before)?;
+
+        // generate storage search key
+        let key_builder = StorageKey::timestamp_builder();
+        let from_key = key_builder
+            .clone()
+            .upper_open_bound_end_key(filter.get_range_start_key().1)
+            .build();
+        let to_key = key_builder
+            .lower_closed_bound_end_key(filter.get_range_start_key().0)
+            .build();
+
+        if cursor.cmp(&from_key.key()) == std::cmp::Ordering::Greater {
+            return Err("invalid cursor".into());
+        }
+        let mut iter = store
+            .boundary_iter(&cursor, &to_key.key(), Direction::Reverse)
+            .peekable();
+        if let Some(Ok((key, _))) = iter.peek() {
+            if key.as_ref() == cursor {
+                iter.next();
+            }
+        }
+        let (mut records, has_previous) = collect_records(iter, last, filter);
+        records.reverse();
+        (records, has_previous, false)
+    } else if let Some(after) = after {
+        if before.is_some() {
+            return Err("cannot use both `after` and `before`".into());
+        }
+        if last.is_some() {
+            return Err("'after' and 'last' cannot be specified simultaneously".into());
+        }
+        let first = first.unwrap_or(MAXIMUM_PAGE_SIZE).min(MAXIMUM_PAGE_SIZE);
+        let cursor = base64_engine.decode(after)?;
+
+        // generate storage search key
+        let key_builder = StorageKey::timestamp_builder();
+        let from_key = key_builder
+            .clone()
+            .lower_closed_bound_end_key(filter.get_range_start_key().0)
+            .build();
+        let to_key = key_builder
+            .upper_open_bound_end_key(filter.get_range_start_key().1)
+            .build();
+
+        if cursor.cmp(&from_key.key()) == std::cmp::Ordering::Less {
+            return Err("invalid cursor".into());
+        }
Err("invalid cursor".into()); + } + let mut iter = store + .boundary_iter(&cursor, &to_key.key(), Direction::Forward) + .peekable(); + if let Some(Ok((key, _))) = iter.peek() { + if key.as_ref() == cursor { + iter.next(); + } + } + let (records, has_next) = collect_records(iter, first, filter); + (records, false, has_next) + } else if let Some(last) = last { + if first.is_some() { + return Err("first and last cannot be used together".into()); + } + let last = last.min(MAXIMUM_PAGE_SIZE); + + // generate storage search key + let key_builder = StorageKey::timestamp_builder(); + let from_key = key_builder + .clone() + .upper_closed_bound_end_key(filter.get_range_start_key().1) + .build(); + let to_key = key_builder + .lower_closed_bound_end_key(filter.get_range_start_key().0) + .build(); + + let iter = store.boundary_iter(&from_key.key(), &to_key.key(), Direction::Reverse); + let (mut records, has_previous) = collect_records(iter, last, filter); + records.reverse(); + (records, has_previous, false) + } else { + let first = first.unwrap_or(MAXIMUM_PAGE_SIZE).min(MAXIMUM_PAGE_SIZE); + // generate storage search key + let key_builder = StorageKey::timestamp_builder(); + let from_key = key_builder + .clone() + .lower_closed_bound_end_key(filter.get_range_start_key().0) + .build(); + let to_key = key_builder + .upper_open_bound_end_key(filter.get_range_start_key().1) + .build(); + + let iter = store.boundary_iter(&from_key.key(), &to_key.key(), Direction::Forward); + let (records, has_next) = collect_records(iter, first, filter); + (records, false, has_next) + }; + Ok((records, has_previous, has_next)) +} + fn load_connection( store: &RawEventStore<'_, T>, filter: &(impl RawEventFilter + KeyExtractor), @@ -377,6 +496,37 @@ where let (records, has_previous, has_next) = get_connection(store, filter, after, before, first, last)?; + create_connection(records, has_previous, has_next) +} + +fn load_connection_by_prefix_timestamp_key( + store: &RawEventStore<'_, T>, + filter: &(impl RawEventFilter + TimestampKeyExtractor), + after: Option, + before: Option, + first: Option, + last: Option, +) -> Result> +where + N: FromKeyValue + OutputType, + T: DeserializeOwned + EventFilter, +{ + let (records, has_previous, has_next) = + get_connection_by_prefix_timestamp_key(store, filter, after, before, first, last)?; + + create_connection(records, has_previous, has_next) +} + +#[allow(clippy::unnecessary_wraps)] +fn create_connection( + records: Vec<(Box<[u8]>, T)>, + has_previous: bool, + has_next: bool, +) -> Result> +where + N: FromKeyValue + OutputType, + T: DeserializeOwned, +{ let mut connection: Connection = Connection::new(has_previous, has_next); connection.edges = records .into_iter() @@ -437,6 +587,14 @@ where (records, has_more) } +pub fn get_timestamp_from_key_prefix(key: &[u8]) -> Result, anyhow::Error> { + if key.len() > TIMESTAMP_SIZE { + let timestamp = i64::from_be_bytes(key[0..TIMESTAMP_SIZE].try_into()?); + return Ok(Utc.timestamp_nanos(timestamp)); + } + Err(anyhow!("invalid database key length")) +} + pub fn get_timestamp_from_key(key: &[u8]) -> Result, anyhow::Error> { if key.len() > TIMESTAMP_SIZE { let nanos = i64::from_be_bytes(key[(key.len() - TIMESTAMP_SIZE)..].try_into()?); @@ -1515,6 +1673,8 @@ macro_rules! 
 }
 pub(crate) use impl_from_giganto_search_filter_for_graphql_client;
 
+use crate::storage::TimestampKeyExtractor;
+
 #[cfg(test)]
 mod tests {
     use std::collections::{HashMap, HashSet};
diff --git a/src/graphql/client/schema/op_log_raw_events.graphql b/src/graphql/client/schema/op_log_raw_events.graphql
index d40daf2e..add2d516 100644
--- a/src/graphql/client/schema/op_log_raw_events.graphql
+++ b/src/graphql/client/schema/op_log_raw_events.graphql
@@ -12,6 +12,8 @@ query OpLogRawEvents($filter: OpLogFilter!, $after: String, $before: String, $fi
         timestamp
         level
         contents
+        agentName
+        sensor
       }
     }
   }
diff --git a/src/graphql/client/schema/schema.graphql b/src/graphql/client/schema/schema.graphql
index 958ca763..bed6ee78 100644
--- a/src/graphql/client/schema/schema.graphql
+++ b/src/graphql/client/schema/schema.graphql
@@ -991,7 +991,8 @@ type NtlmRawEventEdge {
 
 input OpLogFilter {
   time: TimeRange
-  agentId: String!
+  sensor: String
+  agentId: String
   logLevel: String
   contents: String
 }
@@ -1000,6 +1001,8 @@ type OpLogRawEvent {
   timestamp: DateTime!
   level: String!
   contents: String!
+  agentName: String!
+  sensor: String!
 }
 
 type OpLogRawEventConnection {
diff --git a/src/graphql/export/tests.rs b/src/graphql/export/tests.rs
index 4887d99d..bb152e44 100644
--- a/src/graphql/export/tests.rs
+++ b/src/graphql/export/tests.rs
@@ -1,5 +1,6 @@
 use std::mem;
 use std::net::IpAddr;
+use std::sync::{Arc, OnceLock};
 
 use chrono::{Duration, Utc};
 use giganto_client::ingest::{
@@ -12,6 +13,7 @@ use giganto_client::ingest::{
 };
 
 use crate::graphql::tests::TestSchema;
+use crate::ingest::generation::SequenceGenerator;
 use crate::storage::RawEventStore;
 
 #[tokio::test]
@@ -907,9 +909,10 @@ fn insert_time_series(
 async fn export_op_log() {
     let schema = TestSchema::new();
     let store = schema.db.op_log_store().unwrap();
+    let generator: OnceLock<Arc<SequenceGenerator>> = OnceLock::new();
 
-    insert_op_log_raw_event(&store, "agent1", 1);
-    insert_op_log_raw_event(&store, "agent2", 1);
+    insert_op_log_raw_event(&store, "agent1", "src1", 1, &generator).await;
+    insert_op_log_raw_event(&store, "agent2", "src1", 1, &generator).await;
 
     // export csv file
     let query = r#"
@@ -938,17 +941,26 @@ async fn export_op_log() {
     assert!(res.data.to_string().contains("op_log"));
 }
 
-fn insert_op_log_raw_event(store: &RawEventStore<OpLog>, agent_name: &str, timestamp: i64) {
+async fn insert_op_log_raw_event(
+    store: &RawEventStore<'_, OpLog>,
+    agent_name: &str,
+    sensor: &str,
+    timestamp: i64,
+    generator: &OnceLock<Arc<SequenceGenerator>>,
+) {
+    let generator = generator.get_or_init(SequenceGenerator::init_generator);
+    let sequence_number = generator.generate_sequence_number().await;
+
     let mut key: Vec<u8> = Vec::new();
-    let agent_id = format!("{agent_name}@src1");
-    key.extend_from_slice(agent_id.as_bytes());
-    key.push(0);
     key.extend_from_slice(&timestamp.to_be_bytes());
+    key.push(0);
+    key.extend_from_slice(&sequence_number.to_be_bytes());
 
     let op_log_body = OpLog {
-        agent_name: agent_id.to_string(),
+        agent_name: agent_name.to_string(),
         log_level: OpLogLevel::Info,
         contents: "op_log".to_string(),
+        sensor: sensor.to_string(),
     };
 
     let value = bincode::serialize(&op_log_body).unwrap();
diff --git a/src/graphql/log.rs b/src/graphql/log.rs
index 54eaa869..e06e447f 100644
--- a/src/graphql/log.rs
+++ b/src/graphql/log.rs
@@ -16,13 +16,13 @@ use graphql_client::GraphQLQuery;
 use super::{
     base64_engine,
     client::derives::{log_raw_events, LogRawEvents},
-    get_timestamp_from_key, handle_paged_events,
-    impl_from_giganto_time_range_struct_for_graphql_client, load_connection,
-    paged_events_in_cluster, Engine, FromKeyValue,
+    get_timestamp_from_key, get_timestamp_from_key_prefix, handle_paged_events,
+    impl_from_giganto_time_range_struct_for_graphql_client,
+    load_connection_by_prefix_timestamp_key, paged_events_in_cluster, Engine, FromKeyValue,
 };
 use crate::{
     graphql::{RawEventFilter, TimeRange},
-    storage::{Database, KeyExtractor},
+    storage::{Database, KeyExtractor, TimestampKeyExtractor},
 };
 
 #[derive(Default)]
@@ -74,22 +74,14 @@ impl RawEventFilter for LogFilter {
 
 #[derive(InputObject)]
 pub struct OpLogFilter {
     time: Option<TimeRange>,
-    agent_id: String,
+    sensor: Option<String>,
+    agent_id: Option<String>,
     log_level: Option<String>,
     contents: Option<String>,
 }
 
-impl KeyExtractor for OpLogFilter {
-    fn get_start_key(&self) -> &str {
-        &self.agent_id
-    }
-
-    // oplog event don't use mid key
-    fn get_mid_key(&self) -> Option<Vec<u8>> {
-        None
-    }
-
-    fn get_range_end_key(&self) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>) {
+impl TimestampKeyExtractor for OpLogFilter {
+    fn get_range_start_key(&self) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>) {
         if let Some(time) = &self.time {
             (time.start, time.end)
         } else {
@@ -108,8 +100,8 @@ impl RawEventFilter for OpLogFilter {
         log_level: Option<String>,
         log_contents: Option<String>,
         _text: Option<String>,
-        _source: Option<String>,
-        _agent_id: Option<String>,
+        sensor: Option<String>,
+        agent_id: Option<String>,
     ) -> Result<bool> {
         if let Some(filter_level) = &self.log_level {
             let log_level = if let Some(log_level) = log_level {
@@ -131,6 +123,26 @@ impl RawEventFilter for OpLogFilter {
                 return Ok(false);
             }
         }
+        if let Some(filter_agent_id) = &self.agent_id {
+            let is_agent_id_mismatch = if let Some(agent_id) = agent_id {
+                !agent_id.contains(filter_agent_id)
+            } else {
+                false
+            };
+            if is_agent_id_mismatch {
+                return Ok(false);
+            }
+        }
+        if let Some(filter_sensor) = &self.sensor {
+            let is_sensor_mismatch = if let Some(sensor) = sensor {
+                !sensor.contains(filter_sensor)
+            } else {
+                false
+            };
+            if is_sensor_mismatch {
+                return Ok(false);
+            }
+        }
         Ok(true)
     }
 }
@@ -156,14 +168,18 @@ struct OpLogRawEvent {
     timestamp: DateTime<Utc>,
     level: String,
     contents: String,
+    agent_name: String,
+    sensor: String,
 }
 
 impl FromKeyValue<OpLog> for OpLogRawEvent {
     fn from_key_value(key: &[u8], l: OpLog) -> Result<Self> {
         Ok(OpLogRawEvent {
-            timestamp: get_timestamp_from_key(key)?,
+            timestamp: get_timestamp_from_key_prefix(key)?,
             level: format!("{:?}", l.log_level),
             contents: l.contents,
+            agent_name: l.agent_name,
+            sensor: l.sensor,
         })
     }
 }
@@ -226,14 +242,13 @@ impl LogQuery {
     ) -> Result<Connection<String, OpLogRawEvent>> {
         let db = ctx.data::<Database>()?;
         let store = db.op_log_store()?;
-
         query(
             after,
             before,
             first,
             last,
             |after, before, first, last| async move {
-                load_connection(&store, &filter, after, before, first, last)
+                load_connection_by_prefix_timestamp_key(&store, &filter, after, before, first, last)
             },
         )
         .await
diff --git a/src/graphql/log/tests.rs b/src/graphql/log/tests.rs
index 2696a97b..c371d7c2 100644
--- a/src/graphql/log/tests.rs
+++ b/src/graphql/log/tests.rs
@@ -1,7 +1,11 @@
+use std::sync::{Arc, OnceLock};
+
 use chrono::DateTime;
 use giganto_client::ingest::log::{Log, OpLog, OpLogLevel};
 
 use super::{base64_engine, Engine, LogFilter, LogRawEvent, OpLogFilter, OpLogRawEvent};
+use crate::graphql::load_connection;
+use crate::ingest::generation::SequenceGenerator;
 use crate::{
     graphql::{tests::TestSchema, TimeRange},
     storage::RawEventStore,
@@ -19,7 +23,7 @@ async fn load_time_range() {
     insert_log_raw_event(&store, "src1", 5, "kind1", b"log5");
 
     // backward traversal in `start..end`
-    let connection = super::load_connection::<LogRawEvent, Log>(
+    let connection =
load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -46,7 +50,7 @@ async fn load_time_range() { ); // backward traversal in `start..` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -77,7 +81,7 @@ async fn load_time_range() { ); // backward traversal in `..end` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -108,7 +112,7 @@ async fn load_time_range() { ); // forward traversal in `start..end` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -135,7 +139,7 @@ async fn load_time_range() { ); // forward traversal `start..` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -166,7 +170,7 @@ async fn load_time_range() { ); // forward traversal `..end` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -193,7 +197,7 @@ async fn load_time_range() { ); // backward traversal in `start..end` and `before cursor` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -220,7 +224,7 @@ async fn load_time_range() { ); // backward traversal in `start..` and `before cursor` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -247,7 +251,7 @@ async fn load_time_range() { ); // backward traversal in `..end` and `before cursor` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -278,7 +282,7 @@ async fn load_time_range() { ); // forward traversal in `start..end` and `after cursor` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -305,7 +309,7 @@ async fn load_time_range() { ); // forward traversal `start..` and `after cursor` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -332,7 +336,7 @@ async fn load_time_range() { ); // forward traversal `..end` and `after cursor` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -359,7 +363,7 @@ async fn load_time_range() { ); // forward traversal `..` - let connection = super::load_connection::( + let connection = load_connection::( &store, &LogFilter { time: Some(TimeRange { @@ -453,8 +457,8 @@ async fn oplog_empty() { async fn oplog_with_data() { let schema = TestSchema::new(); let store = schema.db.op_log_store().unwrap(); - - insert_oplog_raw_event(&store, "giganto", 1); + let generator: OnceLock> = OnceLock::new(); + insert_oplog_raw_event(&store, "giganto", "src1", 1, &generator).await; let query = r#" { @@ -478,33 +482,150 @@ async fn oplog_with_data() { async fn load_oplog() { let schema = TestSchema::new(); let store = schema.db.op_log_store().unwrap(); + let generator: OnceLock> = OnceLock::new(); + + insert_oplog_raw_event(&store, "giganto", "src1", 1, &generator).await; + insert_oplog_raw_event(&store, "giganto", "src1", 2, &generator).await; + insert_oplog_raw_event(&store, "review", "src1", 2, &generator).await; + insert_oplog_raw_event(&store, "review", "src1", 3, 
&generator).await; + insert_oplog_raw_event(&store, "giganto", "src1", 3, &generator).await; + insert_oplog_raw_event(&store, "giganto", "src1", 4, &generator).await; + insert_oplog_raw_event(&store, "giganto", "src1", 5, &generator).await; + insert_oplog_raw_event(&store, "review", "src1", 5, &generator).await; + insert_oplog_raw_event(&store, "aice", "src1", 5, &generator).await; + + let connection = super::load_connection_by_prefix_timestamp_key::( + &store, + &OpLogFilter { + time: Some(TimeRange { + start: Some(DateTime::from_timestamp_nanos(5)), + end: Some(DateTime::from_timestamp_nanos(7)), + }), + agent_id: None, + log_level: Some("Info".to_string()), + contents: Some("oplog".to_string()), + sensor: None, + }, + None, + None, + Some(5), + None, + ) + .unwrap(); + assert_eq!(connection.edges.len(), 3); + assert_eq!(connection.edges[0].node.level.as_str(), "Info"); + assert_eq!(connection.edges[0].node.contents.as_str(), "oplog"); +} - insert_oplog_raw_event(&store, "giganto", 1); - insert_oplog_raw_event(&store, "giganto", 2); - insert_oplog_raw_event(&store, "giganto", 3); - insert_oplog_raw_event(&store, "giganto", 4); - insert_oplog_raw_event(&store, "giganto", 5); +#[tokio::test] +async fn load_connection_by_prefix_timestamp_key() { + let schema = TestSchema::new(); + let store = schema.db.op_log_store().unwrap(); + let generator: OnceLock> = OnceLock::new(); + let mut key_list: Vec> = Vec::new(); + + key_list.push(insert_oplog_raw_event(&store, "piglet", "src1", 1, &generator).await); + key_list.push(insert_oplog_raw_event(&store, "piglet", "src1", 2, &generator).await); + key_list.push(insert_oplog_raw_event(&store, "review", "src1", 2, &generator).await); + key_list.push(insert_oplog_raw_event(&store, "review", "src1", 3, &generator).await); + key_list.push(insert_oplog_raw_event(&store, "piglet", "src1", 3, &generator).await); + key_list.push(insert_oplog_raw_event(&store, "piglet", "src1", 4, &generator).await); + key_list.push(insert_oplog_raw_event(&store, "piglet", "src2", 5, &generator).await); + key_list.push(insert_oplog_raw_event(&store, "piglet", "src2", 6, &generator).await); + key_list.push(insert_oplog_raw_event(&store, "review", "src1", 4, &generator).await); + key_list.push(insert_oplog_raw_event(&store, "review", "src2", 5, &generator).await); + + let connection = super::load_connection_by_prefix_timestamp_key::( + &store, + &OpLogFilter { + time: Some(TimeRange { + start: Some(DateTime::from_timestamp_nanos(1)), + end: Some(DateTime::from_timestamp_nanos(10)), + }), + agent_id: Some("review".to_string()), + log_level: Some("Info".to_string()), + contents: Some("oplog".to_string()), + sensor: Some("src1".to_string()), + }, + None, + None, + Some(10), + None, + ) + .unwrap(); + assert_eq!(connection.edges.len(), 3); + assert_eq!(connection.edges[0].node.level.as_str(), "Info"); + assert_eq!(connection.edges[0].node.contents.as_str(), "oplog"); - let connection = super::load_connection::( + let after = key_list.get(3).unwrap(); + let after = base64_engine.encode(after); + let connection = super::load_connection_by_prefix_timestamp_key::( &store, &OpLogFilter { time: Some(TimeRange { start: Some(DateTime::from_timestamp_nanos(1)), - end: Some(DateTime::from_timestamp_nanos(3)), + end: Some(DateTime::from_timestamp_nanos(10)), }), - agent_id: "giganto@src 1".to_string(), + agent_id: Some("review".to_string()), log_level: Some("Info".to_string()), contents: Some("oplog".to_string()), + sensor: Some("src1".to_string()), }, + Some(after), None, + Some(10), None, - 
Some(3), + ) + .unwrap(); + assert_eq!(connection.edges.len(), 1); + assert_eq!(connection.edges[0].node.level.as_str(), "Info"); + assert_eq!(connection.edges[0].node.contents.as_str(), "oplog"); + + let before = key_list.get(8).unwrap(); + let before = base64_engine.encode(before); + let connection = super::load_connection_by_prefix_timestamp_key::( + &store, + &OpLogFilter { + time: Some(TimeRange { + start: Some(DateTime::from_timestamp_nanos(1)), + end: Some(DateTime::from_timestamp_nanos(10)), + }), + agent_id: Some("review".to_string()), + log_level: Some("Info".to_string()), + contents: Some("oplog".to_string()), + sensor: Some("src1".to_string()), + }, + None, + Some(before), None, + Some(10), ) .unwrap(); assert_eq!(connection.edges.len(), 2); assert_eq!(connection.edges[0].node.level.as_str(), "Info"); assert_eq!(connection.edges[1].node.contents.as_str(), "oplog"); + + let connection = super::load_connection_by_prefix_timestamp_key::( + &store, + &OpLogFilter { + time: Some(TimeRange { + start: Some(DateTime::from_timestamp_nanos(1)), + end: Some(DateTime::from_timestamp_nanos(10)), + }), + agent_id: Some("piglet".to_string()), + log_level: Some("Info".to_string()), + contents: Some("oplog".to_string()), + sensor: Some("src2".to_string()), + }, + None, + None, + None, + Some(10), + ) + .unwrap(); + assert_eq!(connection.edges.len(), 2); + assert_eq!(connection.edges[0].node.level.as_str(), "Info"); + assert_eq!(connection.edges[0].node.contents.as_str(), "oplog"); } fn insert_log_raw_event( @@ -528,20 +649,30 @@ fn insert_log_raw_event( store.append(&key, &value).unwrap(); } -fn insert_oplog_raw_event(store: &RawEventStore, agent_name: &str, timestamp: i64) { +async fn insert_oplog_raw_event( + store: &RawEventStore<'_, OpLog>, + agent_name: &str, + sensor: &str, + timestamp: i64, + generator: &OnceLock>, +) -> Vec { + let generator = generator.get_or_init(SequenceGenerator::init_generator); + let sequence_number = generator.generate_sequence_number().await; + let mut key: Vec = Vec::new(); let agent_id = format!("{agent_name}@src 1"); - key.extend_from_slice(agent_id.as_bytes()); - key.push(0); key.extend_from_slice(×tamp.to_be_bytes()); + key.push(0); + key.extend_from_slice(&sequence_number.to_be_bytes()); let oplog_body = OpLog { agent_name: agent_id.to_string(), log_level: OpLogLevel::Info, contents: "oplog".to_string(), + sensor: sensor.to_string(), }; let value = bincode::serialize(&oplog_body).unwrap(); - store.append(&key, &value).unwrap(); + key } diff --git a/src/ingest.rs b/src/ingest.rs index fbced5af..fe4816f3 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -1,7 +1,9 @@ +pub mod generation; pub mod implement; #[cfg(test)] mod tests; +use std::sync::OnceLock; use std::{ net::SocketAddr, sync::{ @@ -39,6 +41,7 @@ use tokio::{ use tracing::{error, info}; use x509_parser::nom::AsBytes; +use crate::ingest::generation::SequenceGenerator; use crate::publish::send_direct_stream; use crate::server::{ config_server, extract_cert_from_conn, subject_from_cert_verbose, Certs, @@ -54,10 +57,12 @@ const CHANNEL_CLOSE_MESSAGE: &[u8; 12] = b"channel done"; const CHANNEL_CLOSE_TIMESTAMP: i64 = -1; const NO_TIMESTAMP: i64 = 0; const SOURCE_INTERVAL: u64 = 60 * 60 * 24; -const INGEST_VERSION_REQ: &str = ">=0.21.0,<0.23.0"; +const INGEST_VERSION_REQ: &str = ">=0.23.0-alpha.1,<0.24.0"; type SourceInfo = (String, DateTime, ConnState, bool); +static GENERATOR: OnceLock> = OnceLock::new(); + enum ConnState { Connected, Disconnected, @@ -865,7 +870,7 @@ async fn handle_data( let mut 
packet_size = 0_u64; #[cfg(feature = "benchmark")] let mut packet_count = 0_u64; - for (timestamp, raw_event) in recv_buf { + for (timestamp, mut raw_event) in recv_buf { last_timestamp = timestamp; if (timestamp == CHANNEL_CLOSE_TIMESTAMP) && (raw_event.as_bytes() == CHANNEL_CLOSE_MESSAGE) @@ -879,16 +884,17 @@ async fn handle_data( } continue; } - let key_builder = StorageKey::builder().start_key(&source); - let key_builder = match raw_event_kind { + let storage_key = match raw_event_kind { RawEventKind::Log => { let Ok(log) = bincode::deserialize::(&raw_event) else { err_msg = Some("Failed to deserialize Log".to_string()); break; }; - key_builder + StorageKey::builder() + .start_key(&source) .mid_key(Some(log.kind.as_bytes().to_vec())) .end_key(timestamp) + .build() } RawEventKind::PeriodicTimeSeries => { let Ok(time_series) = @@ -901,25 +907,38 @@ async fn handle_data( StorageKey::builder() .start_key(&time_series.id) .end_key(timestamp) + .build() } RawEventKind::OpLog => { - let Ok(op_log) = bincode::deserialize::(&raw_event) else { + let Ok(mut op_log) = bincode::deserialize::(&raw_event) else { err_msg = Some("Failed to deserialize OpLog".to_string()); break; }; - let agent_id = format!("{}@{source}", op_log.agent_name); - StorageKey::builder() - .start_key(&agent_id) - .end_key(timestamp) + op_log.sensor.clone_from(&source); + let Ok(op_log) = bincode::serialize(&op_log) else { + err_msg = Some("Failed to serialize OpLog".to_string()); + break; + }; + raw_event.clone_from(&op_log); + + let generator = + GENERATOR.get_or_init(SequenceGenerator::init_generator); + let sequence_number = generator.generate_sequence_number().await; + StorageKey::timestamp_builder() + .start_key(timestamp) + .mid_key(sequence_number) + .build() } RawEventKind::Packet => { let Ok(packet) = bincode::deserialize::(&raw_event) else { err_msg = Some("Failed to deserialize Packet".to_string()); break; }; - key_builder + StorageKey::builder() + .start_key(&source) .mid_key(Some(timestamp.to_be_bytes().to_vec())) .end_key(packet.packet_timestamp) + .build() } RawEventKind::Statistics => { let Ok(statistics) = bincode::deserialize::(&raw_event) @@ -936,25 +955,31 @@ async fn handle_data( packet_count += t_packet_count; packet_size += t_packet_size; } - key_builder + StorageKey::builder() + .start_key(&source) .mid_key(Some(statistics.core.to_be_bytes().to_vec())) .end_key(timestamp) + .build() } RawEventKind::SecuLog => { let Ok(secu_log) = bincode::deserialize::(&raw_event) else { err_msg = Some("Failed to deserialize SecuLog".to_string()); break; }; - key_builder + StorageKey::builder() + .start_key(&source) .mid_key(Some(secu_log.kind.as_bytes().to_vec())) .end_key(timestamp) + .build() } - _ => key_builder.end_key(timestamp), + _ => StorageKey::builder() + .start_key(&source) + .end_key(timestamp) + .build(), }; recv_events_cnt += 1; recv_events_len += raw_event.len(); - let storage_key = key_builder.build(); store.append(&storage_key.key(), &raw_event)?; if let Some(network_key) = network_key.as_ref() { if let Err(e) = send_direct_stream( diff --git a/src/ingest/generation.rs b/src/ingest/generation.rs new file mode 100644 index 00000000..3dc088ef --- /dev/null +++ b/src/ingest/generation.rs @@ -0,0 +1,45 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use chrono::{Datelike, Utc}; +use tokio::sync::RwLock; + +pub struct SequenceGenerator { + counter: AtomicUsize, + last_reset_date: RwLock, +} + +impl SequenceGenerator { + pub(crate) fn new() -> Self { + Self { + counter: 
AtomicUsize::new(0), + last_reset_date: RwLock::new(SequenceGenerator::get_current_date_time()), + } + } + + pub(crate) async fn generate_sequence_number(&self) -> usize { + let current_date_time = SequenceGenerator::get_current_date_time(); + { + let last_reset_day = self.last_reset_date.read().await; + if *last_reset_day == current_date_time { + return self.counter.fetch_add(1, Ordering::Relaxed); + } + } + { + let mut last_reset_day = self.last_reset_date.write().await; + self.counter.store(1, Ordering::Release); + *last_reset_day = current_date_time; + } + self.counter.fetch_add(1, Ordering::Relaxed) + } + + pub(crate) fn init_generator() -> Arc { + let generator = SequenceGenerator::new(); + Arc::new(generator) + } + + pub(crate) fn get_current_date_time() -> u32 { + let utc_now = Utc::now(); + utc_now.day() + } +} diff --git a/src/ingest/implement.rs b/src/ingest/implement.rs index 671ced99..8a1e8051 100644 --- a/src/ingest/implement.rs +++ b/src/ingest/implement.rs @@ -305,6 +305,13 @@ impl EventFilter for OpLog { fn log_contents(&self) -> Option { Some(self.contents.clone()) } + fn agent_id(&self) -> Option { + Some(self.agent_name.clone()) + } + + fn source(&self) -> Option { + Some(self.sensor.clone()) + } } impl EventFilter for PeriodicTimeSeries { diff --git a/src/ingest/tests.rs b/src/ingest/tests.rs index c7d26fee..ddf1cbb3 100644 --- a/src/ingest/tests.rs +++ b/src/ingest/tests.rs @@ -50,7 +50,7 @@ const KEY_PATH: &str = "tests/certs/node1/key.pem"; const CA_CERT_PATH: &str = "tests/certs/ca_cert.pem"; const HOST: &str = "node1"; const TEST_PORT: u16 = 60190; -const PROTOCOL_VERSION: &str = "0.22.1"; +const PROTOCOL_VERSION: &str = "0.23.0-alpha.1"; struct TestClient { conn: Connection, @@ -640,6 +640,7 @@ async fn op_log() { let (mut send_op_log, _) = client.conn.open_bi().await.expect("failed to open stream"); let op_log_body = OpLog { + sensor: String::new(), agent_name: "giganto".to_string(), log_level: OpLogLevel::Info, contents: "op_log".to_string(), diff --git a/src/peer.rs b/src/peer.rs index 12ec5c32..5171ac48 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -40,7 +40,7 @@ use crate::{ IngestSources, }; -const PEER_VERSION_REQ: &str = ">=0.21.0,<0.23.0"; +const PEER_VERSION_REQ: &str = ">=0.23.0-alpha.1,<0.24.0"; const PEER_RETRY_INTERVAL: u64 = 5; pub type Peers = Arc>>; @@ -761,7 +761,7 @@ pub mod tests { const CA_CERT_PATH: &str = "tests/certs/ca_cert.pem"; const HOST: &str = "node1"; const TEST_PORT: u16 = 60191; - const PROTOCOL_VERSION: &str = "0.22.1"; + const PROTOCOL_VERSION: &str = "0.23.0-alpha.1"; pub struct TestClient { send: SendStream, diff --git a/src/publish.rs b/src/publish.rs index 09f29fed..cc37ae3a 100644 --- a/src/publish.rs +++ b/src/publish.rs @@ -60,7 +60,7 @@ use crate::server::{ use crate::storage::{Database, Direction, RawEventStore, StorageKey}; use crate::{IngestSources, PcapSources, StreamDirectChannels}; -const PUBLISH_VERSION_REQ: &str = ">=0.21.0,<0.23.0"; +const PUBLISH_VERSION_REQ: &str = ">=0.23.0-alpha.1,<0.24.0"; pub struct Server { server_config: ServerConfig, diff --git a/src/publish/tests.rs b/src/publish/tests.rs index 3645b233..713f79c8 100644 --- a/src/publish/tests.rs +++ b/src/publish/tests.rs @@ -48,7 +48,7 @@ fn get_token() -> &'static Mutex { } const CA_CERT_PATH: &str = "tests/certs/ca_cert.pem"; -const PROTOCOL_VERSION: &str = "0.22.1"; +const PROTOCOL_VERSION: &str = "0.23.0-alpha.1"; const NODE1_CERT_PATH: &str = "tests/certs/node1/cert.pem"; const NODE1_KEY_PATH: &str = "tests/certs/node1/key.pem"; diff --git 
a/src/storage.rs b/src/storage.rs
index 3bb8d14f..ee328dc0 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -665,6 +665,10 @@ impl StorageKey {
         StorageKeyBuilder::default()
     }
 
+    pub fn timestamp_builder() -> StorageTimestampKeyBuilder {
+        StorageTimestampKeyBuilder::default()
+    }
+
     pub fn key(self) -> Vec<u8> {
         self.0
     }
@@ -676,6 +680,10 @@ pub trait KeyExtractor {
     fn get_range_end_key(&self) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>);
 }
 
+pub trait TimestampKeyExtractor {
+    fn get_range_start_key(&self) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>);
+}
+
 #[allow(clippy::module_name_repetitions)]
 #[derive(Default, Debug, Clone)]
 pub struct StorageKeyBuilder {
@@ -748,6 +756,73 @@ impl StorageKeyBuilder {
     }
 }
 
+#[allow(clippy::module_name_repetitions)]
+#[derive(Default, Debug, Clone)]
+pub struct StorageTimestampKeyBuilder {
+    pre_key: Vec<u8>,
+}
+
+impl StorageTimestampKeyBuilder {
+    pub fn start_key(mut self, key: i64) -> Self {
+        self.pre_key.reserve(TIMESTAMP_SIZE);
+        self.pre_key.extend_from_slice(&key.to_be_bytes());
+        self.pre_key.push(0);
+        self
+    }
+
+    pub fn mid_key(mut self, key: usize) -> Self {
+        let mid_key = key.to_be_bytes();
+        self.pre_key.reserve(mid_key.len() + 1);
+        self.pre_key.extend_from_slice(&mid_key);
+        self.pre_key.push(0);
+        self
+    }
+
+    pub fn lower_closed_bound_end_key(mut self, time: Option<DateTime<Utc>>) -> Self {
+        self.pre_key.reserve(TIMESTAMP_SIZE + 1);
+        let ns = if let Some(time) = time {
+            time.timestamp_nanos_opt().unwrap_or(i64::MAX)
+        } else {
+            0
+        };
+        self.pre_key.extend_from_slice(&ns.to_be_bytes());
+        self.pre_key.push(0);
+        self
+    }
+
+    pub fn upper_open_bound_end_key(mut self, time: Option<DateTime<Utc>>) -> Self {
+        self.pre_key.reserve(TIMESTAMP_SIZE + 1);
+        let ns = if let Some(time) = time {
+            time.timestamp_nanos_opt().unwrap_or(i64::MAX)
+        } else {
+            i64::MAX
+        };
+        self.pre_key.extend_from_slice(&ns.to_be_bytes());
+        self.pre_key.push(0);
+        self
+    }
+
+    pub fn upper_closed_bound_end_key(mut self, time: Option<DateTime<Utc>>) -> Self {
+        self.pre_key.reserve(TIMESTAMP_SIZE + 1);
+        if let Some(time) = time {
+            let ns = time.timestamp_nanos_opt().unwrap_or(i64::MAX);
+            if let Some(ns) = ns.checked_sub(1) {
+                if ns >= 0 {
+                    self.pre_key.extend_from_slice(&ns.to_be_bytes());
+                    return self;
+                }
+            }
+        }
+        self.pre_key.extend_from_slice(&i64::MAX.to_be_bytes());
+        self.pre_key.push(0);
+        self
+    }
+
+    pub fn build(self) -> StorageKey {
+        StorageKey(self.pre_key)
+    }
+}
+
 pub type KeyValue<T> = (Box<[u8]>, T);
 pub type RawValue = (Box<[u8]>, Box<[u8]>);
diff --git a/src/storage/migration.rs b/src/storage/migration.rs
index e7c5cd44..717e731b 100644
--- a/src/storage/migration.rs
+++ b/src/storage/migration.rs
@@ -1,6 +1,7 @@
 //! Routines to check the database format version and migrate it if necessary.
 mod migration_structures;
 
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::{
     fs::{create_dir_all, File},
     io::{Read, Write},
@@ -17,16 +18,17 @@ use self::migration_structures::{
     ConnBeforeV21, HttpFromV12BeforeV21, NtlmBeforeV21, SmtpBeforeV21, SshBeforeV21, TlsBeforeV21,
 };
 use super::Database;
+use crate::storage::migration::migration_structures::OpLogBeforeV23;
 use crate::{
     graphql::TIMESTAMP_SIZE,
     ingest::implement::EventFilter,
     storage::{
-        Conn as ConnFromV21, Http as HttpFromV21, Ntlm as NtlmFromV21, RawEventStore,
-        Smtp as SmtpFromV21, Ssh as SshFromV21, StorageKey, Tls as TlsFromV21,
+        Conn as ConnFromV21, Http as HttpFromV21, Ntlm as NtlmFromV21, OpLog as OpLogFromV23,
+        RawEventStore, Smtp as SmtpFromV21, Ssh as SshFromV21, StorageKey, Tls as TlsFromV21,
     },
 };
 
-const COMPATIBLE_VERSION_REQ: &str = ">=0.21.0,<0.23.0";
+const COMPATIBLE_VERSION_REQ: &str = ">=0.23.0-alpha.1,<0.24.0";
 
 /// Migrates the data directory to the up-to-date format if necessary.
 ///
@@ -52,6 +54,11 @@ pub fn migrate_data_dir(data_dir: &Path, db: &Database) -> Result<()> {
             Version::parse("0.21.0").expect("valid version"),
             migrate_0_19_to_0_21_0,
         ),
+        (
+            VersionReq::parse(">=0.21.0,<0.23.0").expect("valid version requirement"),
+            Version::parse("0.23.0").expect("valid version"),
+            migrate_0_21_0_to_0_23_0,
+        ),
     ];
 
     while let Some((_req, to, m)) = migration
@@ -216,6 +223,46 @@ fn migrate_0_19_to_0_21_0(db: &Database) -> Result<()> {
     Ok(())
 }
 
+fn migrate_0_21_0_to_0_23_0(db: &Database) -> Result<()> {
+    info!("start migration for oplog");
+    let store = db.op_log_store()?;
+    let counter = AtomicUsize::new(0);
+
+    for raw_event in store.iter_forward() {
+        let Ok((key, value)) = raw_event else {
+            continue;
+        };
+
+        let (Ok(timestamp), Ok(old)) = (
+            get_timestamp_from_key(&key),
+            bincode::deserialize::<OpLogBeforeV23>(&value),
+        ) else {
+            continue;
+        };
+
+        if key.len() > TIMESTAMP_SIZE + 1 {
+            let old_start_key =
+                String::from_utf8_lossy(&key[..(key.len() - (TIMESTAMP_SIZE + 1))]);
+            let split_start_key: Vec<_> = old_start_key.split('@').collect();
+            let mut convert_new: OpLogFromV23 = old.into();
+            let Some(sensor) = split_start_key.get(1) else {
+                continue;
+            };
+            convert_new.sensor.clone_from(&(*sensor).to_string());
+            let new = bincode::serialize(&convert_new)?;
+
+            let storage_key = StorageKey::timestamp_builder()
+                .start_key(timestamp)
+                .mid_key(counter.fetch_add(1, Ordering::Relaxed))
+                .build();
+
+            store.append(&storage_key.key(), &new)?;
+            store.delete(&key)?;
+        }
+    }
+    info!("oplog migration complete");
+    Ok(())
+}
+
 fn migrate_netflow<T>(store: &RawEventStore<'_, T>) -> Result<()>
 where
     T: DeserializeOwned + EventFilter,
@@ -252,25 +299,42 @@ fn get_timestamp_from_key(key: &[u8]) -> Result<i64> {
 
 #[cfg(test)]
 mod tests {
+    use std::fs;
+    use std::fs::File;
+    use std::io::Write;
     use std::net::IpAddr;
+    use std::path::PathBuf;
 
     use chrono::Utc;
+    use giganto_client::ingest::log::OpLogLevel;
     use giganto_client::ingest::{
         log::SecuLog,
         netflow::{Netflow5, Netflow9},
     };
     use semver::{Version, VersionReq};
+    use tempfile::TempDir;
 
     use super::COMPATIBLE_VERSION_REQ;
+    use crate::storage::migration::migration_structures::OpLogBeforeV23;
     use crate::storage::{
+        migrate_data_dir,
         migration::migration_structures::{
             ConnBeforeV21, HttpFromV12BeforeV21, NtlmBeforeV21, SmtpBeforeV21, SshBeforeV21,
             TlsBeforeV21,
         },
         Conn as ConnFromV21, Database, DbOptions, Http as HttpFromV21, Ntlm as NtlmFromV21,
-        Smtp as SmtpFromV21, Ssh as SshFromV21, StorageKey, Tls as TlsFromV21,
+        OpLog as OpLogFromV23, Smtp as SmtpFromV21, Ssh as
SshFromV21, StorageKey, + Tls as TlsFromV21, }; + fn mock_version_file(dir: &TempDir, version_content: &str) -> PathBuf { + let version_path = dir.path().join("VERSION"); + let mut file = File::create(&version_path).expect("Failed to create VERSION file"); + file.write_all(version_content.as_bytes()) + .expect("Failed to write version"); + version_path + } + #[test] fn version() { let compatible = VersionReq::parse(COMPATIBLE_VERSION_REQ).expect("valid semver"); @@ -755,4 +819,66 @@ mod tests { }; assert_eq!(new_tls, store_tls); } + + #[test] + fn migrate_0_21_to_0_23_0() { + const TEST_TIMESTAMP: i64 = 1000; + + let db_dir = tempfile::tempdir().unwrap(); + let db = Database::open(db_dir.path(), &DbOptions::default()).unwrap(); + let op_log_store = db.op_log_store().unwrap(); + + let old_op_log = OpLogBeforeV23 { + agent_name: "local".to_string(), + log_level: OpLogLevel::Info, + contents: "test".to_string(), + }; + + let serialized_old_op_log = bincode::serialize(&old_op_log).unwrap(); + let op_log_old_key = StorageKey::builder() + .start_key("local@sr1") + .end_key(TEST_TIMESTAMP) + .build() + .key(); + + op_log_store + .append(&op_log_old_key, &serialized_old_op_log) + .unwrap(); + + super::migrate_0_21_0_to_0_23_0(&db).unwrap(); + + let count = op_log_store.iter_forward().count(); + assert_eq!(count, 1); + + for log in op_log_store.iter_forward() { + let Ok((_key, value)) = log else { + continue; + }; + + let Ok(oplog) = bincode::deserialize::(&value) else { + continue; + }; + + assert_eq!(oplog.sensor, "sr1".to_string()); + assert_eq!(oplog.agent_name, "local".to_string()); + } + } + + #[test] + fn migrate_data_dir_version_test() { + let version_dir = tempfile::tempdir().unwrap(); + let db_dir = tempfile::tempdir().unwrap(); + let db = Database::open(db_dir.path(), &DbOptions::default()).unwrap(); + + mock_version_file(&version_dir, "0.13.0"); + + let result = migrate_data_dir(version_dir.path(), &db); + assert!(result.is_ok()); + + if let Ok(updated_version) = fs::read_to_string(version_dir.path().join("VERSION")) { + let current = Version::parse(env!("CARGO_PKG_VERSION")).expect("valid semver"); + let diff = Version::parse(&updated_version).expect("valid semver"); + assert_eq!(current, diff) + } + } } diff --git a/src/storage/migration/migration_structures.rs b/src/storage/migration/migration_structures.rs index 319000f9..3de68ae2 100644 --- a/src/storage/migration/migration_structures.rs +++ b/src/storage/migration/migration_structures.rs @@ -1,10 +1,11 @@ use std::net::IpAddr; +use giganto_client::ingest::log::OpLogLevel; use serde::{Deserialize, Serialize}; use crate::storage::{ - Conn as ConnFromV21, Http as HttpFromV21, Ntlm as NtlmFromV21, Smtp as SmtpFromV21, - Ssh as SshFromV21, Tls as TlsFromV21, + Conn as ConnFromV21, Http as HttpFromV21, Ntlm as NtlmFromV21, OpLog as OpLogFromV23, + Smtp as SmtpFromV21, Ssh as SshFromV21, Tls as TlsFromV21, }; #[derive(Debug, Deserialize, Serialize, Eq, PartialEq)] pub struct HttpFromV12BeforeV21 { @@ -284,3 +285,21 @@ impl From for TlsFromV21 { } } } + +#[derive(Deserialize, Serialize)] +pub struct OpLogBeforeV23 { + pub agent_name: String, + pub log_level: OpLogLevel, + pub contents: String, +} + +impl From for OpLogFromV23 { + fn from(input: OpLogBeforeV23) -> Self { + Self { + sensor: String::new(), + agent_name: input.agent_name, + log_level: input.log_level, + contents: input.contents, + } + } +}
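
Usage sketch (illustrative, not part of the diff): with `agentId` now optional in `OpLogFilter`, a single `opLogRawEvents` query can page operation logs across all agents and narrow by `sensor` or `agentId` only when needed. Field and argument names follow the updated `schema.graphql` above; the surrounding test scaffolding (schema executor, store contents) is assumed.

```rust
// Illustrative query string in the same style as the existing GraphQL tests.
// It omits `agentId`, so logs from every agent are returned in timestamp order.
let query = r#"
{
    opLogRawEvents(filter: { logLevel: "Info" }, first: 20) {
        edges {
            node { timestamp level contents agentName sensor }
        }
    }
}"#;
```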