Change OpLogFilter::agent_id to be optional #826

Merged · 1 commit · Dec 2, 2024
19 changes: 19 additions & 0 deletions CHANGELOG.md
@@ -7,9 +7,28 @@ Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- Added the `sensor` field to `OpLog`.
- Added the `load_connection_by_prefix_timestamp_key` function and the
  `TimestampKeyExtractor` trait to enable querying keys that are prefixed
  with a timestamp.
- The `opLogRawEvents` GraphQL API no longer requires `agentId` and now
  accepts it as an optional parameter. The response now includes logs from
  all agents, in chronological order, rather than being limited to the logs
  of a single agent.

### Changed

- Renamed the `csvFormattedRawEvents` GraphQL API to `tsvFormattedRawEvents`.
- Updated the compatibility version requirements of the QUIC communication
  modules:
  - Changed `PEER_VERSION_REQ` to ">=0.24.0-alpha.1,<0.25.0".
  - Changed `INGEST_VERSION_REQ` to ">=0.24.0-alpha.1,<0.25.0".
  - Changed `PUBLISH_VERSION_REQ` to ">=0.24.0-alpha.1,<0.25.0".
- Updated the migration code:
  - Changed `COMPATIBLE_VERSION_REQ` to ">=0.24.0-alpha.1,<0.25.0".
  - Added the `migrate_0_23_0_to_0_24_0_op_log` migration function, which
    rewrites the `key` and `value` of each `OpLog` entry.

## [0.23.0] - 2024-11-21

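For reference, the now-optional `agentId` can be exercised with a query like the one below. This is a minimal sketch, not code from the PR: `TestSchema` is the test harness that appears later in this diff, the `TimeRange` field names `start`/`end` are assumptions (the `TimeRange` input is not shown here), and the selected fields come from the schema changes further down.

```rust
// Minimal sketch (not part of the PR): exercising opLogRawEvents without
// an agentId. Assumes the repository's TestSchema harness and that
// TimeRange has `start`/`end` fields.
#[tokio::test]
async fn op_log_without_agent_id() {
    let schema = TestSchema::new();
    let query = r#"
    {
        opLogRawEvents(
            filter: {
                time: { start: "2024-11-01T00:00:00Z", end: "2024-12-01T00:00:00Z" }
            }
            first: 10
        ) {
            edges {
                node { timestamp level contents agentName sensor }
            }
        }
    }"#;
    // Without agentId, logs from all agents are returned chronologically.
    let res = schema.execute(query).await;
    assert!(res.errors.is_empty());
}
```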
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "giganto"
-version = "0.23.0"
+version = "0.24.0-alpha.1"
edition = "2021"

[lib]
160 changes: 160 additions & 0 deletions src/graphql.rs
@@ -362,6 +362,125 @@
Ok((records, has_previous, has_next))
}

#[allow(clippy::too_many_lines)]
fn get_connection_by_prefix_timestamp_key<T>(
store: &RawEventStore<'_, T>,
filter: &(impl RawEventFilter + TimestampKeyExtractor),
after: Option<String>,
before: Option<String>,
first: Option<usize>,
last: Option<usize>,
) -> Result<ConnArgs<T>>
where
T: DeserializeOwned + EventFilter,
{
let (records, has_previous, has_next) = if let Some(before) = before {
if after.is_some() {
return Err("cannot use both `after` and `before`".into());
}
if first.is_some() {
return Err("'before' and 'first' cannot be specified simultaneously".into());
}

let last = last.unwrap_or(MAXIMUM_PAGE_SIZE).min(MAXIMUM_PAGE_SIZE);
let cursor = base64_engine.decode(before)?;

// generate storage search key
let key_builder = StorageKey::timestamp_builder();
let from_key = key_builder
.clone()
.upper_open_bound_start_key(filter.get_range_start_key().1)
.build();
let to_key = key_builder
.lower_closed_bound_start_key(filter.get_range_start_key().0)
.build();

if cursor.cmp(&from_key.key()) == std::cmp::Ordering::Greater {
return Err("invalid cursor".into());
}
let mut iter = store
.boundary_iter(&cursor, &to_key.key(), Direction::Reverse)
.peekable();
if let Some(Ok((key, _))) = iter.peek() {
if key.as_ref() == cursor {
iter.next();
}
}

let (mut records, has_previous) = collect_records(iter, last, filter);
records.reverse();
(records, has_previous, false)
} else if let Some(after) = after {
if before.is_some() {
return Err("cannot use both `after` and `before`".into());
}
if last.is_some() {
return Err("'after' and 'last' cannot be specified simultaneously".into());
}
let first = first.unwrap_or(MAXIMUM_PAGE_SIZE).min(MAXIMUM_PAGE_SIZE);
let cursor = base64_engine.decode(after)?;

// generate storage search key
let key_builder = StorageKey::timestamp_builder();
let from_key = key_builder
.clone()
.lower_closed_bound_start_key(filter.get_range_start_key().0)
.build();
let to_key = key_builder
.upper_open_bound_start_key(filter.get_range_start_key().1)
.build();

if cursor.cmp(&from_key.key()) == std::cmp::Ordering::Less {
return Err("invalid cursor".into());
}
let mut iter = store
.boundary_iter(&cursor, &to_key.key(), Direction::Forward)
.peekable();
if let Some(Ok((key, _))) = iter.peek() {
if key.as_ref() == cursor {
iter.next();
}
}

let (records, has_next) = collect_records(iter, first, filter);
(records, false, has_next)
} else if let Some(last) = last {
if first.is_some() {
return Err("first and last cannot be used together".into());
}
let last = last.min(MAXIMUM_PAGE_SIZE);

// generate storage search key
let key_builder = StorageKey::timestamp_builder();
let from_key = key_builder
.clone()
.upper_closed_bound_start_key(filter.get_range_start_key().1)
.build();
let to_key = key_builder
.lower_closed_bound_start_key(filter.get_range_start_key().0)
.build();

let iter = store.boundary_iter(&from_key.key(), &to_key.key(), Direction::Reverse);
let (mut records, has_previous) = collect_records(iter, last, filter);
records.reverse();
(records, has_previous, false)
} else {
let first = first.unwrap_or(MAXIMUM_PAGE_SIZE).min(MAXIMUM_PAGE_SIZE);
// generate storage search key
let key_builder = StorageKey::timestamp_builder();
let from_key = key_builder
.clone()
.lower_closed_bound_start_key(filter.get_range_start_key().0)
.build();
let to_key = key_builder
.upper_open_bound_start_key(filter.get_range_start_key().1)
.build();

let iter = store.boundary_iter(&from_key.key(), &to_key.key(), Direction::Forward);
let (records, has_next) = collect_records(iter, first, filter);
(records, false, has_next)
};
Ok((records, has_previous, has_next))
}

fn load_connection<N, T>(
store: &RawEventStore<'_, T>,
filter: &(impl RawEventFilter + KeyExtractor),
@@ -377,6 +496,37 @@
let (records, has_previous, has_next) =
get_connection(store, filter, after, before, first, last)?;

create_connection(records, has_previous, has_next)
}

fn load_connection_by_prefix_timestamp_key<N, T>(
store: &RawEventStore<'_, T>,
filter: &(impl RawEventFilter + TimestampKeyExtractor),
after: Option<String>,
before: Option<String>,
first: Option<usize>,
last: Option<usize>,
) -> Result<Connection<String, N>>
where
N: FromKeyValue<T> + OutputType,
T: DeserializeOwned + EventFilter,
{
let (records, has_previous, has_next) =
get_connection_by_prefix_timestamp_key(store, filter, after, before, first, last)?;

create_connection(records, has_previous, has_next)
}

#[allow(clippy::unnecessary_wraps)]
fn create_connection<N, T>(
records: Vec<(Box<[u8]>, T)>,
has_previous: bool,
has_next: bool,
) -> Result<Connection<String, N>>
where
N: FromKeyValue<T> + OutputType,
T: DeserializeOwned,
{
let mut connection: Connection<String, N> = Connection::new(has_previous, has_next);
connection.edges = records
.into_iter()
@@ -437,6 +587,14 @@
(records, has_more)
}

pub fn get_timestamp_from_key_prefix(key: &[u8]) -> Result<DateTime<Utc>, anyhow::Error> {
if key.len() > TIMESTAMP_SIZE {
let timestamp = i64::from_be_bytes(key[0..TIMESTAMP_SIZE].try_into()?);
return Ok(Utc.timestamp_nanos(timestamp));
}
Err(anyhow!("invalid database key length"))
}

pub fn get_timestamp_from_key(key: &[u8]) -> Result<DateTime<Utc>, anyhow::Error> {
if key.len() > TIMESTAMP_SIZE {
let nanos = i64::from_be_bytes(key[(key.len() - TIMESTAMP_SIZE)..].try_into()?);
@@ -1578,6 +1736,8 @@
}
pub(crate) use impl_from_giganto_search_filter_for_graphql_client;

use crate::storage::TimestampKeyExtractor;

#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
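The new `get_timestamp_from_key_prefix` and the pre-existing `get_timestamp_from_key` differ only in where the timestamp lives in the database key, which is the whole point of this PR: putting the timestamp first makes lexicographic key order chronological across all agents. A self-contained restatement of both parsers, assuming `TIMESTAMP_SIZE` is 8 (it must be, since the slices convert into a big-endian `i64`):

```rust
use anyhow::{anyhow, Result};
use chrono::{DateTime, TimeZone, Utc};

// Byte width of a big-endian i64 nanosecond timestamp.
const TIMESTAMP_SIZE: usize = 8;

// Timestamp-prefixed key (new OpLog layout):
//   [ timestamp: 8 bytes BE ][ sequence number ... ]
fn timestamp_from_prefix(key: &[u8]) -> Result<DateTime<Utc>> {
    if key.len() > TIMESTAMP_SIZE {
        let nanos = i64::from_be_bytes(key[..TIMESTAMP_SIZE].try_into()?);
        return Ok(Utc.timestamp_nanos(nanos));
    }
    Err(anyhow!("invalid database key length"))
}

// Timestamp-suffixed key (pre-existing layout):
//   [ source bytes ][ 0x00 ][ timestamp: 8 bytes BE ]
fn timestamp_from_suffix(key: &[u8]) -> Result<DateTime<Utc>> {
    if key.len() > TIMESTAMP_SIZE {
        let nanos = i64::from_be_bytes(key[key.len() - TIMESTAMP_SIZE..].try_into()?);
        return Ok(Utc.timestamp_nanos(nanos));
    }
    Err(anyhow!("invalid database key length"))
}
```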
2 changes: 2 additions & 0 deletions src/graphql/client/schema/op_log_raw_events.graphql
@@ -12,6 +12,8 @@ query OpLogRawEvents($filter: OpLogFilter!, $after: String, $before: String, $fi
timestamp
level
contents
agentName
sensor
}
}
}
5 changes: 4 additions & 1 deletion src/graphql/client/schema/schema.graphql
@@ -980,7 +980,8 @@ type NtlmRawEventEdge {

input OpLogFilter {
time: TimeRange
-agentId: String!
+sensor: String
+agentId: String
logLevel: String
contents: String
}
@@ -989,6 +990,8 @@ type OpLogRawEvent {
timestamp: DateTime!
level: String!
contents: String!
agentName: String!
sensor: String!
}

type OpLogRawEventConnection {
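The schema change pairs with the `TimestampKeyExtractor` trait named in the changelog. The trait's definition lives in `src/storage.rs` and is not shown in this diff; the sketch below is hypothetical, with the signature inferred from call sites such as `filter.get_range_start_key().0`, and the filter/`TimeRange` field layouts assumed from the GraphQL input above.

```rust
use chrono::{DateTime, Utc};

// Hypothetical sketch only: the real trait is defined in src/storage.rs.
// Signature inferred from `filter.get_range_start_key()` call sites.
pub trait TimestampKeyExtractor {
    /// Returns the (start, end) bounds of the requested time range.
    fn get_range_start_key(&self) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>);
}

pub struct TimeRange {
    pub start: Option<DateTime<Utc>>, // assumed field names
    pub end: Option<DateTime<Utc>>,
}

pub struct OpLogFilter {
    pub time: Option<TimeRange>,
    pub sensor: Option<String>,
    pub agent_id: Option<String>, // now optional, per this PR
    pub log_level: Option<String>,
    pub contents: Option<String>,
}

impl TimestampKeyExtractor for OpLogFilter {
    fn get_range_start_key(&self) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>) {
        // Only the time range participates in key construction; the other
        // filter fields are matched per record by RawEventFilter.
        self.time
            .as_ref()
            .map_or((None, None), |t| (t.start, t.end))
    }
}
```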
26 changes: 18 additions & 8 deletions src/graphql/export/tests.rs
@@ -1,5 +1,6 @@
use std::mem;
use std::net::IpAddr;
use std::sync::{Arc, OnceLock};

use chrono::{Duration, Utc};
use giganto_client::ingest::{
@@ -12,6 +13,7 @@ use giganto_client::ingest::{
};

use crate::graphql::tests::TestSchema;
use crate::ingest::generation::SequenceGenerator;
use crate::storage::RawEventStore;

#[tokio::test]
@@ -907,9 +909,10 @@ fn insert_time_series(
async fn export_op_log() {
let schema = TestSchema::new();
let store = schema.db.op_log_store().unwrap();
let generator: OnceLock<Arc<SequenceGenerator>> = OnceLock::new();

-insert_op_log_raw_event(&store, "agent1", 1);
-insert_op_log_raw_event(&store, "agent2", 1);
+insert_op_log_raw_event(&store, "agent1", "src1", 1, &generator).await;
+insert_op_log_raw_event(&store, "agent2", "src1", 1, &generator).await;

// export csv file
let query = r#"
@@ -938,16 +941,23 @@
assert!(res.data.to_string().contains("op_log"));
}

-fn insert_op_log_raw_event(store: &RawEventStore<OpLog>, agent_name: &str, timestamp: i64) {
+async fn insert_op_log_raw_event(
+    store: &RawEventStore<'_, OpLog>,
+    agent_name: &str,
+    sensor: &str,
+    timestamp: i64,
+    generator: &OnceLock<Arc<SequenceGenerator>>,
+) {
let generator = generator.get_or_init(SequenceGenerator::init_generator);
let sequence_number = generator.generate_sequence_number();

let mut key: Vec<u8> = Vec::new();
-let agent_id = format!("{agent_name}@src1");
-key.extend_from_slice(agent_id.as_bytes());
-key.push(0);
key.extend_from_slice(&timestamp.to_be_bytes());
+key.extend_from_slice(&sequence_number.to_be_bytes());

let op_log_body = OpLog {
-sensor: "sensor".to_string(),
-agent_name: agent_id.to_string(),
+sensor: sensor.to_string(),
+agent_name: agent_name.to_string(),
log_level: OpLogLevel::Info,
contents: "op_log".to_string(),
};
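The updated test helper keys each op log by timestamp first, then a sequence number, which is what lets the timestamp-prefix iterators added in `src/graphql.rs` walk every agent's logs in time order. A sketch of that layout, assuming the generator yields a `u64`:

```rust
// Sketch of the new OpLog key layout built by the test helper above:
// an 8-byte big-endian timestamp followed by a big-endian sequence
// number. Big-endian encoding makes lexicographic key order match
// chronological order; the sequence number breaks ties when two events
// share the same nanosecond. The u64 sequence type is an assumption.
fn make_op_log_key(timestamp: i64, sequence_number: u64) -> Vec<u8> {
    let mut key = Vec::with_capacity(16);
    key.extend_from_slice(&timestamp.to_be_bytes());
    key.extend_from_slice(&sequence_number.to_be_bytes());
    key
}
```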