Change OpLogFilter::agent_id to be optional
Close #724

The RocksDB keys in the `oplog` store are currently of the form `agent_id-0-timestamp`.
This change modifies the keys to the form `timestamp-0-sequence`.
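
A minimal sketch of the new key layout, for illustration only: the helper names below are hypothetical, the actual constants and key builders live in `src/storage.rs`, and the sequence type produced by the ingest module's `SequenceGenerator` is assumed here to be `u64`.

```rust
use chrono::{DateTime, TimeZone, Utc};

const TIMESTAMP_SIZE: usize = 8; // big-endian i64 nanosecond timestamp

// Hypothetical helper: assembles an `oplog` key of the form timestamp-0-sequence.
fn build_op_log_key(timestamp: i64, sequence: u64) -> Vec<u8> {
    let mut key = Vec::with_capacity(TIMESTAMP_SIZE + 1 + std::mem::size_of::<u64>());
    key.extend_from_slice(&timestamp.to_be_bytes());
    key.push(0); // zero-byte separator, as in the old agent_id-0-timestamp layout
    key.extend_from_slice(&sequence.to_be_bytes());
    key
}

// Hypothetical helper mirroring `get_timestamp_from_key_prefix` in src/graphql.rs:
// with the new layout the timestamp can be read straight from the front of the key.
fn timestamp_from_prefix(key: &[u8]) -> Option<DateTime<Utc>> {
    let bytes: [u8; TIMESTAMP_SIZE] = key.get(..TIMESTAMP_SIZE)?.try_into().ok()?;
    Some(Utc.timestamp_nanos(i64::from_be_bytes(bytes)))
}
```

Because the timestamp is now the key prefix, a single iteration over a timestamp range returns oplog entries from every agent in chronological order, which is what allows `agent_id` to become optional in the filter.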
henry0715-dev committed Nov 20, 2024
1 parent cbd7878 commit d868535
Showing 15 changed files with 710 additions and 82 deletions.
16 changes: 16 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,14 @@ Versioning](https://semver.org/spec/v2.0.0.html).

- Added GraphQL API `csvFormattedRawEvents` that returns the raw events of the
  requested protocol as a CSV-formatted string, delimited by tabs.
- Added `sensor` field to `OpLog`.
- Added the `load_connection_by_prefix_timestamp_key` function and
`TimestampKeyExtractor` trait to enable querying of keys prefixed with
`timestamp`.
- The `opLogRawEvents` GraphQL API no longer requires `agentId` and now
  accepts it as an optional parameter. Additionally, the API response now
  includes logs from all agents, in chronological order, rather than being
  limited to the logs of a single agent.

### Changed

@@ -29,6 +37,14 @@ Versioning](https://semver.org/spec/v2.0.0.html).
- The `sourceId` field in the `export` GraphQL API is renamed to `sensorId`.
- The `source` field in the filter parameters of all GraphQL APIs is changed
to `sensor`.
- Updated the compatibility version of the QUIC communication modules.
- Changed `PEER_VERSION_REQ` to ">=0.23.0-alpha.1,<0.24.0".
- Changed `INGEST_VERSION_REQ` to ">=0.23.0-alpha.1,<0.24.0".
- Changed `PUBLISH_VERSION_REQ` to ">=0.23.0-alpha.1,<0.24.0".
- Fixed code related to migration.
- Changed `COMPATIBLE_VERSION_REQ` to ">=0.23.0-alpha.1,<0.24.0".
- Added a migration function in `migrate_0_21_to_0_23_0`. This function migrates
  the `key` and `value` of `OpLog` entries to the new format.

### Removed

2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "giganto"
version = "0.23.0-alpha.1"
version = "0.23.0-alpha.2"
edition = "2021"

[lib]
160 changes: 160 additions & 0 deletions src/graphql.rs
@@ -362,6 +362,125 @@ where
Ok((records, has_previous, has_next))
}

#[allow(clippy::too_many_lines)]
fn get_connection_by_prefix_timestamp_key<T>(
store: &RawEventStore<'_, T>,
filter: &(impl RawEventFilter + TimestampKeyExtractor),
after: Option<String>,
before: Option<String>,
first: Option<usize>,
last: Option<usize>,
) -> Result<ConnArgs<T>>
where
T: DeserializeOwned + EventFilter,
{
let (records, has_previous, has_next) = if let Some(before) = before {
if after.is_some() {
return Err("cannot use both `after` and `before`".into());

Check warning on line 379 in src/graphql.rs

View check run for this annotation

Codecov / codecov/patch

src/graphql.rs#L379

Added line #L379 was not covered by tests
}
if first.is_some() {
return Err("'before' and 'first' cannot be specified simultaneously".into());

Check warning on line 382 in src/graphql.rs

View check run for this annotation

Codecov / codecov/patch

src/graphql.rs#L382

Added line #L382 was not covered by tests
}

let last = last.unwrap_or(MAXIMUM_PAGE_SIZE).min(MAXIMUM_PAGE_SIZE);
let cursor = base64_engine.decode(before)?;

// generate storage search key
let key_builder = StorageKey::timestamp_builder();
let from_key = key_builder
.clone()
.upper_open_bound_end_key(filter.get_range_start_key().1)
.build();
let to_key = key_builder
.lower_closed_bound_end_key(filter.get_range_start_key().0)
.build();

if cursor.cmp(&from_key.key()) == std::cmp::Ordering::Greater {
return Err("invalid cursor".into());

Check warning on line 399 in src/graphql.rs

View check run for this annotation

Codecov / codecov/patch

src/graphql.rs#L399

Added line #L399 was not covered by tests
}
let mut iter = store
.boundary_iter(&cursor, &to_key.key(), Direction::Reverse)
.peekable();
if let Some(Ok((key, _))) = iter.peek() {
if key.as_ref() == cursor {
iter.next();
}
}

let (mut records, has_previous) = collect_records(iter, last, filter);
records.reverse();
(records, has_previous, false)
} else if let Some(after) = after {
if before.is_some() {
return Err("cannot use both `after` and `before`".into());

Check warning on line 414 in src/graphql.rs

View check run for this annotation

Codecov / codecov/patch

src/graphql.rs#L414

Added line #L414 was not covered by tests
}
if last.is_some() {
return Err("'after' and 'last' cannot be specified simultaneously".into());

Check warning on line 417 in src/graphql.rs

View check run for this annotation

Codecov / codecov/patch

src/graphql.rs#L417

Added line #L417 was not covered by tests
}
let first = first.unwrap_or(MAXIMUM_PAGE_SIZE).min(MAXIMUM_PAGE_SIZE);
let cursor = base64_engine.decode(after)?;

// generate storage search key
let key_builder = StorageKey::timestamp_builder();
let from_key = key_builder
.clone()
.lower_closed_bound_end_key(filter.get_range_start_key().0)
.build();
let to_key = key_builder
.upper_open_bound_end_key(filter.get_range_start_key().1)
.build();

if cursor.cmp(&from_key.key()) == std::cmp::Ordering::Less {
return Err("invalid cursor".into());

Check warning on line 433 in src/graphql.rs

View check run for this annotation

Codecov / codecov/patch

src/graphql.rs#L433

Added line #L433 was not covered by tests
}
let mut iter = store
.boundary_iter(&cursor, &to_key.key(), Direction::Forward)
.peekable();
if let Some(Ok((key, _))) = iter.peek() {
if key.as_ref() == cursor {
iter.next();
}
}

let (records, has_next) = collect_records(iter, first, filter);
(records, false, has_next)
} else if let Some(last) = last {
if first.is_some() {
return Err("first and last cannot be used together".into());

Check warning on line 447 in src/graphql.rs

View check run for this annotation

Codecov / codecov/patch

src/graphql.rs#L447

Added line #L447 was not covered by tests
}
let last = last.min(MAXIMUM_PAGE_SIZE);

// generate storage search key
let key_builder = StorageKey::timestamp_builder();
let from_key = key_builder
.clone()
.upper_closed_bound_end_key(filter.get_range_start_key().1)
.build();
let to_key = key_builder
.lower_closed_bound_end_key(filter.get_range_start_key().0)
.build();

let iter = store.boundary_iter(&from_key.key(), &to_key.key(), Direction::Reverse);
let (mut records, has_previous) = collect_records(iter, last, filter);
records.reverse();
(records, has_previous, false)
} else {
let first = first.unwrap_or(MAXIMUM_PAGE_SIZE).min(MAXIMUM_PAGE_SIZE);
// generate storage search key
let key_builder = StorageKey::timestamp_builder();
let from_key = key_builder
.clone()
.lower_closed_bound_end_key(filter.get_range_start_key().0)
.build();
let to_key = key_builder
.upper_open_bound_end_key(filter.get_range_start_key().1)
.build();

let iter = store.boundary_iter(&from_key.key(), &to_key.key(), Direction::Forward);
let (records, has_next) = collect_records(iter, first, filter);
(records, false, has_next)
};
Ok((records, has_previous, has_next))
}

fn load_connection<N, T>(
store: &RawEventStore<'_, T>,
filter: &(impl RawEventFilter + KeyExtractor),
@@ -377,6 +496,37 @@ where
let (records, has_previous, has_next) =
get_connection(store, filter, after, before, first, last)?;

create_connection(records, has_previous, has_next)
}

fn load_connection_by_prefix_timestamp_key<N, T>(
store: &RawEventStore<'_, T>,
filter: &(impl RawEventFilter + TimestampKeyExtractor),
after: Option<String>,
before: Option<String>,
first: Option<usize>,
last: Option<usize>,
) -> Result<Connection<String, N>>
where
N: FromKeyValue<T> + OutputType,
T: DeserializeOwned + EventFilter,
{
let (records, has_previous, has_next) =
get_connection_by_prefix_timestamp_key(store, filter, after, before, first, last)?;

create_connection(records, has_previous, has_next)
}

#[allow(clippy::unnecessary_wraps)]
fn create_connection<N, T>(
records: Vec<(Box<[u8]>, T)>,
has_previous: bool,
has_next: bool,
) -> Result<Connection<String, N>>
where
N: FromKeyValue<T> + OutputType,
T: DeserializeOwned,
{
let mut connection: Connection<String, N> = Connection::new(has_previous, has_next);
connection.edges = records
.into_iter()
@@ -437,6 +587,14 @@ where
(records, has_more)
}

pub fn get_timestamp_from_key_prefix(key: &[u8]) -> Result<DateTime<Utc>, anyhow::Error> {
if key.len() > TIMESTAMP_SIZE {
let timestamp = i64::from_be_bytes(key[0..TIMESTAMP_SIZE].try_into()?);
return Ok(Utc.timestamp_nanos(timestamp));
}
Err(anyhow!("invalid database key length"))

Check warning on line 595 in src/graphql.rs

View check run for this annotation

Codecov / codecov/patch

src/graphql.rs#L594-L595

Added lines #L594 - L595 were not covered by tests
}

pub fn get_timestamp_from_key(key: &[u8]) -> Result<DateTime<Utc>, anyhow::Error> {
if key.len() > TIMESTAMP_SIZE {
let nanos = i64::from_be_bytes(key[(key.len() - TIMESTAMP_SIZE)..].try_into()?);
@@ -1578,6 +1736,8 @@ macro_rules! impl_from_giganto_search_filter_for_graphql_client {
}
pub(crate) use impl_from_giganto_search_filter_for_graphql_client;

use crate::storage::TimestampKeyExtractor;

#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
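As a rough illustration of the new `TimestampKeyExtractor` bound used by `get_connection_by_prefix_timestamp_key` above, the sketch below shows one way a filter could supply the time range. The trait, `TimeRange`, and `OpLogFilter` shapes here are assumptions made for illustration; the real definitions live in `src/storage.rs` and the GraphQL module and may differ.

```rust
use chrono::{DateTime, Utc};

// Assumed shapes, for illustration only.
struct TimeRange {
    start: Option<DateTime<Utc>>,
    end: Option<DateTime<Utc>>,
}

struct OpLogFilter {
    time: Option<TimeRange>,
    // sensor, agent_id, log_level, contents ...
}

trait TimestampKeyExtractor {
    /// (lower bound, upper bound) used to build the timestamp-prefixed search keys.
    fn get_range_start_key(&self) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>);
}

impl TimestampKeyExtractor for OpLogFilter {
    fn get_range_start_key(&self) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>) {
        // No time filter means an unbounded range.
        self.time
            .as_ref()
            .map_or((None, None), |t| (t.start, t.end))
    }
}
```

With a `(start, end)` pair available from the filter, `StorageKey::timestamp_builder()` can construct the lower and upper bound keys exactly as in the function above.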
2 changes: 2 additions & 0 deletions src/graphql/client/schema/op_log_raw_events.graphql
@@ -12,6 +12,8 @@ query OpLogRawEvents($filter: OpLogFilter!, $after: String, $before: String, $fi
timestamp
level
contents
agentName
sensor
}
}
}
5 changes: 4 additions & 1 deletion src/graphql/client/schema/schema.graphql
@@ -980,7 +980,8 @@ type NtlmRawEventEdge {

input OpLogFilter {
time: TimeRange
agentId: String!
sensor: String
agentId: String
logLevel: String
contents: String
}
@@ -989,6 +990,8 @@ type OpLogRawEvent {
timestamp: DateTime!
level: String!
contents: String!
agentName: String!
sensor: String!
}

type OpLogRawEventConnection {
27 changes: 19 additions & 8 deletions src/graphql/export/tests.rs
@@ -1,5 +1,6 @@
use std::mem;
use std::net::IpAddr;
use std::sync::{Arc, OnceLock};

use chrono::{Duration, Utc};
use giganto_client::ingest::{
@@ -12,6 +13,7 @@ use giganto_client::ingest::{
};

use crate::graphql::tests::TestSchema;
use crate::ingest::generation::SequenceGenerator;
use crate::storage::RawEventStore;

#[tokio::test]
Expand Down Expand Up @@ -907,9 +909,10 @@ fn insert_time_series(
async fn export_op_log() {
let schema = TestSchema::new();
let store = schema.db.op_log_store().unwrap();
let generator: OnceLock<Arc<SequenceGenerator>> = OnceLock::new();

insert_op_log_raw_event(&store, "agent1", 1);
insert_op_log_raw_event(&store, "agent2", 1);
insert_op_log_raw_event(&store, "agent1", "src1", 1, &generator).await;
insert_op_log_raw_event(&store, "agent2", "src1", 1, &generator).await;

// export csv file
let query = r#"
@@ -938,16 +941,24 @@ async fn export_op_log() {
assert!(res.data.to_string().contains("op_log"));
}

fn insert_op_log_raw_event(store: &RawEventStore<OpLog>, agent_name: &str, timestamp: i64) {
async fn insert_op_log_raw_event(
store: &RawEventStore<'_, OpLog>,
agent_name: &str,
sensor: &str,
timestamp: i64,
generator: &OnceLock<Arc<SequenceGenerator>>,
) {
let generator = generator.get_or_init(SequenceGenerator::init_generator);
let sequence_number = generator.generate_sequence_number().await;

let mut key: Vec<u8> = Vec::new();
let agent_id = format!("{agent_name}@src1");
key.extend_from_slice(agent_id.as_bytes());
key.push(0);
key.extend_from_slice(&timestamp.to_be_bytes());
key.push(0);
key.extend_from_slice(&sequence_number.to_be_bytes());

let op_log_body = OpLog {
sensor: "sensor".to_string(),
agent_name: agent_id.to_string(),
sensor: sensor.to_string(),
agent_name: agent_name.to_string(),
log_level: OpLogLevel::Info,
contents: "op_log".to_string(),
};
