Change OpLogFilter::agent_id to be optional
Close #724

The RocksDB keys in the `oplog` store are currently of the form `agent_id-0-timestamp`.
This change switches them to the form `timestamp-0-sequence`; see the key-layout sketch below.
henry0715-dev committed Nov 7, 2024
1 parent 3096144 commit fe5bc59
Showing 16 changed files with 673 additions and 75 deletions.
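
For orientation, the sketch below illustrates the key-layout change described in the commit message. It is not part of the commit: the agent id, timestamp, and sequence number are made-up values, and the byte layout simply mirrors the commit message and the updated test helper further down this page.

```rust
fn main() {
    let agent_id = "agent1@src1"; // hypothetical agent identifier
    let timestamp: i64 = 1_699_315_200_000_000_000; // nanoseconds since the Unix epoch
    let sequence_number: i64 = 42; // hypothetical per-process sequence value

    // Old key layout: agent_id-0-timestamp
    let mut old_key: Vec<u8> = Vec::new();
    old_key.extend_from_slice(agent_id.as_bytes());
    old_key.push(0);
    old_key.extend_from_slice(&timestamp.to_be_bytes());

    // New key layout: timestamp-0-sequence
    let mut new_key: Vec<u8> = Vec::new();
    new_key.extend_from_slice(&timestamp.to_be_bytes());
    new_key.push(0);
    new_key.extend_from_slice(&sequence_number.to_be_bytes());

    // Because RocksDB orders keys lexicographically and the timestamp is
    // big-endian, the new keys sort chronologically regardless of agent.
    assert_eq!(&new_key[0..8], &timestamp.to_be_bytes());
    assert_ne!(old_key, new_key);
}
```

With the timestamp first, a time-range query becomes a straightforward bounded iteration over the key space, which is what the new `get_connection_by_prefix_timestamp_key` function below relies on.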
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,13 @@ Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- Added `sensor` field to `OpLog`.
- Added the `load_connection_by_prefix_timestamp_key` function and
`TimestampKeyExtractor` trait to enable querying of keys prefixed with
`timestamp`.

### Changed

- Remote configuration is no longer stored in a temporary file, nor does it
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -24,7 +24,7 @@ data-encoding = "2.4"
deluxe = "0.5"
directories = "5.0"
futures-util = "0.3"
giganto-client = { git = "https://github.com/aicers/giganto-client.git", tag = "0.20.0" }
giganto-client = { git = "https://github.com/aicers/giganto-client.git", rev = "5d6148"} # TODO modify tag version
graphql_client = "0.14"
humantime = "2"
humantime-serde = "1"
160 changes: 160 additions & 0 deletions src/graphql.rs
@@ -362,6 +362,125 @@ where
    Ok((records, has_previous, has_next))
}

#[allow(clippy::too_many_lines)]
fn get_connection_by_prefix_timestamp_key<T>(
    store: &RawEventStore<'_, T>,
    filter: &(impl RawEventFilter + TimestampKeyExtractor),
    after: Option<String>,
    before: Option<String>,
    first: Option<usize>,
    last: Option<usize>,
) -> Result<ConnArgs<T>>
where
    T: DeserializeOwned + EventFilter,
{
    let (records, has_previous, has_next) = if let Some(before) = before {
        if after.is_some() {
            return Err("cannot use both `after` and `before`".into());
        }
        if first.is_some() {
            return Err("'before' and 'first' cannot be specified simultaneously".into());
        }

        let last = last.unwrap_or(MAXIMUM_PAGE_SIZE).min(MAXIMUM_PAGE_SIZE);
        let cursor = base64_engine.decode(before)?;

        // generate storage search key
        let key_builder = StorageKey::timestamp_builder();
        let from_key = key_builder
            .clone()
            .upper_open_bound_end_key(filter.get_range_start_key().1)
            .build();
        let to_key = key_builder
            .lower_closed_bound_end_key(filter.get_range_start_key().0)
            .build();

        if cursor.cmp(&from_key.key()) == std::cmp::Ordering::Greater {
            return Err("invalid cursor".into());
        }
        let mut iter = store
            .boundary_iter(&cursor, &to_key.key(), Direction::Reverse)
            .peekable();
        if let Some(Ok((key, _))) = iter.peek() {
            if key.as_ref() == cursor {
                iter.next();
            }
        }

        let (mut records, has_previous) = collect_records(iter, last, filter);
        records.reverse();
        (records, has_previous, false)
    } else if let Some(after) = after {
        if before.is_some() {
            return Err("cannot use both `after` and `before`".into());
        }
        if last.is_some() {
            return Err("'after' and 'last' cannot be specified simultaneously".into());
        }
        let first = first.unwrap_or(MAXIMUM_PAGE_SIZE).min(MAXIMUM_PAGE_SIZE);
        let cursor = base64_engine.decode(after)?;

        // generate storage search key
        let key_builder = StorageKey::timestamp_builder();
        let from_key = key_builder
            .clone()
            .lower_closed_bound_end_key(filter.get_range_start_key().0)
            .build();
        let to_key = key_builder
            .upper_open_bound_end_key(filter.get_range_start_key().1)
            .build();

        if cursor.cmp(&from_key.key()) == std::cmp::Ordering::Less {
            return Err("invalid cursor".into());
        }
        let mut iter = store
            .boundary_iter(&cursor, &to_key.key(), Direction::Forward)
            .peekable();
        if let Some(Ok((key, _))) = iter.peek() {
            if key.as_ref() == cursor {
                iter.next();
            }
        }

        let (records, has_next) = collect_records(iter, first, filter);
        (records, false, has_next)
    } else if let Some(last) = last {
        if first.is_some() {
            return Err("first and last cannot be used together".into());
        }
        let last = last.min(MAXIMUM_PAGE_SIZE);

        // generate storage search key
        let key_builder = StorageKey::timestamp_builder();
        let from_key = key_builder
            .clone()
            .upper_closed_bound_end_key(filter.get_range_start_key().1)
            .build();
        let to_key = key_builder
            .lower_closed_bound_end_key(filter.get_range_start_key().0)
            .build();

        let iter = store.boundary_iter(&from_key.key(), &to_key.key(), Direction::Reverse);
        let (mut records, has_previous) = collect_records(iter, last, filter);
        records.reverse();
        (records, has_previous, false)
    } else {
        let first = first.unwrap_or(MAXIMUM_PAGE_SIZE).min(MAXIMUM_PAGE_SIZE);
        // generate storage search key
        let key_builder = StorageKey::timestamp_builder();
        let from_key = key_builder
            .clone()
            .lower_closed_bound_end_key(filter.get_range_start_key().0)
            .build();
        let to_key = key_builder
            .upper_open_bound_end_key(filter.get_range_start_key().1)
            .build();

        let iter = store.boundary_iter(&from_key.key(), &to_key.key(), Direction::Forward);
        let (records, has_next) = collect_records(iter, first, filter);
        (records, false, has_next)
    };
    Ok((records, has_previous, has_next))
}

fn load_connection<N, T>(
    store: &RawEventStore<'_, T>,
    filter: &(impl RawEventFilter + KeyExtractor),
@@ -377,6 +496,37 @@ where
    let (records, has_previous, has_next) =
        get_connection(store, filter, after, before, first, last)?;

    create_connection(records, has_previous, has_next)
}

fn load_connection_by_prefix_timestamp_key<N, T>(
    store: &RawEventStore<'_, T>,
    filter: &(impl RawEventFilter + TimestampKeyExtractor),
    after: Option<String>,
    before: Option<String>,
    first: Option<usize>,
    last: Option<usize>,
) -> Result<Connection<String, N>>
where
    N: FromKeyValue<T> + OutputType,
    T: DeserializeOwned + EventFilter,
{
    let (records, has_previous, has_next) =
        get_connection_by_prefix_timestamp_key(store, filter, after, before, first, last)?;

    create_connection(records, has_previous, has_next)
}

#[allow(clippy::unnecessary_wraps)]
fn create_connection<N, T>(
    records: Vec<(Box<[u8]>, T)>,
    has_previous: bool,
    has_next: bool,
) -> Result<Connection<String, N>>
where
    N: FromKeyValue<T> + OutputType,
    T: DeserializeOwned,
{
    let mut connection: Connection<String, N> = Connection::new(has_previous, has_next);
    connection.edges = records
        .into_iter()
@@ -437,6 +587,14 @@ where
    (records, has_more)
}

pub fn get_timestamp_from_key_prefix(key: &[u8]) -> Result<DateTime<Utc>, anyhow::Error> {
    if key.len() > TIMESTAMP_SIZE {
        let timestamp = i64::from_be_bytes(key[0..8].try_into()?);
        return Ok(Utc.timestamp_nanos(timestamp));
    }
    Err(anyhow!("invalid database key length"))
}

pub fn get_timestamp_from_key(key: &[u8]) -> Result<DateTime<Utc>, anyhow::Error> {
    if key.len() > TIMESTAMP_SIZE {
        let nanos = i64::from_be_bytes(key[(key.len() - TIMESTAMP_SIZE)..].try_into()?);
@@ -1515,6 +1673,8 @@ macro_rules! impl_from_giganto_search_filter_for_graphql_client {
}
pub(crate) use impl_from_giganto_search_filter_for_graphql_client;

use crate::storage::TimestampKeyExtractor;

#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};
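
The `TimestampKeyExtractor` trait imported at the end of this file is defined in `src/storage.rs`, which is not part of this excerpt. Judging only from the call sites above, where `filter.get_range_start_key().0` and `.1` supply the lower and upper bounds of the key range, a plausible shape of the trait is sketched below; the actual definition in the commit may differ.

```rust
use chrono::{DateTime, Utc};

// Hypothetical sketch inferred from the call sites in
// `get_connection_by_prefix_timestamp_key`; not copied from the commit.
pub trait TimestampKeyExtractor {
    /// Returns the (start, end) of the requested time range; `None` leaves
    /// the corresponding bound open.
    fn get_range_start_key(&self) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>);
}
```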
1 change: 1 addition & 0 deletions src/graphql/client/schema/op_log_raw_events.graphql
@@ -12,6 +12,7 @@ query OpLogRawEvents($filter: OpLogFilter!, $after: String, $before: String, $fi
timestamp
level
contents
agentName
}
}
}
5 changes: 4 additions & 1 deletion src/graphql/client/schema/schema.graphql
@@ -991,7 +991,8 @@ type NtlmRawEventEdge {

input OpLogFilter {
  time: TimeRange
  agentId: String!
  sensor: String
  agentId: String
  logLevel: String
  contents: String
}
@@ -1000,6 +1001,8 @@ type OpLogRawEvent {
  timestamp: DateTime!
  level: String!
  contents: String!
  agentName: String!
  sensor: String!
}

type OpLogRawEventConnection {
26 changes: 19 additions & 7 deletions src/graphql/export/tests.rs
@@ -1,5 +1,6 @@
use std::mem;
use std::net::IpAddr;
use std::sync::{Arc, OnceLock};

use chrono::{Duration, Utc};
use giganto_client::ingest::{
@@ -12,6 +13,7 @@ use giganto_client::ingest::{
};

use crate::graphql::tests::TestSchema;
use crate::ingest::generation::SequenceGenerator;
use crate::storage::RawEventStore;

#[tokio::test]
@@ -907,9 +909,10 @@ fn insert_time_series(
async fn export_op_log() {
    let schema = TestSchema::new();
    let store = schema.db.op_log_store().unwrap();
    let generator: OnceLock<Arc<SequenceGenerator>> = OnceLock::new();

    insert_op_log_raw_event(&store, "agent1", 1);
    insert_op_log_raw_event(&store, "agent2", 1);
    insert_op_log_raw_event(&store, "agent1", "src1", 1, &generator);
    insert_op_log_raw_event(&store, "agent2", "src1", 1, &generator);

    // export csv file
    let query = r#"
@@ -938,17 +941,26 @@ async fn export_op_log() {
    assert!(res.data.to_string().contains("op_log"));
}

fn insert_op_log_raw_event(store: &RawEventStore<OpLog>, agent_name: &str, timestamp: i64) {
fn insert_op_log_raw_event(
    store: &RawEventStore<OpLog>,
    agent_name: &str,
    sensor: &str,
    timestamp: i64,
    generator: &OnceLock<Arc<SequenceGenerator>>,
) {
    let generator = generator.get_or_init(SequenceGenerator::init_generator);
    let sequence_number = generator.generate_sequence_number();

    let mut key: Vec<u8> = Vec::new();
    let agent_id = format!("{agent_name}@src1");
    key.extend_from_slice(agent_id.as_bytes());
    key.push(0);
    key.extend_from_slice(&timestamp.to_be_bytes());
    key.push(0);
    key.extend_from_slice(&sequence_number.to_be_bytes());

    let op_log_body = OpLog {
        agent_name: agent_id.to_string(),
        agent_name: agent_name.to_string(),
        log_level: OpLogLevel::Info,
        contents: "op_log".to_string(),
        sensor: sensor.to_string(),
    };

    let value = bincode::serialize(&op_log_body).unwrap();
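
`SequenceGenerator` lives in `src/ingest/generation.rs`, which is outside this diff. From its use in the test above — a `OnceLock<Arc<SequenceGenerator>>` shared across inserts, initialized with `init_generator` and queried with `generate_sequence_number` — it behaves like a process-wide monotonic counter that keeps keys unique when two oplog entries share a timestamp. A rough sketch of that idea, not the commit's actual implementation:

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

// Hypothetical sketch: names follow the test above, but the real
// implementation in the commit may differ.
pub struct SequenceGenerator {
    counter: AtomicI64,
}

impl SequenceGenerator {
    pub fn init_generator() -> Arc<Self> {
        Arc::new(Self {
            counter: AtomicI64::new(0),
        })
    }

    pub fn generate_sequence_number(&self) -> i64 {
        // Hand out the next value so that two entries written with the same
        // timestamp still get distinct `timestamp-0-sequence` keys.
        self.counter.fetch_add(1, Ordering::Relaxed) + 1
    }
}
```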