Skip to content

Commit

Permalink
Add sysmon event to publish stream data (#510)
Browse files Browse the repository at this point in the history
- `FileCreate`, `FileDelete`
  • Loading branch information
BLYKIM authored Aug 28, 2023
1 parent 3071c9c commit 80f8e81
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 58 deletions.
33 changes: 6 additions & 27 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "giganto"
version = "0.12.4-alpha.1"
version = "0.13.0-alpha.1"
edition = "2021"

[dependencies]
Expand All @@ -15,7 +15,7 @@ ctrlc = { version = "3", features = ["termination"] }
data-encoding = "2.4"
directories = "5.0"
futures-util = "0.3"
giganto-client = { git = "https://github.com/aicers/giganto-client.git", tag = "0.12.1" }
giganto-client = { git = "https://github.com/aicers/giganto-client.git", tag = "0.12.2" }
humantime = "2.1"
humantime-serde = "1"
libc = "0.2"
Expand Down
16 changes: 1 addition & 15 deletions src/graphql/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1604,7 +1604,6 @@ fn network_connection(
NetworkRawEvents::ConnRawEvent(ConnRawEvent::from_key_value(&key, value)?),
));
conn_data = conn_iter.next();
} else {
};
}
_ if selected == dns_ts => {
Expand All @@ -1614,7 +1613,6 @@ fn network_connection(
NetworkRawEvents::DnsRawEvent(DnsRawEvent::from_key_value(&key, value)?),
));
dns_data = dns_iter.next();
} else {
};
}
_ if selected == http_ts => {
Expand All @@ -1624,7 +1622,6 @@ fn network_connection(
NetworkRawEvents::HttpRawEvent(HttpRawEvent::from_key_value(&key, value)?),
));
http_data = http_iter.next();
} else {
};
}
_ if selected == rdp_ts => {
Expand All @@ -1634,7 +1631,6 @@ fn network_connection(
NetworkRawEvents::RdpRawEvent(RdpRawEvent::from_key_value(&key, value)?),
));
rdp_data = rdp_iter.next();
} else {
};
}
_ if selected == ntlm_ts => {
Expand All @@ -1644,7 +1640,6 @@ fn network_connection(
NetworkRawEvents::NtlmRawEvent(NtlmRawEvent::from_key_value(&key, value)?),
));
ntlm_data = ntlm_iter.next();
} else {
};
}
_ if selected == kerberos_ts => {
Expand All @@ -1656,7 +1651,6 @@ fn network_connection(
)?),
));
kerberos_data = kerberos_iter.next();
} else {
};
}
_ if selected == ssh_ts => {
Expand All @@ -1666,7 +1660,6 @@ fn network_connection(
NetworkRawEvents::SshRawEvent(SshRawEvent::from_key_value(&key, value)?),
));
ssh_data = ssh_iter.next();
} else {
};
}
_ if selected == dce_rpc_ts => {
Expand All @@ -1678,7 +1671,6 @@ fn network_connection(
)?),
));
dce_rpc_data = dce_rpc_iter.next();
} else {
};
}
_ if selected == ftp_ts => {
Expand All @@ -1688,7 +1680,6 @@ fn network_connection(
NetworkRawEvents::FtpRawEvent(FtpRawEvent::from_key_value(&key, value)?),
));
ftp_data = ftp_iter.next();
} else {
};
}
_ if selected == mqtt_ts => {
Expand All @@ -1698,7 +1689,6 @@ fn network_connection(
NetworkRawEvents::MqttRawEvent(MqttRawEvent::from_key_value(&key, value)?),
));
mqtt_data = mqtt_iter.next();
} else {
};
}
_ if selected == ldap_ts => {
Expand All @@ -1708,7 +1698,6 @@ fn network_connection(
NetworkRawEvents::LdapRawEvent(LdapRawEvent::from_key_value(&key, value)?),
));
ldap_data = ldap_iter.next();
} else {
};
}
_ if selected == tls_ts => {
Expand All @@ -1718,7 +1707,6 @@ fn network_connection(
NetworkRawEvents::TlsRawEvent(TlsRawEvent::from_key_value(&key, value)?),
));
tls_data = tls_iter.next();
} else {
};
}
_ if selected == smb_ts => {
Expand All @@ -1728,7 +1716,6 @@ fn network_connection(
NetworkRawEvents::SmbRawEvent(SmbRawEvent::from_key_value(&key, value)?),
));
smb_data = smb_iter.next();
} else {
};
}
_ if selected == nfs_ts => {
Expand All @@ -1738,7 +1725,6 @@ fn network_connection(
NetworkRawEvents::NfsRawEvent(NfsRawEvent::from_key_value(&key, value)?),
));
nfs_data = nfs_iter.next();
} else {
};
}
_ => {}
Expand Down Expand Up @@ -1787,7 +1773,7 @@ fn network_connection(
}
let mut connection: Connection<String, NetworkRawEvents> =
Connection::new(has_previous_page, has_next_page);
connection.edges.extend(result_vec.into_iter());
connection.edges.extend(result_vec);

