From 57aee93f8e312392ed30cf2ab838dadf37357c57 Mon Sep 17 00:00:00 2001 From: Bly Kim Date: Mon, 18 Sep 2023 16:22:11 +0900 Subject: [PATCH] Release 0.13.1 - Modify deprecated function --- CHANGELOG.md | 4 +- Cargo.lock | 58 +++++----- src/graphql/export.rs | 78 +++++++------ src/graphql/network.rs | 226 ++++++++++++++++++++------------------ src/graphql/packet.rs | 19 ++-- src/graphql/statistics.rs | 2 +- src/ingest/tests.rs | 182 ++++++++++++++++++++---------- src/publish/tests.rs | 222 ++++++++++++++++++------------------- src/storage.rs | 23 ++-- src/storage/migration.rs | 2 +- 10 files changed, 461 insertions(+), 355 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dc7a5b3b..3a1d4cda 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ file is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased] +## [0.13.1] - 2023-09-18 ### Changed @@ -21,6 +21,7 @@ Versioning](https://semver.org/spec/v2.0.0.html). - If filter has no value for the time field, it will provide the most `recent` statistics. - Add feature to generate benchmark statistics for ingest events. +- Modify to execute flush when giganto down. ## [0.13.0] - 2023-08-28 @@ -282,6 +283,7 @@ Versioning](https://semver.org/spec/v2.0.0.html). - Initial release. +[0.13.1]: https://github.com/aicers/giganto/compare/0.13.0...0.13.1 [0.13.0]: https://github.com/aicers/giganto/compare/0.12.3...0.13.0 [0.12.3]: https://github.com/aicers/giganto/compare/0.12.2...0.12.3 [0.12.2]: https://github.com/aicers/giganto/compare/0.12.1...0.12.2 diff --git a/Cargo.lock b/Cargo.lock index fd3ac3cd..0b534bd2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -149,7 +149,7 @@ dependencies = [ "proc-macro2", "quote", "strum", - "syn 2.0.33", + "syn 2.0.37", "thiserror", ] @@ -208,7 +208,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -219,7 +219,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -282,7 +282,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -308,9 +308,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.13.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" +checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" [[package]] name = "byteorder" @@ -365,9 +365,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.30" +version = "0.4.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "defd4e7873dbddba6c7c91e199c7fcb946abc4a6a4ac3195400bcfb01b5de877" +checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" dependencies = [ "android-tzdata", "iana-time-zone", @@ -512,7 +512,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -523,7 +523,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core", "quote", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -594,7 +594,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -710,7 +710,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -1406,7 +1406,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -1515,7 +1515,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -1546,7 +1546,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -1612,7 +1612,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -1671,7 +1671,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" dependencies = [ "proc-macro2", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -2106,7 +2106,7 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -2158,7 +2158,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -2289,7 +2289,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -2305,9 +2305,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.33" +version = "2.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9caece70c63bfba29ec2fed841a09851b14a235c60010fa4de58089b6c025668" +checksum = "7303ef2c05cd654186cb250d29049a24840ca25d2747c25c0381c8d9e2f582e8" dependencies = [ "proc-macro2", "quote", @@ -2382,7 +2382,7 @@ checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -2463,7 +2463,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -2589,7 +2589,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", ] [[package]] @@ -2658,9 +2658,9 @@ dependencies = [ [[package]] name = "typenum" -version = "1.16.0" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" +checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "ucd-trie" @@ -2829,7 +2829,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", "wasm-bindgen-shared", ] @@ -2851,7 +2851,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.37", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/src/graphql/export.rs b/src/graphql/export.rs index fb6e61ed..e0112da5 100644 --- a/src/graphql/export.rs +++ b/src/graphql/export.rs @@ -2204,8 +2204,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.conn_store().unwrap(); - insert_conn_raw_event(&store, "src1", Utc::now().timestamp_nanos()); - insert_conn_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + insert_conn_raw_event(&store, "src1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_conn_raw_event(&store, "src2", Utc::now().timestamp_nanos_opt().unwrap()); // export csv file let query = r#" @@ -2274,8 +2274,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.dns_store().unwrap(); - insert_dns_raw_event(&store, "src1", Utc::now().timestamp_nanos()); - insert_dns_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + insert_dns_raw_event(&store, "src1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_dns_raw_event(&store, "src2", Utc::now().timestamp_nanos_opt().unwrap()); // export csv file let query = r#" @@ -2350,8 +2350,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.http_store().unwrap(); - insert_http_raw_event(&store, "src1", Utc::now().timestamp_nanos()); - insert_http_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + insert_http_raw_event(&store, "src1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_http_raw_event(&store, "src2", Utc::now().timestamp_nanos_opt().unwrap()); // export csv file let query = r#" @@ -2434,8 +2434,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.rdp_store().unwrap(); - insert_rdp_raw_event(&store, "src1", Utc::now().timestamp_nanos()); - insert_rdp_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + insert_rdp_raw_event(&store, "src1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_rdp_raw_event(&store, "src2", Utc::now().timestamp_nanos_opt().unwrap()); // export csv file let query = r#" @@ -2499,8 +2499,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.smtp_store().unwrap(); - insert_smtp_raw_event(&store, "src1", Utc::now().timestamp_nanos()); - insert_smtp_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + insert_smtp_raw_event(&store, "src1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_smtp_raw_event(&store, "src2", Utc::now().timestamp_nanos_opt().unwrap()); // export csv file let query = r#" @@ -2569,8 +2569,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.ntlm_store().unwrap(); - insert_ntlm_raw_event(&store, "src1", Utc::now().timestamp_nanos()); - insert_ntlm_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + insert_ntlm_raw_event(&store, "src1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_ntlm_raw_event(&store, "src2", Utc::now().timestamp_nanos_opt().unwrap()); // export csv file let query = r#" @@ -2640,8 +2640,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.kerberos_store().unwrap(); - insert_kerberos_raw_event(&store, "src1", Utc::now().timestamp_nanos()); - insert_kerberos_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + insert_kerberos_raw_event(&store, "src1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_kerberos_raw_event(&store, "src2", Utc::now().timestamp_nanos_opt().unwrap()); // export csv file let query = r#" @@ -2716,8 +2716,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.ssh_store().unwrap(); - insert_ssh_raw_event(&store, "src1", Utc::now().timestamp_nanos()); - insert_ssh_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + insert_ssh_raw_event(&store, "src1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_ssh_raw_event(&store, "src2", Utc::now().timestamp_nanos_opt().unwrap()); // export csv file let query = r#" @@ -2791,8 +2791,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.dce_rpc_store().unwrap(); - insert_dce_rpc_raw_event(&store, "src1", Utc::now().timestamp_nanos()); - insert_dce_rpc_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + insert_dce_rpc_raw_event(&store, "src1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_dce_rpc_raw_event(&store, "src2", Utc::now().timestamp_nanos_opt().unwrap()); // export csv file let query = r#" @@ -2861,14 +2861,14 @@ mod tests { insert_log_raw_event( &store, "src1", - Utc::now().timestamp_nanos(), + Utc::now().timestamp_nanos_opt().unwrap(), "kind1", b"log1", ); insert_log_raw_event( &store, "src2", - Utc::now().timestamp_nanos(), + Utc::now().timestamp_nanos_opt().unwrap(), "kind2", b"log2", ); @@ -2930,8 +2930,18 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.periodic_time_series_store().unwrap(); - insert_time_series(&store, "1", Utc::now().timestamp_nanos(), vec![0.0; 12]); - insert_time_series(&store, "2", Utc::now().timestamp_nanos(), vec![0.0; 12]); + insert_time_series( + &store, + "1", + Utc::now().timestamp_nanos_opt().unwrap(), + vec![0.0; 12], + ); + insert_time_series( + &store, + "2", + Utc::now().timestamp_nanos_opt().unwrap(), + vec![0.0; 12], + ); // export csv file let query = r#" @@ -3038,8 +3048,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.ftp_store().unwrap(); - insert_ftp_raw_event(&store, "src1", Utc::now().timestamp_nanos()); - insert_ftp_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + insert_ftp_raw_event(&store, "src1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_ftp_raw_event(&store, "src2", Utc::now().timestamp_nanos_opt().unwrap()); // export csv file let query = r#" @@ -3114,8 +3124,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.mqtt_store().unwrap(); - insert_mqtt_raw_event(&store, "src1", Utc::now().timestamp_nanos()); - insert_mqtt_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + insert_mqtt_raw_event(&store, "src1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_mqtt_raw_event(&store, "src2", Utc::now().timestamp_nanos_opt().unwrap()); // export csv file let query = r#" @@ -3184,8 +3194,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.ldap_store().unwrap(); - insert_ldap_raw_event(&store, "src1", Utc::now().timestamp_nanos()); - insert_ldap_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + insert_ldap_raw_event(&store, "src1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_ldap_raw_event(&store, "src2", Utc::now().timestamp_nanos_opt().unwrap()); // export csv file let query = r#" @@ -3255,8 +3265,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.tls_store().unwrap(); - insert_tls_raw_event(&store, "src1", Utc::now().timestamp_nanos()); - insert_tls_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + insert_tls_raw_event(&store, "src1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_tls_raw_event(&store, "src2", Utc::now().timestamp_nanos_opt().unwrap()); // export csv file let query = r#" @@ -3337,8 +3347,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.smb_store().unwrap(); - insert_smb_raw_event(&store, "src1", Utc::now().timestamp_nanos()); - insert_smb_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + insert_smb_raw_event(&store, "src1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_smb_raw_event(&store, "src2", Utc::now().timestamp_nanos_opt().unwrap()); // export csv file let query = r#" @@ -3412,8 +3422,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.nfs_store().unwrap(); - insert_nfs_raw_event(&store, "src1", Utc::now().timestamp_nanos()); - insert_nfs_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + insert_nfs_raw_event(&store, "src1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_nfs_raw_event(&store, "src2", Utc::now().timestamp_nanos_opt().unwrap()); // export csv file let query = r#" diff --git a/src/graphql/network.rs b/src/graphql/network.rs index 7976c272..dd458f1e 100644 --- a/src/graphql/network.rs +++ b/src/graphql/network.rs @@ -1831,8 +1831,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.conn_store().unwrap(); - insert_conn_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); - insert_conn_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); + insert_conn_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_conn_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); let query = r#" { @@ -1928,8 +1928,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.dns_store().unwrap(); - insert_dns_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); - insert_dns_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); + insert_dns_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_dns_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); let query = r#" { @@ -2023,8 +2023,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.http_store().unwrap(); - insert_http_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); - insert_http_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); + insert_http_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_http_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); let query = r#" { @@ -2126,8 +2126,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.rdp_store().unwrap(); - insert_rdp_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); - insert_rdp_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); + insert_rdp_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_rdp_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); let query = r#" { @@ -2182,8 +2182,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.smtp_store().unwrap(); - insert_smtp_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); - insert_smtp_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); + insert_smtp_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_smtp_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); let query = r#" { @@ -2237,8 +2237,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.ntlm_store().unwrap(); - insert_ntlm_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); - insert_ntlm_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); + insert_ntlm_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_ntlm_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); let query = r#" { @@ -2293,8 +2293,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.kerberos_store().unwrap(); - insert_kerberos_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); - insert_kerberos_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); + insert_kerberos_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_kerberos_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); let query = r#" { @@ -2354,8 +2354,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.ssh_store().unwrap(); - insert_ssh_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); - insert_ssh_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); + insert_ssh_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_ssh_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); let query = r#" { @@ -2415,8 +2415,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.dce_rpc_store().unwrap(); - insert_dce_rpc_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); - insert_dce_rpc_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); + insert_dce_rpc_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_dce_rpc_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); let query = r#" { @@ -2468,8 +2468,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.ftp_store().unwrap(); - insert_ftp_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); - insert_ftp_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); + insert_ftp_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_ftp_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); let query = r#" { @@ -2529,8 +2529,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.mqtt_store().unwrap(); - insert_mqtt_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); - insert_mqtt_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); + insert_mqtt_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_mqtt_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); let query = r#" { @@ -2584,8 +2584,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.ldap_store().unwrap(); - insert_ldap_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); - insert_ldap_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); + insert_ldap_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_ldap_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); let query = r#" { @@ -2640,8 +2640,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.tls_store().unwrap(); - insert_tls_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); - insert_tls_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); + insert_tls_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_tls_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); let query = r#" { @@ -2707,8 +2707,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.smb_store().unwrap(); - insert_smb_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); - insert_smb_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); + insert_smb_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_smb_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); let query = r#" { @@ -2767,8 +2767,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.nfs_store().unwrap(); - insert_nfs_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); - insert_nfs_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); + insert_nfs_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_nfs_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); let query = r#" { @@ -2818,8 +2818,8 @@ mod tests { let schema = TestSchema::new(); let store = schema.db.conn_store().unwrap(); - insert_conn_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); - insert_conn_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); + insert_conn_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); + insert_conn_raw_event(&store, "src 1", Utc::now().timestamp_nanos_opt().unwrap()); let query = r#" { @@ -2872,98 +2872,112 @@ mod tests { "src 1", Utc.with_ymd_and_hms(2020, 1, 1, 0, 1, 1) .unwrap() - .timestamp_nanos(), + .timestamp_nanos_opt() + .unwrap(), ); insert_dns_raw_event( &dns_store, "src 1", Utc.with_ymd_and_hms(2021, 1, 1, 0, 1, 1) .unwrap() - .timestamp_nanos(), + .timestamp_nanos_opt() + .unwrap(), ); insert_http_raw_event( &http_store, "src 1", Utc.with_ymd_and_hms(2020, 6, 1, 0, 1, 1) .unwrap() - .timestamp_nanos(), + .timestamp_nanos_opt() + .unwrap(), ); insert_rdp_raw_event( &rdp_store, "src 1", Utc.with_ymd_and_hms(2020, 1, 5, 0, 1, 1) .unwrap() - .timestamp_nanos(), + .timestamp_nanos_opt() + .unwrap(), ); insert_ntlm_raw_event( &ntlm_store, "src 1", Utc.with_ymd_and_hms(2022, 1, 5, 0, 1, 1) .unwrap() - .timestamp_nanos(), + .timestamp_nanos_opt() + .unwrap(), ); insert_kerberos_raw_event( &kerberos_store, "src 1", Utc.with_ymd_and_hms(2023, 1, 5, 0, 1, 1) .unwrap() - .timestamp_nanos(), + .timestamp_nanos_opt() + .unwrap(), ); insert_ssh_raw_event( &ssh_store, "src 1", Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 1) .unwrap() - .timestamp_nanos(), + .timestamp_nanos_opt() + .unwrap(), ); insert_dce_rpc_raw_event( &dce_rpc_store, "src 1", Utc.with_ymd_and_hms(2020, 1, 5, 6, 5, 0) .unwrap() - .timestamp_nanos(), + .timestamp_nanos_opt() + .unwrap(), ); insert_ftp_raw_event( &ftp_store, "src 1", Utc.with_ymd_and_hms(2023, 1, 5, 12, 12, 0) .unwrap() - .timestamp_nanos(), + .timestamp_nanos_opt() + .unwrap(), ); insert_mqtt_raw_event( &mqtt_store, "src 1", Utc.with_ymd_and_hms(2023, 1, 5, 12, 12, 0) .unwrap() - .timestamp_nanos(), + .timestamp_nanos_opt() + .unwrap(), ); insert_ldap_raw_event( &ldap_store, "src 1", Utc.with_ymd_and_hms(2023, 1, 6, 12, 12, 0) .unwrap() - .timestamp_nanos(), + .timestamp_nanos_opt() + .unwrap(), ); insert_tls_raw_event( &tls_store, "src 1", Utc.with_ymd_and_hms(2023, 1, 6, 11, 11, 0) .unwrap() - .timestamp_nanos(), + .timestamp_nanos_opt() + .unwrap(), ); insert_smb_raw_event( &smb_store, "src 1", Utc.with_ymd_and_hms(2023, 1, 6, 12, 12, 10) .unwrap() - .timestamp_nanos(), + .timestamp_nanos_opt() + .unwrap(), ); insert_nfs_raw_event( &nfs_store, "src 1", Utc.with_ymd_and_hms(2023, 1, 6, 12, 13, 0) .unwrap() - .timestamp_nanos(), + .timestamp_nanos_opt() + .unwrap(), ); // order: ssh, conn, rdp, dce_rpc, http, dns, ntlm, kerberos, ftp, mqtt, tls, ldap, smb, nfs @@ -3060,10 +3074,10 @@ mod tests { let timestamp3 = Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap(); //2020-01-01T01:01:01Z let timestamp4 = Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 1).unwrap(); //2020-01-02T00:00:01Z - insert_http_raw_event(&store, "src 1", timestamp1.timestamp_nanos()); - insert_http_raw_event(&store, "src 1", timestamp2.timestamp_nanos()); - insert_http_raw_event(&store, "src 1", timestamp3.timestamp_nanos()); - insert_http_raw_event(&store, "src 1", timestamp4.timestamp_nanos()); + insert_http_raw_event(&store, "src 1", timestamp1.timestamp_nanos_opt().unwrap()); + insert_http_raw_event(&store, "src 1", timestamp2.timestamp_nanos_opt().unwrap()); + insert_http_raw_event(&store, "src 1", timestamp3.timestamp_nanos_opt().unwrap()); + insert_http_raw_event(&store, "src 1", timestamp4.timestamp_nanos_opt().unwrap()); let query = r#" { @@ -3096,10 +3110,10 @@ mod tests { let timestamp3 = Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap(); //2020-01-01T01:01:01Z let timestamp4 = Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 1).unwrap(); //2020-01-02T00:00:01Z - insert_conn_raw_event(&store, "src 1", timestamp1.timestamp_nanos()); - insert_conn_raw_event(&store, "src 1", timestamp2.timestamp_nanos()); - insert_conn_raw_event(&store, "src 1", timestamp3.timestamp_nanos()); - insert_conn_raw_event(&store, "src 1", timestamp4.timestamp_nanos()); + insert_conn_raw_event(&store, "src 1", timestamp1.timestamp_nanos_opt().unwrap()); + insert_conn_raw_event(&store, "src 1", timestamp2.timestamp_nanos_opt().unwrap()); + insert_conn_raw_event(&store, "src 1", timestamp3.timestamp_nanos_opt().unwrap()); + insert_conn_raw_event(&store, "src 1", timestamp4.timestamp_nanos_opt().unwrap()); let query = r#" { @@ -3132,10 +3146,10 @@ mod tests { let timestamp3 = Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap(); //2020-01-01T01:01:01Z let timestamp4 = Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 1).unwrap(); //2020-01-02T00:00:01Z - insert_dns_raw_event(&store, "src 1", timestamp1.timestamp_nanos()); - insert_dns_raw_event(&store, "src 1", timestamp2.timestamp_nanos()); - insert_dns_raw_event(&store, "src 1", timestamp3.timestamp_nanos()); - insert_dns_raw_event(&store, "src 1", timestamp4.timestamp_nanos()); + insert_dns_raw_event(&store, "src 1", timestamp1.timestamp_nanos_opt().unwrap()); + insert_dns_raw_event(&store, "src 1", timestamp2.timestamp_nanos_opt().unwrap()); + insert_dns_raw_event(&store, "src 1", timestamp3.timestamp_nanos_opt().unwrap()); + insert_dns_raw_event(&store, "src 1", timestamp4.timestamp_nanos_opt().unwrap()); let query = r#" { @@ -3168,10 +3182,10 @@ mod tests { let timestamp3 = Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap(); //2020-01-01T01:01:01Z let timestamp4 = Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 1).unwrap(); //2020-01-02T00:00:01Z - insert_rdp_raw_event(&store, "src 1", timestamp1.timestamp_nanos()); - insert_rdp_raw_event(&store, "src 1", timestamp2.timestamp_nanos()); - insert_rdp_raw_event(&store, "src 1", timestamp3.timestamp_nanos()); - insert_rdp_raw_event(&store, "src 1", timestamp4.timestamp_nanos()); + insert_rdp_raw_event(&store, "src 1", timestamp1.timestamp_nanos_opt().unwrap()); + insert_rdp_raw_event(&store, "src 1", timestamp2.timestamp_nanos_opt().unwrap()); + insert_rdp_raw_event(&store, "src 1", timestamp3.timestamp_nanos_opt().unwrap()); + insert_rdp_raw_event(&store, "src 1", timestamp4.timestamp_nanos_opt().unwrap()); let query = r#" { @@ -3204,10 +3218,10 @@ mod tests { let timestamp3 = Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap(); //2020-01-01T01:01:01Z let timestamp4 = Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 1).unwrap(); //2020-01-02T00:00:01Z - insert_smtp_raw_event(&store, "src 1", timestamp1.timestamp_nanos()); - insert_smtp_raw_event(&store, "src 1", timestamp2.timestamp_nanos()); - insert_smtp_raw_event(&store, "src 1", timestamp3.timestamp_nanos()); - insert_smtp_raw_event(&store, "src 1", timestamp4.timestamp_nanos()); + insert_smtp_raw_event(&store, "src 1", timestamp1.timestamp_nanos_opt().unwrap()); + insert_smtp_raw_event(&store, "src 1", timestamp2.timestamp_nanos_opt().unwrap()); + insert_smtp_raw_event(&store, "src 1", timestamp3.timestamp_nanos_opt().unwrap()); + insert_smtp_raw_event(&store, "src 1", timestamp4.timestamp_nanos_opt().unwrap()); let query = r#" { @@ -3240,10 +3254,10 @@ mod tests { let timestamp3 = Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap(); //2020-01-01T01:01:01Z let timestamp4 = Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 1).unwrap(); //2020-01-02T00:00:01Z - insert_ntlm_raw_event(&store, "src 1", timestamp1.timestamp_nanos()); - insert_ntlm_raw_event(&store, "src 1", timestamp2.timestamp_nanos()); - insert_ntlm_raw_event(&store, "src 1", timestamp3.timestamp_nanos()); - insert_ntlm_raw_event(&store, "src 1", timestamp4.timestamp_nanos()); + insert_ntlm_raw_event(&store, "src 1", timestamp1.timestamp_nanos_opt().unwrap()); + insert_ntlm_raw_event(&store, "src 1", timestamp2.timestamp_nanos_opt().unwrap()); + insert_ntlm_raw_event(&store, "src 1", timestamp3.timestamp_nanos_opt().unwrap()); + insert_ntlm_raw_event(&store, "src 1", timestamp4.timestamp_nanos_opt().unwrap()); let query = r#" { @@ -3276,10 +3290,10 @@ mod tests { let timestamp3 = Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap(); //2020-01-01T01:01:01Z let timestamp4 = Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 1).unwrap(); //2020-01-02T00:00:01Z - insert_kerberos_raw_event(&store, "src 1", timestamp1.timestamp_nanos()); - insert_kerberos_raw_event(&store, "src 1", timestamp2.timestamp_nanos()); - insert_kerberos_raw_event(&store, "src 1", timestamp3.timestamp_nanos()); - insert_kerberos_raw_event(&store, "src 1", timestamp4.timestamp_nanos()); + insert_kerberos_raw_event(&store, "src 1", timestamp1.timestamp_nanos_opt().unwrap()); + insert_kerberos_raw_event(&store, "src 1", timestamp2.timestamp_nanos_opt().unwrap()); + insert_kerberos_raw_event(&store, "src 1", timestamp3.timestamp_nanos_opt().unwrap()); + insert_kerberos_raw_event(&store, "src 1", timestamp4.timestamp_nanos_opt().unwrap()); let query = r#" { @@ -3312,10 +3326,10 @@ mod tests { let timestamp3 = Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap(); //2020-01-01T01:01:01Z let timestamp4 = Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 1).unwrap(); //2020-01-02T00:00:01Z - insert_ssh_raw_event(&store, "src 1", timestamp1.timestamp_nanos()); - insert_ssh_raw_event(&store, "src 1", timestamp2.timestamp_nanos()); - insert_ssh_raw_event(&store, "src 1", timestamp3.timestamp_nanos()); - insert_ssh_raw_event(&store, "src 1", timestamp4.timestamp_nanos()); + insert_ssh_raw_event(&store, "src 1", timestamp1.timestamp_nanos_opt().unwrap()); + insert_ssh_raw_event(&store, "src 1", timestamp2.timestamp_nanos_opt().unwrap()); + insert_ssh_raw_event(&store, "src 1", timestamp3.timestamp_nanos_opt().unwrap()); + insert_ssh_raw_event(&store, "src 1", timestamp4.timestamp_nanos_opt().unwrap()); let query = r#" { @@ -3348,10 +3362,10 @@ mod tests { let timestamp3 = Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap(); //2020-01-01T01:01:01Z let timestamp4 = Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 1).unwrap(); //2020-01-02T00:00:01Z - insert_dce_rpc_raw_event(&store, "src 1", timestamp1.timestamp_nanos()); - insert_dce_rpc_raw_event(&store, "src 1", timestamp2.timestamp_nanos()); - insert_dce_rpc_raw_event(&store, "src 1", timestamp3.timestamp_nanos()); - insert_dce_rpc_raw_event(&store, "src 1", timestamp4.timestamp_nanos()); + insert_dce_rpc_raw_event(&store, "src 1", timestamp1.timestamp_nanos_opt().unwrap()); + insert_dce_rpc_raw_event(&store, "src 1", timestamp2.timestamp_nanos_opt().unwrap()); + insert_dce_rpc_raw_event(&store, "src 1", timestamp3.timestamp_nanos_opt().unwrap()); + insert_dce_rpc_raw_event(&store, "src 1", timestamp4.timestamp_nanos_opt().unwrap()); let query = r#" { @@ -3384,10 +3398,10 @@ mod tests { let timestamp3 = Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap(); //2020-01-01T01:01:01Z let timestamp4 = Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 1).unwrap(); //2020-01-02T00:00:01Z - insert_ftp_raw_event(&store, "src 1", timestamp1.timestamp_nanos()); - insert_ftp_raw_event(&store, "src 1", timestamp2.timestamp_nanos()); - insert_ftp_raw_event(&store, "src 1", timestamp3.timestamp_nanos()); - insert_ftp_raw_event(&store, "src 1", timestamp4.timestamp_nanos()); + insert_ftp_raw_event(&store, "src 1", timestamp1.timestamp_nanos_opt().unwrap()); + insert_ftp_raw_event(&store, "src 1", timestamp2.timestamp_nanos_opt().unwrap()); + insert_ftp_raw_event(&store, "src 1", timestamp3.timestamp_nanos_opt().unwrap()); + insert_ftp_raw_event(&store, "src 1", timestamp4.timestamp_nanos_opt().unwrap()); let query = r#" { @@ -3420,10 +3434,10 @@ mod tests { let timestamp3 = Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap(); //2020-01-01T01:01:01Z let timestamp4 = Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 1).unwrap(); //2020-01-02T00:00:01Z - insert_mqtt_raw_event(&store, "src 1", timestamp1.timestamp_nanos()); - insert_mqtt_raw_event(&store, "src 1", timestamp2.timestamp_nanos()); - insert_mqtt_raw_event(&store, "src 1", timestamp3.timestamp_nanos()); - insert_mqtt_raw_event(&store, "src 1", timestamp4.timestamp_nanos()); + insert_mqtt_raw_event(&store, "src 1", timestamp1.timestamp_nanos_opt().unwrap()); + insert_mqtt_raw_event(&store, "src 1", timestamp2.timestamp_nanos_opt().unwrap()); + insert_mqtt_raw_event(&store, "src 1", timestamp3.timestamp_nanos_opt().unwrap()); + insert_mqtt_raw_event(&store, "src 1", timestamp4.timestamp_nanos_opt().unwrap()); let query = r#" { @@ -3456,10 +3470,10 @@ mod tests { let timestamp3 = Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap(); //2020-01-01T01:01:01Z let timestamp4 = Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 1).unwrap(); //2020-01-02T00:00:01Z - insert_ldap_raw_event(&store, "src 1", timestamp1.timestamp_nanos()); - insert_ldap_raw_event(&store, "src 1", timestamp2.timestamp_nanos()); - insert_ldap_raw_event(&store, "src 1", timestamp3.timestamp_nanos()); - insert_ldap_raw_event(&store, "src 1", timestamp4.timestamp_nanos()); + insert_ldap_raw_event(&store, "src 1", timestamp1.timestamp_nanos_opt().unwrap()); + insert_ldap_raw_event(&store, "src 1", timestamp2.timestamp_nanos_opt().unwrap()); + insert_ldap_raw_event(&store, "src 1", timestamp3.timestamp_nanos_opt().unwrap()); + insert_ldap_raw_event(&store, "src 1", timestamp4.timestamp_nanos_opt().unwrap()); let query = r#" { @@ -3492,10 +3506,10 @@ mod tests { let timestamp3 = Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap(); //2020-01-01T01:01:01Z let timestamp4 = Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 1).unwrap(); //2020-01-02T00:00:01Z - insert_tls_raw_event(&store, "src 1", timestamp1.timestamp_nanos()); - insert_tls_raw_event(&store, "src 1", timestamp2.timestamp_nanos()); - insert_tls_raw_event(&store, "src 1", timestamp3.timestamp_nanos()); - insert_tls_raw_event(&store, "src 1", timestamp4.timestamp_nanos()); + insert_tls_raw_event(&store, "src 1", timestamp1.timestamp_nanos_opt().unwrap()); + insert_tls_raw_event(&store, "src 1", timestamp2.timestamp_nanos_opt().unwrap()); + insert_tls_raw_event(&store, "src 1", timestamp3.timestamp_nanos_opt().unwrap()); + insert_tls_raw_event(&store, "src 1", timestamp4.timestamp_nanos_opt().unwrap()); let query = r#" { @@ -3528,10 +3542,10 @@ mod tests { let timestamp3 = Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap(); //2020-01-01T01:01:01Z let timestamp4 = Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 1).unwrap(); //2020-01-02T00:00:01Z - insert_smb_raw_event(&store, "src 1", timestamp1.timestamp_nanos()); - insert_smb_raw_event(&store, "src 1", timestamp2.timestamp_nanos()); - insert_smb_raw_event(&store, "src 1", timestamp3.timestamp_nanos()); - insert_smb_raw_event(&store, "src 1", timestamp4.timestamp_nanos()); + insert_smb_raw_event(&store, "src 1", timestamp1.timestamp_nanos_opt().unwrap()); + insert_smb_raw_event(&store, "src 1", timestamp2.timestamp_nanos_opt().unwrap()); + insert_smb_raw_event(&store, "src 1", timestamp3.timestamp_nanos_opt().unwrap()); + insert_smb_raw_event(&store, "src 1", timestamp4.timestamp_nanos_opt().unwrap()); let query = r#" { @@ -3564,10 +3578,10 @@ mod tests { let timestamp3 = Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap(); //2020-01-01T01:01:01Z let timestamp4 = Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 1).unwrap(); //2020-01-02T00:00:01Z - insert_nfs_raw_event(&store, "src 1", timestamp1.timestamp_nanos()); - insert_nfs_raw_event(&store, "src 1", timestamp2.timestamp_nanos()); - insert_nfs_raw_event(&store, "src 1", timestamp3.timestamp_nanos()); - insert_nfs_raw_event(&store, "src 1", timestamp4.timestamp_nanos()); + insert_nfs_raw_event(&store, "src 1", timestamp1.timestamp_nanos_opt().unwrap()); + insert_nfs_raw_event(&store, "src 1", timestamp2.timestamp_nanos_opt().unwrap()); + insert_nfs_raw_event(&store, "src 1", timestamp3.timestamp_nanos_opt().unwrap()); + insert_nfs_raw_event(&store, "src 1", timestamp4.timestamp_nanos_opt().unwrap()); let query = r#" { diff --git a/src/graphql/packet.rs b/src/graphql/packet.rs index 917be14a..b1ee7ecf 100644 --- a/src/graphql/packet.rs +++ b/src/graphql/packet.rs @@ -29,7 +29,12 @@ impl KeyExtractor for PacketFilter { } fn get_mid_key(&self) -> Option> { - Some(self.request_time.timestamp_nanos().to_be_bytes().to_vec()) + Some( + self.request_time + .timestamp_nanos_opt()? + .to_be_bytes() + .to_vec(), + ) } fn get_range_end_key(&self) -> (Option>, Option>) { @@ -176,9 +181,9 @@ mod tests { let dt2 = Utc.with_ymd_and_hms(2023, 1, 20, 0, 0, 1).unwrap(); let dt3 = Utc.with_ymd_and_hms(2023, 1, 20, 0, 0, 2).unwrap(); - let ts1 = dt1.timestamp_nanos(); - let ts2 = dt2.timestamp_nanos(); - let ts3 = dt3.timestamp_nanos(); + let ts1 = dt1.timestamp_nanos_opt().unwrap(); + let ts2 = dt2.timestamp_nanos_opt().unwrap(); + let ts3 = dt3.timestamp_nanos_opt().unwrap(); insert_packet(&store, "src 1", ts1, ts1); insert_packet(&store, "src 1", ts1, ts2); @@ -261,9 +266,9 @@ mod tests { let dt2 = Utc.with_ymd_and_hms(2023, 1, 20, 0, 0, 1).unwrap(); let dt3 = Utc.with_ymd_and_hms(2023, 1, 20, 0, 0, 2).unwrap(); - let ts1 = dt1.timestamp_nanos(); - let ts2 = dt2.timestamp_nanos(); - let ts3 = dt3.timestamp_nanos(); + let ts1 = dt1.timestamp_nanos_opt().unwrap(); + let ts2 = dt2.timestamp_nanos_opt().unwrap(); + let ts3 = dt3.timestamp_nanos_opt().unwrap(); insert_packet(&store, "src 1", ts1, ts1); insert_packet(&store, "src 1", ts1, ts2); diff --git a/src/graphql/statistics.rs b/src/graphql/statistics.rs index ea5cbc9f..e07a0e86 100644 --- a/src/graphql/statistics.rs +++ b/src/graphql/statistics.rs @@ -271,7 +271,7 @@ mod tests { async fn test_statistics() { let schema = TestSchema::new(); let store = schema.db.statistics_store().unwrap(); - let now = Utc::now().timestamp_nanos(); + let now = Utc::now().timestamp_nanos_opt().unwrap(); insert_statistics_raw_event(&store, now, "src 1", 0, 600, 1000000, 300000000); insert_statistics_raw_event(&store, now, "src 1", 1, 600, 2000000, 600000000); insert_statistics_raw_event(&store, now, "src 1", 2, 600, 3000000, 900000000); diff --git a/src/ingest/tests.rs b/src/ingest/tests.rs index 2850f674..7e465ff6 100644 --- a/src/ingest/tests.rs +++ b/src/ingest/tests.rs @@ -189,9 +189,13 @@ async fn conn() { send_record_header(&mut send_conn, RECORD_TYPE_CONN) .await .unwrap(); - send_event(&mut send_conn, Utc::now().timestamp_nanos(), conn_body) - .await - .unwrap(); + send_event( + &mut send_conn, + Utc::now().timestamp_nanos_opt().unwrap(), + conn_body, + ) + .await + .unwrap(); send_conn.finish().await.expect("failed to shutdown stream"); @@ -234,9 +238,13 @@ async fn dns() { send_record_header(&mut send_dns, RECORD_TYPE_DNS) .await .unwrap(); - send_event(&mut send_dns, Utc::now().timestamp_nanos(), dns_body) - .await - .unwrap(); + send_event( + &mut send_dns, + Utc::now().timestamp_nanos_opt().unwrap(), + dns_body, + ) + .await + .unwrap(); send_dns.finish().await.expect("failed to shutdown stream"); @@ -263,9 +271,13 @@ async fn log() { send_record_header(&mut send_log, RECORD_TYPE_LOG) .await .unwrap(); - send_event(&mut send_log, Utc::now().timestamp_nanos(), log_body) - .await - .unwrap(); + send_event( + &mut send_log, + Utc::now().timestamp_nanos_opt().unwrap(), + log_body, + ) + .await + .unwrap(); send_log.finish().await.expect("failed to shutdown stream"); @@ -315,9 +327,13 @@ async fn http() { send_record_header(&mut send_http, RECORD_TYPE_HTTP) .await .unwrap(); - send_event(&mut send_http, Utc::now().timestamp_nanos(), http_body) - .await - .unwrap(); + send_event( + &mut send_http, + Utc::now().timestamp_nanos_opt().unwrap(), + http_body, + ) + .await + .unwrap(); send_http.finish().await.expect("failed to shutdown stream"); @@ -348,9 +364,13 @@ async fn rdp() { send_record_header(&mut send_rdp, RECORD_TYPE_RDP) .await .unwrap(); - send_event(&mut send_rdp, Utc::now().timestamp_nanos(), rdp_body) - .await - .unwrap(); + send_event( + &mut send_rdp, + Utc::now().timestamp_nanos_opt().unwrap(), + rdp_body, + ) + .await + .unwrap(); send_rdp.finish().await.expect("failed to shutdown stream"); @@ -382,7 +402,7 @@ async fn periodic_time_series() { .unwrap(); send_event( &mut send_periodic_time_series, - Utc::now().timestamp_nanos(), + Utc::now().timestamp_nanos_opt().unwrap(), periodic_time_series_body, ) .await @@ -425,9 +445,13 @@ async fn smtp() { send_record_header(&mut send_smtp, RECORD_TYPE_SMTP) .await .unwrap(); - send_event(&mut send_smtp, Utc::now().timestamp_nanos(), smtp_body) - .await - .unwrap(); + send_event( + &mut send_smtp, + Utc::now().timestamp_nanos_opt().unwrap(), + smtp_body, + ) + .await + .unwrap(); send_smtp.finish().await.expect("failed to shutdown stream"); @@ -464,9 +488,13 @@ async fn ntlm() { send_record_header(&mut send_ntlm, RECORD_TYPE_NTLM) .await .unwrap(); - send_event(&mut send_ntlm, Utc::now().timestamp_nanos(), ntlm_body) - .await - .unwrap(); + send_event( + &mut send_ntlm, + Utc::now().timestamp_nanos_opt().unwrap(), + ntlm_body, + ) + .await + .unwrap(); send_ntlm.finish().await.expect("failed to shutdown stream"); @@ -510,7 +538,7 @@ async fn kerberos() { .unwrap(); send_event( &mut send_kerberos, - Utc::now().timestamp_nanos(), + Utc::now().timestamp_nanos_opt().unwrap(), kerberos_body, ) .await @@ -559,9 +587,13 @@ async fn ssh() { send_record_header(&mut send_ssh, RECORD_TYPE_SSH) .await .unwrap(); - send_event(&mut send_ssh, Utc::now().timestamp_nanos(), ssh_body) - .await - .unwrap(); + send_event( + &mut send_ssh, + Utc::now().timestamp_nanos_opt().unwrap(), + ssh_body, + ) + .await + .unwrap(); send_ssh.finish().await.expect("failed to shutdown stream"); @@ -597,7 +629,7 @@ async fn dce_rpc() { .unwrap(); send_event( &mut send_dce_rpc, - Utc::now().timestamp_nanos(), + Utc::now().timestamp_nanos_opt().unwrap(), dce_rpc_body, ) .await @@ -632,9 +664,13 @@ async fn oplog() { send_record_header(&mut send_oplog, RECORD_TYPE_OPLOG) .await .unwrap(); - send_event(&mut send_oplog, Utc::now().timestamp_nanos(), oplog_body) - .await - .unwrap(); + send_event( + &mut send_oplog, + Utc::now().timestamp_nanos_opt().unwrap(), + oplog_body, + ) + .await + .unwrap(); send_oplog .finish() @@ -658,16 +694,20 @@ async fn packet() { let packet: Vec = vec![0, 1, 0, 1, 0, 1]; let packet_body = Packet { - packet_timestamp: Utc::now().timestamp_nanos(), + packet_timestamp: Utc::now().timestamp_nanos_opt().unwrap(), packet, }; send_record_header(&mut send_packet, RECORD_TYPE_PACKET) .await .unwrap(); - send_event(&mut send_packet, Utc::now().timestamp_nanos(), packet_body) - .await - .unwrap(); + send_event( + &mut send_packet, + Utc::now().timestamp_nanos_opt().unwrap(), + packet_body, + ) + .await + .unwrap(); send_packet .finish() @@ -712,9 +752,13 @@ async fn ftp() { send_record_header(&mut send_ftp, RECORD_TYPE_FTP) .await .unwrap(); - send_event(&mut send_ftp, Utc::now().timestamp_nanos(), ftp_body) - .await - .unwrap(); + send_event( + &mut send_ftp, + Utc::now().timestamp_nanos_opt().unwrap(), + ftp_body, + ) + .await + .unwrap(); send_ftp.finish().await.expect("failed to shutdown stream"); @@ -750,9 +794,13 @@ async fn mqtt() { send_record_header(&mut send_mqtt, RECORD_TYPE_MQTT) .await .unwrap(); - send_event(&mut send_mqtt, Utc::now().timestamp_nanos(), mqtt_body) - .await - .unwrap(); + send_event( + &mut send_mqtt, + Utc::now().timestamp_nanos_opt().unwrap(), + mqtt_body, + ) + .await + .unwrap(); send_mqtt.finish().await.expect("failed to shutdown stream"); @@ -789,9 +837,13 @@ async fn ldap() { send_record_header(&mut send_ldap, RECORD_TYPE_LDAP) .await .unwrap(); - send_event(&mut send_ldap, Utc::now().timestamp_nanos(), ldap_body) - .await - .unwrap(); + send_event( + &mut send_ldap, + Utc::now().timestamp_nanos_opt().unwrap(), + ldap_body, + ) + .await + .unwrap(); send_ldap.finish().await.expect("failed to shutdown stream"); @@ -839,9 +891,13 @@ async fn tls() { send_record_header(&mut send_tls, RECORD_TYPE_TLS) .await .unwrap(); - send_event(&mut send_tls, Utc::now().timestamp_nanos(), tls_body) - .await - .unwrap(); + send_event( + &mut send_tls, + Utc::now().timestamp_nanos_opt().unwrap(), + tls_body, + ) + .await + .unwrap(); send_tls.finish().await.expect("failed to shutdown stream"); @@ -882,9 +938,13 @@ async fn smb() { send_record_header(&mut send_smb, RECORD_TYPE_SMB) .await .unwrap(); - send_event(&mut send_smb, Utc::now().timestamp_nanos(), smb_body) - .await - .unwrap(); + send_event( + &mut send_smb, + Utc::now().timestamp_nanos_opt().unwrap(), + smb_body, + ) + .await + .unwrap(); send_smb.finish().await.expect("failed to shutdown stream"); @@ -916,9 +976,13 @@ async fn nfs() { send_record_header(&mut send_nfs, RECORD_TYPE_NFS) .await .unwrap(); - send_event(&mut send_nfs, Utc::now().timestamp_nanos(), nfs_body) - .await - .unwrap(); + send_event( + &mut send_nfs, + Utc::now().timestamp_nanos_opt().unwrap(), + nfs_body, + ) + .await + .unwrap(); send_nfs.finish().await.expect("failed to shutdown stream"); @@ -947,7 +1011,7 @@ async fn statistics() { .unwrap(); send_event( &mut send_statistics, - Utc::now().timestamp_nanos(), + Utc::now().timestamp_nanos_opt().unwrap(), statistics_body, ) .await @@ -981,9 +1045,13 @@ async fn ack_info() { send_record_header(&mut send_log, RECORD_TYPE_LOG) .await .unwrap(); - send_event(&mut send_log, Utc::now().timestamp_nanos(), log_body) - .await - .unwrap(); + send_event( + &mut send_log, + Utc::now().timestamp_nanos_opt().unwrap(), + log_body, + ) + .await + .unwrap(); let mut last_timestamp: i64 = 0; for _ in 0..127 { @@ -992,7 +1060,7 @@ async fn ack_info() { log: vec![0; 10], }; - last_timestamp = Utc::now().timestamp_nanos(); + last_timestamp = Utc::now().timestamp_nanos_opt().unwrap(); send_event(&mut send_log, last_timestamp, log_body) .await .unwrap(); diff --git a/src/publish/tests.rs b/src/publish/tests.rs index da75128e..53cb0e07 100644 --- a/src/publish/tests.rs +++ b/src/publish/tests.rs @@ -701,7 +701,7 @@ async fn request_range_data_with_protocol() { let (mut send_pub_req, mut recv_pub_resp) = publish.conn.open_bi().await.expect("failed to open stream"); let conn_store = db.conn_store().unwrap(); - let send_conn_time = Utc::now().timestamp_nanos(); + let send_conn_time = Utc::now().timestamp_nanos_opt().unwrap(); let conn_data = bincode::deserialize::(&insert_conn_raw_event( &conn_store, SOURCE, @@ -726,8 +726,8 @@ async fn request_range_data_with_protocol() { let message = RequestRange { source: String::from(SOURCE), kind: String::from(CONN_KIND), - start: start.timestamp_nanos(), - end: end.timestamp_nanos(), + start: start.timestamp_nanos_opt().unwrap(), + end: end.timestamp_nanos_opt().unwrap(), count: 5, }; @@ -765,7 +765,7 @@ async fn request_range_data_with_protocol() { let (mut send_pub_req, mut recv_pub_resp) = publish.conn.open_bi().await.expect("failed to open stream"); let dns_store = db.dns_store().unwrap(); - let send_dns_time = Utc::now().timestamp_nanos(); + let send_dns_time = Utc::now().timestamp_nanos_opt().unwrap(); let dns_data = bincode::deserialize::(&insert_dns_raw_event(&dns_store, SOURCE, send_dns_time)) .unwrap(); @@ -787,8 +787,8 @@ async fn request_range_data_with_protocol() { let message = RequestRange { source: String::from(SOURCE), kind: String::from(DNS_KIND), - start: start.timestamp_nanos(), - end: end.timestamp_nanos(), + start: start.timestamp_nanos_opt().unwrap(), + end: end.timestamp_nanos_opt().unwrap(), count: 5, }; @@ -826,7 +826,7 @@ async fn request_range_data_with_protocol() { let (mut send_pub_req, mut recv_pub_resp) = publish.conn.open_bi().await.expect("failed to open stream"); let http_store = db.http_store().unwrap(); - let send_http_time = Utc::now().timestamp_nanos(); + let send_http_time = Utc::now().timestamp_nanos_opt().unwrap(); let http_data = bincode::deserialize::(&insert_http_raw_event( &http_store, SOURCE, @@ -851,8 +851,8 @@ async fn request_range_data_with_protocol() { let message = RequestRange { source: String::from(SOURCE), kind: String::from(HTTP_KIND), - start: start.timestamp_nanos(), - end: end.timestamp_nanos(), + start: start.timestamp_nanos_opt().unwrap(), + end: end.timestamp_nanos_opt().unwrap(), count: 5, }; @@ -890,7 +890,7 @@ async fn request_range_data_with_protocol() { let (mut send_pub_req, mut recv_pub_resp) = publish.conn.open_bi().await.expect("failed to open stream"); let rdp_store = db.rdp_store().unwrap(); - let send_rdp_time = Utc::now().timestamp_nanos(); + let send_rdp_time = Utc::now().timestamp_nanos_opt().unwrap(); let rdp_data = bincode::deserialize::(&insert_rdp_raw_event(&rdp_store, SOURCE, send_rdp_time)) .unwrap(); @@ -912,8 +912,8 @@ async fn request_range_data_with_protocol() { let message = RequestRange { source: String::from(SOURCE), kind: String::from(RDP_KIND), - start: start.timestamp_nanos(), - end: end.timestamp_nanos(), + start: start.timestamp_nanos_opt().unwrap(), + end: end.timestamp_nanos_opt().unwrap(), count: 5, }; @@ -951,7 +951,7 @@ async fn request_range_data_with_protocol() { let (mut send_pub_req, mut recv_pub_resp) = publish.conn.open_bi().await.expect("failed to open stream"); let smtp_store = db.smtp_store().unwrap(); - let send_smtp_time = Utc::now().timestamp_nanos(); + let send_smtp_time = Utc::now().timestamp_nanos_opt().unwrap(); let smtp_data = bincode::deserialize::(&insert_smtp_raw_event( &smtp_store, SOURCE, @@ -976,8 +976,8 @@ async fn request_range_data_with_protocol() { let message = RequestRange { source: String::from(SOURCE), kind: String::from(SMTP_KIND), - start: start.timestamp_nanos(), - end: end.timestamp_nanos(), + start: start.timestamp_nanos_opt().unwrap(), + end: end.timestamp_nanos_opt().unwrap(), count: 5, }; @@ -1015,7 +1015,7 @@ async fn request_range_data_with_protocol() { let (mut send_pub_req, mut recv_pub_resp) = publish.conn.open_bi().await.expect("failed to open stream"); let ntlm_store = db.ntlm_store().unwrap(); - let send_ntlm_time = Utc::now().timestamp_nanos(); + let send_ntlm_time = Utc::now().timestamp_nanos_opt().unwrap(); let ntlm_data = bincode::deserialize::(&insert_ntlm_raw_event( &ntlm_store, SOURCE, @@ -1040,8 +1040,8 @@ async fn request_range_data_with_protocol() { let message = RequestRange { source: String::from(SOURCE), kind: String::from(NTLM_KIND), - start: start.timestamp_nanos(), - end: end.timestamp_nanos(), + start: start.timestamp_nanos_opt().unwrap(), + end: end.timestamp_nanos_opt().unwrap(), count: 5, }; @@ -1079,7 +1079,7 @@ async fn request_range_data_with_protocol() { let (mut send_pub_req, mut recv_pub_resp) = publish.conn.open_bi().await.expect("failed to open stream"); let kerberos_store = db.kerberos_store().unwrap(); - let send_kerberos_time = Utc::now().timestamp_nanos(); + let send_kerberos_time = Utc::now().timestamp_nanos_opt().unwrap(); let kerberos_data = bincode::deserialize::(&insert_kerberos_raw_event( &kerberos_store, SOURCE, @@ -1104,8 +1104,8 @@ async fn request_range_data_with_protocol() { let message = RequestRange { source: String::from(SOURCE), kind: String::from(KERBEROS_KIND), - start: start.timestamp_nanos(), - end: end.timestamp_nanos(), + start: start.timestamp_nanos_opt().unwrap(), + end: end.timestamp_nanos_opt().unwrap(), count: 5, }; send_range_data_request(&mut send_pub_req, PUBLISH_RANGE_MESSAGE_CODE, message) @@ -1144,7 +1144,7 @@ async fn request_range_data_with_protocol() { let (mut send_pub_req, mut recv_pub_resp) = publish.conn.open_bi().await.expect("failed to open stream"); let ssh_store = db.ssh_store().unwrap(); - let send_ssh_time = Utc::now().timestamp_nanos(); + let send_ssh_time = Utc::now().timestamp_nanos_opt().unwrap(); let ssh_data = bincode::deserialize::(&insert_ssh_raw_event(&ssh_store, SOURCE, send_ssh_time)) .unwrap(); @@ -1166,8 +1166,8 @@ async fn request_range_data_with_protocol() { let message = RequestRange { source: String::from(SOURCE), kind: String::from(SSH_KIND), - start: start.timestamp_nanos(), - end: end.timestamp_nanos(), + start: start.timestamp_nanos_opt().unwrap(), + end: end.timestamp_nanos_opt().unwrap(), count: 5, }; @@ -1205,7 +1205,7 @@ async fn request_range_data_with_protocol() { let (mut send_pub_req, mut recv_pub_resp) = publish.conn.open_bi().await.expect("failed to open stream"); let dce_rpc_store = db.dce_rpc_store().unwrap(); - let send_dce_rpc_time = Utc::now().timestamp_nanos(); + let send_dce_rpc_time = Utc::now().timestamp_nanos_opt().unwrap(); let dce_rpc_data = bincode::deserialize::(&insert_dce_rpc_raw_event( &dce_rpc_store, SOURCE, @@ -1230,8 +1230,8 @@ async fn request_range_data_with_protocol() { let message = RequestRange { source: String::from(SOURCE), kind: String::from(DCE_RPC_KIND), - start: start.timestamp_nanos(), - end: end.timestamp_nanos(), + start: start.timestamp_nanos_opt().unwrap(), + end: end.timestamp_nanos_opt().unwrap(), count: 5, }; @@ -1271,7 +1271,7 @@ async fn request_range_data_with_protocol() { let (mut send_pub_req, mut recv_pub_resp) = publish.conn.open_bi().await.expect("failed to open stream"); let ftp_store = db.ftp_store().unwrap(); - let send_ftp_time = Utc::now().timestamp_nanos(); + let send_ftp_time = Utc::now().timestamp_nanos_opt().unwrap(); let ftp_data = bincode::deserialize::(&insert_ftp_raw_event(&ftp_store, SOURCE, send_ftp_time)) .unwrap(); @@ -1293,8 +1293,8 @@ async fn request_range_data_with_protocol() { let message = RequestRange { source: String::from(SOURCE), kind: String::from(FTP_KIND), - start: start.timestamp_nanos(), - end: end.timestamp_nanos(), + start: start.timestamp_nanos_opt().unwrap(), + end: end.timestamp_nanos_opt().unwrap(), count: 5, }; @@ -1332,7 +1332,7 @@ async fn request_range_data_with_protocol() { let (mut send_pub_req, mut recv_pub_resp) = publish.conn.open_bi().await.expect("failed to open stream"); let mqtt_store = db.mqtt_store().unwrap(); - let send_mqtt_time = Utc::now().timestamp_nanos(); + let send_mqtt_time = Utc::now().timestamp_nanos_opt().unwrap(); let mqtt_data = bincode::deserialize::(&insert_mqtt_raw_event( &mqtt_store, SOURCE, @@ -1357,8 +1357,8 @@ async fn request_range_data_with_protocol() { let message = RequestRange { source: String::from(SOURCE), kind: String::from(MQTT_KIND), - start: start.timestamp_nanos(), - end: end.timestamp_nanos(), + start: start.timestamp_nanos_opt().unwrap(), + end: end.timestamp_nanos_opt().unwrap(), count: 5, }; @@ -1396,7 +1396,7 @@ async fn request_range_data_with_protocol() { let (mut send_pub_req, mut recv_pub_resp) = publish.conn.open_bi().await.expect("failed to open stream"); let ldap_store = db.ldap_store().unwrap(); - let send_ldap_time = Utc::now().timestamp_nanos(); + let send_ldap_time = Utc::now().timestamp_nanos_opt().unwrap(); let ldap_data = bincode::deserialize::(&insert_ldap_raw_event( &ldap_store, SOURCE, @@ -1421,8 +1421,8 @@ async fn request_range_data_with_protocol() { let message = RequestRange { source: String::from(SOURCE), kind: String::from(LDAP_KIND), - start: start.timestamp_nanos(), - end: end.timestamp_nanos(), + start: start.timestamp_nanos_opt().unwrap(), + end: end.timestamp_nanos_opt().unwrap(), count: 5, }; @@ -1460,7 +1460,7 @@ async fn request_range_data_with_protocol() { let (mut send_pub_req, mut recv_pub_resp) = publish.conn.open_bi().await.expect("failed to open stream"); let tls_store = db.tls_store().unwrap(); - let send_tls_time = Utc::now().timestamp_nanos(); + let send_tls_time = Utc::now().timestamp_nanos_opt().unwrap(); let tls_data = bincode::deserialize::(&insert_tls_raw_event(&tls_store, SOURCE, send_tls_time)) .unwrap(); @@ -1482,8 +1482,8 @@ async fn request_range_data_with_protocol() { let message = RequestRange { source: String::from(SOURCE), kind: String::from(TLS_KIND), - start: start.timestamp_nanos(), - end: end.timestamp_nanos(), + start: start.timestamp_nanos_opt().unwrap(), + end: end.timestamp_nanos_opt().unwrap(), count: 5, }; @@ -1521,7 +1521,7 @@ async fn request_range_data_with_protocol() { let (mut send_pub_req, mut recv_pub_resp) = publish.conn.open_bi().await.expect("failed to open stream"); let smb_store = db.smb_store().unwrap(); - let send_smb_time = Utc::now().timestamp_nanos(); + let send_smb_time = Utc::now().timestamp_nanos_opt().unwrap(); let smb_data = bincode::deserialize::(&insert_smb_raw_event(&smb_store, SOURCE, send_smb_time)) .unwrap(); @@ -1543,8 +1543,8 @@ async fn request_range_data_with_protocol() { let message = RequestRange { source: String::from(SOURCE), kind: String::from(SMB_KIND), - start: start.timestamp_nanos(), - end: end.timestamp_nanos(), + start: start.timestamp_nanos_opt().unwrap(), + end: end.timestamp_nanos_opt().unwrap(), count: 5, }; @@ -1582,7 +1582,7 @@ async fn request_range_data_with_protocol() { let (mut send_pub_req, mut recv_pub_resp) = publish.conn.open_bi().await.expect("failed to open stream"); let nfs_store = db.nfs_store().unwrap(); - let send_nfs_time = Utc::now().timestamp_nanos(); + let send_nfs_time = Utc::now().timestamp_nanos_opt().unwrap(); let nfs_data = bincode::deserialize::(&insert_nfs_raw_event(&nfs_store, SOURCE, send_nfs_time)) .unwrap(); @@ -1604,8 +1604,8 @@ async fn request_range_data_with_protocol() { let message = RequestRange { source: String::from(SOURCE), kind: String::from(NFS_KIND), - start: start.timestamp_nanos(), - end: end.timestamp_nanos(), + start: start.timestamp_nanos_opt().unwrap(), + end: end.timestamp_nanos_opt().unwrap(), count: 5, }; @@ -1673,7 +1673,7 @@ async fn request_range_data_with_log() { publish.conn.open_bi().await.expect("failed to open stream"); let log_store = db.log_store().unwrap(); - let send_log_time = Utc::now().timestamp_nanos(); + let send_log_time = Utc::now().timestamp_nanos_opt().unwrap(); let log_data = bincode::deserialize::(&insert_log_raw_event( &log_store, SOURCE, @@ -1699,8 +1699,8 @@ async fn request_range_data_with_log() { let message = RequestRange { source: String::from(SOURCE), kind: String::from(KIND), - start: start.timestamp_nanos(), - end: end.timestamp_nanos(), + start: start.timestamp_nanos_opt().unwrap(), + end: end.timestamp_nanos_opt().unwrap(), count: 5, }; @@ -1755,7 +1755,7 @@ async fn request_range_data_with_period_time_series() { publish.conn.open_bi().await.expect("failed to open stream"); let time_series_store = db.periodic_time_series_store().unwrap(); - let send_time_series_time = Utc::now().timestamp_nanos(); + let send_time_series_time = Utc::now().timestamp_nanos_opt().unwrap(); let time_series_data = bincode::deserialize::(&insert_periodic_time_series_raw_event( &time_series_store, @@ -1781,8 +1781,8 @@ async fn request_range_data_with_period_time_series() { let message = RequestRange { source: String::from(SAMPLING_POLICY_ID), kind: String::from(KIND), - start: start.timestamp_nanos(), - end: end.timestamp_nanos(), + start: start.timestamp_nanos_opt().unwrap(), + end: end.timestamp_nanos_opt().unwrap(), count: 5, }; @@ -1894,7 +1894,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(conn_start_msg, NETWORK_STREAM_CONN); - let send_conn_time = Utc::now().timestamp_nanos(); + let send_conn_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_ONE, "conn"); let conn_data = gen_conn_raw_event(); send_direct_stream( @@ -1912,7 +1912,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(conn_data, recv_data[20..]); - let send_conn_time = Utc::now().timestamp_nanos(); + let send_conn_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_TWO, "conn"); let conn_data = gen_conn_raw_event(); send_direct_stream( @@ -1930,7 +1930,7 @@ async fn request_network_event_stream() { assert_eq!(conn_data, recv_data[20..]); // database conn network event for crusher - let send_conn_time = Utc::now().timestamp_nanos(); + let send_conn_time = Utc::now().timestamp_nanos_opt().unwrap(); let conn_data = insert_conn_raw_event(&conn_store, SOURCE_CRUSHER_THREE, send_conn_time); send_stream_request( &mut publish.send, @@ -1957,7 +1957,7 @@ async fn request_network_event_stream() { assert_eq!(conn_data, recv_data); // direct conn network event for crusher - let send_conn_time = Utc::now().timestamp_nanos(); + let send_conn_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_CRUSHER_THREE, "conn"); let conn_data = gen_conn_raw_event(); @@ -1999,7 +1999,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(dns_start_msg, NETWORK_STREAM_DNS); - let send_dns_time = Utc::now().timestamp_nanos(); + let send_dns_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_ONE, "dns"); let dns_data = gen_conn_raw_event(); send_direct_stream( @@ -2017,7 +2017,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(dns_data, recv_data[20..]); - let send_dns_time = Utc::now().timestamp_nanos(); + let send_dns_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_TWO, "dns"); let dns_data = gen_conn_raw_event(); send_direct_stream( @@ -2036,7 +2036,7 @@ async fn request_network_event_stream() { assert_eq!(dns_data, recv_data[20..]); // database dns network event for crusher - let send_dns_time = Utc::now().timestamp_nanos(); + let send_dns_time = Utc::now().timestamp_nanos_opt().unwrap(); let dns_data = insert_dns_raw_event(&dns_store, SOURCE_CRUSHER_THREE, send_dns_time); send_stream_request( @@ -2064,7 +2064,7 @@ async fn request_network_event_stream() { assert_eq!(dns_data, recv_data); // direct dns network event for crusher - let send_dns_time = Utc::now().timestamp_nanos(); + let send_dns_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_CRUSHER_THREE, "dns"); let dns_data = gen_dns_raw_event(); @@ -2106,7 +2106,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(rdp_start_msg, NETWORK_STREAM_RDP); - let send_rdp_time = Utc::now().timestamp_nanos(); + let send_rdp_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_ONE, "rdp"); let rdp_data = gen_conn_raw_event(); send_direct_stream( @@ -2124,7 +2124,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(rdp_data, recv_data[20..]); - let send_rdp_time = Utc::now().timestamp_nanos(); + let send_rdp_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_TWO, "rdp"); let rdp_data = gen_conn_raw_event(); send_direct_stream( @@ -2143,7 +2143,7 @@ async fn request_network_event_stream() { assert_eq!(rdp_data, recv_data[20..]); // database rdp network event for crusher - let send_rdp_time = Utc::now().timestamp_nanos(); + let send_rdp_time = Utc::now().timestamp_nanos_opt().unwrap(); let rdp_data = insert_rdp_raw_event(&rdp_store, SOURCE_CRUSHER_THREE, send_rdp_time); send_stream_request( @@ -2171,7 +2171,7 @@ async fn request_network_event_stream() { assert_eq!(rdp_data, recv_data); // direct rdp network event for crusher - let send_rdp_time = Utc::now().timestamp_nanos(); + let send_rdp_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_CRUSHER_THREE, "rdp"); let rdp_data = gen_rdp_raw_event(); send_direct_stream( @@ -2213,7 +2213,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(http_start_msg, NETWORK_STREAM_HTTP); - let send_http_time = Utc::now().timestamp_nanos(); + let send_http_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_ONE, "http"); let http_data = gen_conn_raw_event(); @@ -2232,7 +2232,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(http_data, recv_data[20..]); - let send_http_time = Utc::now().timestamp_nanos(); + let send_http_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_TWO, "http"); let http_data = gen_conn_raw_event(); @@ -2252,7 +2252,7 @@ async fn request_network_event_stream() { assert_eq!(http_data, recv_data[20..]); // database http network event for crusher - let send_http_time = Utc::now().timestamp_nanos(); + let send_http_time = Utc::now().timestamp_nanos_opt().unwrap(); let http_data = insert_http_raw_event(&http_store, SOURCE_CRUSHER_THREE, send_http_time); send_stream_request( @@ -2280,7 +2280,7 @@ async fn request_network_event_stream() { assert_eq!(http_data, recv_data); // direct http network event for crusher - let send_http_time = Utc::now().timestamp_nanos(); + let send_http_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_CRUSHER_THREE, "http"); let http_data = gen_http_raw_event(); send_direct_stream( @@ -2322,7 +2322,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(smtp_start_msg, NETWORK_STREAM_SMTP); - let send_smtp_time = Utc::now().timestamp_nanos(); + let send_smtp_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_ONE, "smtp"); let smtp_data = gen_smtp_raw_event(); @@ -2341,7 +2341,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(smtp_data, recv_data[20..]); - let send_smtp_time = Utc::now().timestamp_nanos(); + let send_smtp_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_TWO, "smtp"); let smtp_data = gen_smtp_raw_event(); @@ -2361,7 +2361,7 @@ async fn request_network_event_stream() { assert_eq!(smtp_data, recv_data[20..]); // database smtp network event for crusher - let send_smtp_time = Utc::now().timestamp_nanos(); + let send_smtp_time = Utc::now().timestamp_nanos_opt().unwrap(); let smtp_data = insert_smtp_raw_event(&smtp_store, SOURCE_CRUSHER_THREE, send_smtp_time); send_stream_request( @@ -2389,7 +2389,7 @@ async fn request_network_event_stream() { assert_eq!(smtp_data, recv_data); // direct smtp network event for crusher - let send_smtp_time = Utc::now().timestamp_nanos(); + let send_smtp_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_CRUSHER_THREE, "smtp"); let smtp_data = gen_smtp_raw_event(); send_direct_stream( @@ -2431,7 +2431,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(ntlm_start_msg, NETWORK_STREAM_NTLM); - let send_ntlm_time = Utc::now().timestamp_nanos(); + let send_ntlm_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_ONE, "ntlm"); let ntlm_data = gen_ntlm_raw_event(); @@ -2450,7 +2450,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(ntlm_data, recv_data[20..]); - let send_ntlm_time = Utc::now().timestamp_nanos(); + let send_ntlm_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_TWO, "ntlm"); let ntlm_data = gen_ntlm_raw_event(); @@ -2470,7 +2470,7 @@ async fn request_network_event_stream() { assert_eq!(ntlm_data, recv_data[20..]); // database ntlm network event for crusher - let send_ntlm_time = Utc::now().timestamp_nanos(); + let send_ntlm_time = Utc::now().timestamp_nanos_opt().unwrap(); let ntlm_data = insert_ntlm_raw_event(&ntlm_store, SOURCE_CRUSHER_THREE, send_ntlm_time); send_stream_request( @@ -2498,7 +2498,7 @@ async fn request_network_event_stream() { assert_eq!(ntlm_data, recv_data); //direct ntlm network event for crusher - let send_ntlm_time = Utc::now().timestamp_nanos(); + let send_ntlm_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_CRUSHER_THREE, "ntlm"); let ntlm_data = gen_ntlm_raw_event(); send_direct_stream( @@ -2539,7 +2539,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(kerberos_start_msg, NETWORK_STREAM_KERBEROS); - let send_kerberos_time = Utc::now().timestamp_nanos(); + let send_kerberos_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_ONE, "kerberos"); let kerberos_data = gen_kerberos_raw_event(); @@ -2558,7 +2558,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(kerberos_data, recv_data[20..]); - let send_kerberos_time = Utc::now().timestamp_nanos(); + let send_kerberos_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_TWO, "kerberos"); let kerberos_data = gen_kerberos_raw_event(); @@ -2578,7 +2578,7 @@ async fn request_network_event_stream() { assert_eq!(kerberos_data, recv_data[20..]); // database kerberos network event for crusher - let send_kerberos_time = Utc::now().timestamp_nanos(); + let send_kerberos_time = Utc::now().timestamp_nanos_opt().unwrap(); let kerberos_data = insert_kerberos_raw_event(&kerberos_store, SOURCE_CRUSHER_THREE, send_kerberos_time); @@ -2607,7 +2607,7 @@ async fn request_network_event_stream() { assert_eq!(kerberos_data, recv_data); //direct kerberos network event for crusher - let send_kerberos_time = Utc::now().timestamp_nanos(); + let send_kerberos_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_CRUSHER_THREE, "kerberos"); let kerberos_data = gen_kerberos_raw_event(); send_direct_stream( @@ -2648,7 +2648,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(ssh_start_msg, NETWORK_STREAM_SSH); - let send_ssh_time = Utc::now().timestamp_nanos(); + let send_ssh_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_ONE, "ssh"); let ssh_data = gen_ssh_raw_event(); @@ -2667,7 +2667,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(ssh_data, recv_data[20..]); - let send_ssh_time = Utc::now().timestamp_nanos(); + let send_ssh_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_TWO, "ssh"); let ssh_data = gen_ssh_raw_event(); @@ -2687,7 +2687,7 @@ async fn request_network_event_stream() { assert_eq!(ssh_data, recv_data[20..]); // database ssh network event for crusher - let send_ssh_time = Utc::now().timestamp_nanos(); + let send_ssh_time = Utc::now().timestamp_nanos_opt().unwrap(); let ssh_data = insert_ssh_raw_event(&ssh_store, SOURCE_CRUSHER_THREE, send_ssh_time); send_stream_request( @@ -2715,7 +2715,7 @@ async fn request_network_event_stream() { assert_eq!(ssh_data, recv_data); //direct ssh network event for crusher - let send_ssh_time = Utc::now().timestamp_nanos(); + let send_ssh_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_CRUSHER_THREE, "ssh"); let ssh_data = gen_ssh_raw_event(); send_direct_stream( @@ -2757,7 +2757,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(dce_rpc_start_msg, NETWORK_STREAM_DCE_RPC); - let send_dce_rpc_time = Utc::now().timestamp_nanos(); + let send_dce_rpc_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_ONE, "dce rpc"); let dce_rpc_data = gen_dce_rpc_raw_event(); @@ -2776,7 +2776,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(dce_rpc_data, recv_data[20..]); - let send_dce_rpc_time = Utc::now().timestamp_nanos(); + let send_dce_rpc_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_TWO, "dce rpc"); let dce_rpc_data = gen_dce_rpc_raw_event(); @@ -2796,7 +2796,7 @@ async fn request_network_event_stream() { assert_eq!(dce_rpc_data, recv_data[20..]); // database dce_rpc network event for crusher - let send_dce_rpc_time = Utc::now().timestamp_nanos(); + let send_dce_rpc_time = Utc::now().timestamp_nanos_opt().unwrap(); let dce_rpc_data = insert_dce_rpc_raw_event(&dce_rpc_store, SOURCE_CRUSHER_THREE, send_dce_rpc_time); @@ -2825,7 +2825,7 @@ async fn request_network_event_stream() { assert_eq!(dce_rpc_data, recv_data); //direct dce_rpc network event for crusher - let send_dce_rpc_time = Utc::now().timestamp_nanos(); + let send_dce_rpc_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_CRUSHER_THREE, "dce rpc"); let dce_rpc_data = gen_dce_rpc_raw_event(); send_direct_stream( @@ -2866,7 +2866,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(ftp_start_msg, NETWORK_STREAM_FTP); - let send_ftp_time = Utc::now().timestamp_nanos(); + let send_ftp_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_ONE, "ftp"); let ftp_data = gen_ftp_raw_event(); @@ -2885,7 +2885,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(ftp_data, recv_data[20..]); - let send_ftp_time = Utc::now().timestamp_nanos(); + let send_ftp_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_TWO, "ftp"); let ftp_data = gen_ftp_raw_event(); @@ -2905,7 +2905,7 @@ async fn request_network_event_stream() { assert_eq!(ftp_data, recv_data[20..]); // database ftp network event for crusher - let send_ftp_time = Utc::now().timestamp_nanos(); + let send_ftp_time = Utc::now().timestamp_nanos_opt().unwrap(); let ftp_data = insert_ftp_raw_event(&ftp_store, SOURCE_CRUSHER_THREE, send_ftp_time); send_stream_request( @@ -2933,7 +2933,7 @@ async fn request_network_event_stream() { assert_eq!(ftp_data, recv_data); //direct ftp network event for crusher - let send_ftp_time = Utc::now().timestamp_nanos(); + let send_ftp_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_CRUSHER_THREE, "ftp"); let ftp_data = gen_ftp_raw_event(); send_direct_stream( @@ -2975,7 +2975,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(mqtt_start_msg, NETWORK_STREAM_MQTT); - let send_mqtt_time = Utc::now().timestamp_nanos(); + let send_mqtt_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_ONE, "mqtt"); let mqtt_data = gen_mqtt_raw_event(); @@ -2994,7 +2994,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(mqtt_data, recv_data[20..]); - let send_mqtt_time = Utc::now().timestamp_nanos(); + let send_mqtt_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_TWO, "mqtt"); let mqtt_data = gen_mqtt_raw_event(); @@ -3014,7 +3014,7 @@ async fn request_network_event_stream() { assert_eq!(mqtt_data, recv_data[20..]); // database mqtt network event for crusher - let send_mqtt_time = Utc::now().timestamp_nanos(); + let send_mqtt_time = Utc::now().timestamp_nanos_opt().unwrap(); let mqtt_data = insert_mqtt_raw_event(&mqtt_store, SOURCE_CRUSHER_THREE, send_mqtt_time); send_stream_request( @@ -3042,7 +3042,7 @@ async fn request_network_event_stream() { assert_eq!(mqtt_data, recv_data); //direct mqtt network event for crusher - let send_mqtt_time = Utc::now().timestamp_nanos(); + let send_mqtt_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_CRUSHER_THREE, "mqtt"); let mqtt_data = gen_mqtt_raw_event(); send_direct_stream( @@ -3084,7 +3084,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(ldap_start_msg, NETWORK_STREAM_LDAP); - let send_ldap_time = Utc::now().timestamp_nanos(); + let send_ldap_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_ONE, "ldap"); let ldap_data = gen_ldap_raw_event(); @@ -3103,7 +3103,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(ldap_data, recv_data[20..]); - let send_ldap_time = Utc::now().timestamp_nanos(); + let send_ldap_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_TWO, "ldap"); let ldap_data = gen_ldap_raw_event(); @@ -3123,7 +3123,7 @@ async fn request_network_event_stream() { assert_eq!(ldap_data, recv_data[20..]); // database ldap network event for crusher - let send_ldap_time = Utc::now().timestamp_nanos(); + let send_ldap_time = Utc::now().timestamp_nanos_opt().unwrap(); let ldap_data = insert_ldap_raw_event(&ldap_store, SOURCE_CRUSHER_THREE, send_ldap_time); send_stream_request( @@ -3151,7 +3151,7 @@ async fn request_network_event_stream() { assert_eq!(ldap_data, recv_data); //direct ldap network event for crusher - let send_ldap_time = Utc::now().timestamp_nanos(); + let send_ldap_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_CRUSHER_THREE, "ldap"); let ldap_data = gen_ldap_raw_event(); send_direct_stream( @@ -3192,7 +3192,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(tls_start_msg, NETWORK_STREAM_TLS); - let send_tls_time = Utc::now().timestamp_nanos(); + let send_tls_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_ONE, "tls"); let tls_data = gen_tls_raw_event(); @@ -3211,7 +3211,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(tls_data, recv_data[20..]); - let send_tls_time = Utc::now().timestamp_nanos(); + let send_tls_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_TWO, "tls"); let tls_data = gen_tls_raw_event(); @@ -3231,7 +3231,7 @@ async fn request_network_event_stream() { assert_eq!(tls_data, recv_data[20..]); // database tls network event for crusher - let send_tls_time = Utc::now().timestamp_nanos(); + let send_tls_time = Utc::now().timestamp_nanos_opt().unwrap(); let tls_data = insert_tls_raw_event(&tls_store, SOURCE_CRUSHER_THREE, send_tls_time); send_stream_request( @@ -3259,7 +3259,7 @@ async fn request_network_event_stream() { assert_eq!(tls_data, recv_data); //direct tls network event for crusher - let send_tls_time = Utc::now().timestamp_nanos(); + let send_tls_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_CRUSHER_THREE, "tls"); let tls_data = gen_tls_raw_event(); send_direct_stream( @@ -3300,7 +3300,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(smb_start_msg, NETWORK_STREAM_SMB); - let send_smb_time = Utc::now().timestamp_nanos(); + let send_smb_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_ONE, "smb"); let smb_data = gen_smb_raw_event(); @@ -3319,7 +3319,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(smb_data, recv_data[20..]); - let send_smb_time = Utc::now().timestamp_nanos(); + let send_smb_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_TWO, "smb"); let smb_data = gen_smb_raw_event(); @@ -3339,7 +3339,7 @@ async fn request_network_event_stream() { assert_eq!(smb_data, recv_data[20..]); // database smb network event for crusher - let send_smb_time = Utc::now().timestamp_nanos(); + let send_smb_time = Utc::now().timestamp_nanos_opt().unwrap(); let smb_data = insert_smb_raw_event(&smb_store, SOURCE_CRUSHER_THREE, send_smb_time); send_stream_request( @@ -3367,7 +3367,7 @@ async fn request_network_event_stream() { assert_eq!(smb_data, recv_data); //direct smb network event for crusher - let send_smb_time = Utc::now().timestamp_nanos(); + let send_smb_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_CRUSHER_THREE, "smb"); let smb_data = gen_smb_raw_event(); send_direct_stream( @@ -3408,7 +3408,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(nfs_start_msg, NETWORK_STREAM_NFS); - let send_nfs_time = Utc::now().timestamp_nanos(); + let send_nfs_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_ONE, "nfs"); let nfs_data = gen_nfs_raw_event(); @@ -3427,7 +3427,7 @@ async fn request_network_event_stream() { .unwrap(); assert_eq!(nfs_data, recv_data[20..]); - let send_nfs_time = Utc::now().timestamp_nanos(); + let send_nfs_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_HOG_TWO, "nfs"); let nfs_data = gen_nfs_raw_event(); @@ -3447,7 +3447,7 @@ async fn request_network_event_stream() { assert_eq!(nfs_data, recv_data[20..]); // database nfs network event for crusher - let send_nfs_time = Utc::now().timestamp_nanos(); + let send_nfs_time = Utc::now().timestamp_nanos_opt().unwrap(); let nfs_data = insert_nfs_raw_event(&nfs_store, SOURCE_CRUSHER_THREE, send_nfs_time); send_stream_request( @@ -3475,7 +3475,7 @@ async fn request_network_event_stream() { assert_eq!(nfs_data, recv_data); //direct nfs network event for crusher - let send_nfs_time = Utc::now().timestamp_nanos(); + let send_nfs_time = Utc::now().timestamp_nanos_opt().unwrap(); let key = NetworkKey::new(SOURCE_CRUSHER_THREE, "nfs"); let nfs_data = gen_nfs_raw_event(); send_direct_stream( diff --git a/src/storage.rs b/src/storage.rs index ee45de9b..192e3c20 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -542,7 +542,7 @@ impl<'db, T> RawEventStore<'db, T> { .filter_map(|timestamp| { let key = key_builder .clone() - .end_key(timestamp.timestamp_nanos()) + .end_key(timestamp.timestamp_nanos_opt().expect("valid timestamp")) .build(); self.db .get_cf(&self.cf, key.key()) @@ -602,8 +602,14 @@ impl<'db> SourceStore<'db> { /// /// If the source already exists, its last active time is updated. pub fn insert(&self, name: &str, last_active: DateTime) -> Result<()> { - self.db - .put_cf(self.cf, name, last_active.timestamp_nanos().to_be_bytes())?; + self.db.put_cf( + self.cf, + name, + last_active + .timestamp_nanos_opt() + .context("to_timestamp_nanos")? + .to_be_bytes(), + )?; Ok(()) } @@ -675,7 +681,7 @@ impl StorageKeyBuilder { pub fn lower_closed_bound_end_key(mut self, time: Option>) -> Self { self.pre_key.reserve(TIMESTAMP_SIZE); let end_key = if let Some(time) = time { - time.timestamp_nanos() + time.timestamp_nanos_opt().expect("valid timestamp") } else { 0 }; @@ -686,7 +692,7 @@ impl StorageKeyBuilder { pub fn upper_closed_bound_end_key(mut self, time: Option>) -> Self { self.pre_key.reserve(TIMESTAMP_SIZE); let end_key = if let Some(time) = time { - time.timestamp_nanos() + time.timestamp_nanos_opt().expect("valid timestamp") } else { i64::MAX }; @@ -697,7 +703,7 @@ impl StorageKeyBuilder { pub fn upper_open_bound_end_key(mut self, time: Option>) -> Self { self.pre_key.reserve(TIMESTAMP_SIZE); if let Some(time) = time { - let ns = time.timestamp_nanos(); + let ns = time.timestamp_nanos_opt().expect("valid timestamp"); if let Some(ns) = ns.checked_sub(1) { if ns >= 0 { self.pre_key.extend_from_slice(&ns.to_be_bytes()); @@ -861,12 +867,13 @@ pub async fn retain_periodically( NaiveDateTime::from_timestamp_opt(61, 0).expect("valid time"), Utc, ) - .timestamp_nanos() + .timestamp_nanos_opt() + .context("to_timestamp_nanos")? .to_be_bytes(); loop { select! { _ = itv.tick() => { - let standard_duration = Utc::now().timestamp_nanos() - retention_duration; + let standard_duration = Utc::now().timestamp_nanos_opt().context("to_timestamp_nanos")? - retention_duration; let standard_duration_vec = standard_duration.to_be_bytes().to_vec(); let sources = db.sources_store()?.names(); let all_store = db.retain_period_store()?; diff --git a/src/storage/migration.rs b/src/storage/migration.rs index e874fafe..5dca3e3b 100644 --- a/src/storage/migration.rs +++ b/src/storage/migration.rs @@ -271,7 +271,7 @@ mod tests { let store = db.http_store().unwrap(); // insert old http raw data - let timestamp = Utc::now().timestamp_nanos(); + let timestamp = Utc::now().timestamp_nanos_opt().unwrap(); let source = "src1"; let mut key = Vec::with_capacity(source.len() + 1 + std::mem::size_of::()); key.extend_from_slice(source.as_bytes());