Ok(connection)
}
Expand Down
16 changes: 10 additions & 6 deletions src/graphql/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,18 @@ impl GigantoStatusQuery {
.get("peers")
.context("peers not found")?
.as_array()
.context("invaild peers format")?;
.context("invalid peers format")?;
let mut peer_list = Vec::new();
for peer in peers_value.iter() {
for peer in peers_value {
if let Some(peer_data) = peer.as_inline_table() {
let (Some(address_val),Some(host_name_val)) = (peer_data.get("address"),peer_data.get("host_name")) else{
let (Some(address_val), Some(host_name_val)) =
(peer_data.get("address"), peer_data.get("host_name"))
else {
return Err(anyhow!("Invalid address/hostname Value format").into());
};
let (Some(address),Some(host_name)) = (address_val.as_str(),host_name_val.as_str()) else{
let (Some(address), Some(host_name)) =
(address_val.as_str(), host_name_val.as_str())
else {
return Err(anyhow!("Invalid address/hostname String format").into());
};
peer_list.push(PeerList {
Expand Down Expand Up @@ -223,10 +227,10 @@ pub fn write_toml_file(doc: &Document, path: &str) -> Result<()> {

fn parse_toml_element(key: &str, doc: &Document) -> Result<String> {
let Some(item) = doc.get(key) else {
return Err(anyhow!("{} not found.",key).into());
return Err(anyhow!("{} not found.", key).into());
};
let Some(value) = item.as_str() else {
return Err(anyhow!("parse failed: {}'s format is not available.",key).into());
return Err(anyhow!("parse failed: {}'s format is not available.", key).into());
};
Ok(value.to_string())
}
Expand Down
10 changes: 5 additions & 5 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ async fn client_run(
local_host_name: String,
wait_shutdown: Arc<Notify>,
) {
for peer in peer_conn_info.peer_list.read().await.iter() {
for peer in &*peer_conn_info.peer_list.read().await {
tokio::spawn(client_connection(
client_endpoint.clone(),
peer.clone(),
Expand Down Expand Up @@ -300,7 +300,7 @@ async fn client_connection(
.await?;

// Share the received peer list with connected peers.
for (_, conn) in peer_conn_info.peer_conn.read().await.iter() {
for conn in (*peer_conn_info.peer_conn.read().await).values() {
tokio::spawn(update_peer_info::<HashSet<PeerInfo>>(
conn.clone(),
PeerCode::UpdatePeerList,
Expand Down Expand Up @@ -345,7 +345,7 @@ async fn client_connection(
},
_ = peer_conn_info.notify_source.notified() => {
let source_list: HashSet<String> = peer_conn_info.sources.read().await.keys().cloned().collect();
for (_, conn) in peer_conn_info.peer_conn.write().await.iter() {
for conn in (*peer_conn_info.peer_conn.write().await).values() {
tokio::spawn(update_peer_info::<HashSet<String>>(
conn.clone(),
PeerCode::UpdateSourceList,
Expand Down Expand Up @@ -456,7 +456,7 @@ async fn server_connection(
.await?;

// Share the received peer list with your connected peers.
for (_, conn) in peer_conn_info.peer_conn.read().await.iter() {
for conn in (*peer_conn_info.peer_conn.read().await).values() {
tokio::spawn(update_peer_info::<HashSet<PeerInfo>>(
conn.clone(),
PeerCode::UpdatePeerList,
Expand Down Expand Up @@ -501,7 +501,7 @@ async fn server_connection(
},
_ = peer_conn_info.notify_source.notified() => {
let source_list: HashSet<String> = peer_conn_info.sources.read().await.keys().cloned().collect();
for (_, conn) in peer_conn_info.peer_conn.read().await.iter() {
for conn in (*peer_conn_info.peer_conn.read().await).values() {
tokio::spawn(update_peer_info::<HashSet<String>>(
conn.clone(),
PeerCode::UpdateSourceList,
Expand Down
42 changes: 41 additions & 1 deletion src/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,46 @@ where
error!("Failed to open nfs store");
}
}
RequestStreamRecord::FileCreate => {
if let Ok(store) = db.file_create_store() {
if let Err(e) = send_stream(
store,
conn,
record_type,
request_msg,
source,
kind,
node_type,
stream_direct_channel,
)
.await
{
error!("Failed to send sysmon stream : {}", e);
}
} else {
error!("Failed to open file_create store");
}
}
RequestStreamRecord::FileDelete => {
if let Ok(store) = db.file_delete_store() {
if let Err(e) = send_stream(
store,
conn,
record_type,
request_msg,
source,
kind,
node_type,
stream_direct_channel,
)
.await
{
error!("Failed to send sysmon stream : {}", e);
}
} else {
error!("Failed to open file_delete store");
}
}
RequestStreamRecord::Pcap => {}
};
Ok(())
Expand All @@ -628,7 +668,7 @@ pub async fn send_direct_stream(
source: &str,
stream_direct_channel: StreamDirectChannel,
) -> Result<()> {
for (req_key, sender) in stream_direct_channel.read().await.iter() {
for (req_key, sender) in &*stream_direct_channel.read().await {
if req_key.contains(&network_key.source_key) || req_key.contains(&network_key.all_key) {
let raw_len = u32::try_from(raw_event.len())?.to_le_bytes();
let mut send_buf: Vec<u8> = Vec::new();
Expand Down
5 changes: 4 additions & 1 deletion src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ pub fn extract_cert_from_conn(connection: &Connection) -> Result<Vec<Certificate
let Some(conn_info) = connection.peer_identity() else {
bail!("no peer identity");
};
let Some(cert_info) = conn_info.downcast_ref::<Vec<rustls::Certificate>>().cloned() else {
let Some(cert_info) = conn_info
.downcast_ref::<Vec<rustls::Certificate>>()
.cloned()
else {
bail!("non-certificate identity");
};
Ok(cert_info)
Expand Down
2 changes: 1 addition & 1 deletion src/storage/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{
};
use tracing::info;

const COMPATIBLE_VERSION_REQ: &str = ">0.12.4-alpha,<0.13.0-alpha";
const COMPATIBLE_VERSION_REQ: &str = ">0.12.4-alpha,<=0.13.0-alpha.1";

/// Migrates the data directory to the up-to-date format if necessary.
///
Expand Down
1 change: 1 addition & 0 deletions src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use warp::{http::Response as HttpResponse, Filter};
///
/// Note that `key` is not compatible with the DER-encoded key extracted by
/// rustls-pemfile.
#[allow(clippy::unused_async)]
pub async fn serve(
schema: Schema,
addr: SocketAddr,
Expand Down

0 comments on commit 80f8e81

Please sign in to comment.