From 20442a2300d3827a6f0e7eb7ead370b34a36e893 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Tue, 18 Apr 2023 11:40:46 +0530 Subject: [PATCH 1/8] Add in-memory writer for ingestion flow --- Cargo.lock | 251 +++--- server/Cargo.toml | 12 +- server/src/alerts/mod.rs | 4 +- server/src/event.rs | 18 +- server/src/event/writer.rs | 269 +++--- server/src/event/writer/file_writer.rs | 79 ++ server/src/event/writer/mem_writer.rs | 57 ++ server/src/event/writer/mutable.rs | 817 ++++++++++++++++++ server/src/main.rs | 7 +- server/src/metadata.rs | 19 +- server/src/option.rs | 7 +- server/src/query.rs | 2 - server/src/query/table_provider.rs | 227 ----- server/src/storage.rs | 157 +--- server/src/storage/file_link.rs | 104 --- server/src/storage/object_storage.rs | 212 +---- server/src/storage/staging.rs | 282 ++++++ server/src/utils.rs | 1 - server/src/utils/arrow.rs | 6 + server/src/utils/{ => arrow}/batch_adapter.rs | 0 server/src/utils/arrow/merged_reader.rs | 85 ++ 21 files changed, 1598 insertions(+), 1018 deletions(-) create mode 100644 server/src/event/writer/file_writer.rs create mode 100644 server/src/event/writer/mem_writer.rs create mode 100644 server/src/event/writer/mutable.rs delete mode 100644 server/src/query/table_provider.rs delete mode 100644 server/src/storage/file_link.rs create mode 100644 server/src/storage/staging.rs rename server/src/utils/{ => arrow}/batch_adapter.rs (100%) create mode 100644 server/src/utils/arrow/merged_reader.rs diff --git a/Cargo.lock b/Cargo.lock index a81b7851d..35a8565a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -355,68 +355,51 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "arrow" -version = "34.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f410d3907b6b3647b9e7bca4551274b2e3d716aa940afb67b7287257401da921" +checksum = "990dfa1a9328504aa135820da1c95066537b69ad94c04881b785f64328e0fa6b" dependencies = [ "ahash 0.8.3", "arrow-arith", - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", + "arrow-array", + "arrow-buffer", "arrow-cast", "arrow-csv", - "arrow-data 34.0.0", + "arrow-data", "arrow-ipc", "arrow-json", "arrow-ord", "arrow-row", - "arrow-schema 34.0.0", + "arrow-schema", "arrow-select", "arrow-string", - "comfy-table", ] [[package]] name = "arrow-arith" -version = "34.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f87391cf46473c9bc53dab68cb8872c3a81d4dfd1703f1c8aa397dba9880a043" -dependencies = [ - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", - "chrono", - "half", - "num", -] - -[[package]] -name = "arrow-array" -version = "34.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d35d5475e65c57cffba06d0022e3006b677515f99b54af33a7cd54f6cdd4a5b5" +checksum = "f2b2e52de0ab54173f9b08232b7184c26af82ee7ab4ac77c83396633c90199fa" dependencies = [ - "ahash 0.8.3", - "arrow-buffer 34.0.0", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "chrono", "half", - "hashbrown 0.13.2", "num", ] [[package]] name = "arrow-array" -version = "35.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43489bbff475545b78b0e20bde1d22abd6c99e54499839f9e815a2fa5134a51b" +checksum = "e10849b60c17dbabb334be1f4ef7550701aa58082b71335ce1ed586601b2f423" dependencies = [ "ahash 0.8.3", - "arrow-buffer 35.0.0", - "arrow-data 
35.0.0", - "arrow-schema 35.0.0", + "arrow-buffer", + "arrow-data", + "arrow-schema", "chrono", "chrono-tz", "half", @@ -426,19 +409,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "34.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68b4ec72eda7c0207727df96cf200f539749d736b21f3e782ece113e18c1a0a7" -dependencies = [ - "half", - "num", -] - -[[package]] -name = "arrow-buffer" -version = "35.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3759e4a52c593281184787af5435671dc8b1e78333e5a30242b2e2d6e3c9d1f" +checksum = "b0746ae991b186be39933147117f8339eb1c4bbbea1c8ad37e7bf5851a1a06ba" dependencies = [ "half", "num", @@ -446,31 +419,32 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "34.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a7285272c9897321dfdba59de29f5b05aeafd3cdedf104a941256d155f6d304" +checksum = "b88897802515d7b193e38b27ddd9d9e43923d410a9e46307582d756959ee9595" dependencies = [ - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "arrow-select", "chrono", + "comfy-table", "lexical-core", "num", ] [[package]] name = "arrow-csv" -version = "34.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "981ee4e7f6a120da04e00d0b39182e1eeacccb59c8da74511de753c56b7fddf7" +checksum = "1c8220d9741fc37961262710ceebd8451a5b393de57c464f0267ffdda1775c0a" dependencies = [ - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", + "arrow-array", + "arrow-buffer", "arrow-cast", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", + "arrow-data", + "arrow-schema", "chrono", "csv", "csv-core", @@ -481,53 +455,41 @@ dependencies = [ [[package]] name = "arrow-data" -version = "34.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27cc673ee6989ea6e4b4e8c7d461f7e06026a096c8f0b1a7288885ff71ae1e56" +checksum = "533f937efa1aaad9dc86f6a0e382c2fa736a4943e2090c946138079bdf060cef" dependencies = [ - "arrow-buffer 34.0.0", - "arrow-schema 34.0.0", - "half", - "num", -] - -[[package]] -name = "arrow-data" -version = "35.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19c7787c6cdbf9539b1ffb860bfc18c5848926ec3d62cbd52dc3b1ea35c874fd" -dependencies = [ - "arrow-buffer 35.0.0", - "arrow-schema 35.0.0", + "arrow-buffer", + "arrow-schema", "half", "num", ] [[package]] name = "arrow-ipc" -version = "34.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e37b8b69d9e59116b6b538e8514e0ec63a30f08b617ce800d31cb44e3ef64c1a" +checksum = "18b75296ff01833f602552dff26a423fc213db8e5049b540ca4a00b1c957e41c" dependencies = [ - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", + "arrow-array", + "arrow-buffer", "arrow-cast", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", + "arrow-data", + "arrow-schema", "flatbuffers", ] [[package]] name = "arrow-json" -version = "34.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80c3fa0bed7cfebf6d18e46b733f9cb8a1cb43ce8e6539055ca3e1e48a426266" +checksum = "e501d3de4d612c90677594896ca6c0fa075665a7ff980dc4189bb531c17e19f6" dependencies = [ - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", + "arrow-array", + "arrow-buffer", "arrow-cast", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", + 
"arrow-data", + "arrow-schema", "chrono", "half", "indexmap", @@ -538,71 +500,66 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "34.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d247dce7bed6a8d6a3c6debfa707a3a2f694383f0c692a39d736a593eae5ef94" +checksum = "33d2671eb3793f9410230ac3efb0e6d36307be8a2dac5fad58ac9abde8e9f01e" dependencies = [ - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "arrow-select", + "half", "num", ] [[package]] name = "arrow-row" -version = "34.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d609c0181f963cea5c70fddf9a388595b5be441f3aa1d1cdbf728ca834bbd3a" +checksum = "fc11fa039338cebbf4e29cf709c8ac1d6a65c7540063d4a25f991ab255ca85c8" dependencies = [ "ahash 0.8.3", - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "half", "hashbrown 0.13.2", ] [[package]] name = "arrow-schema" -version = "34.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64951898473bfb8e22293e83a44f02874d2257514d49cd95f9aa4afcff183fbc" +checksum = "d04f17f7b86ded0b5baf98fe6123391c4343e031acc3ccc5fa604cc180bff220" dependencies = [ "serde", ] -[[package]] -name = "arrow-schema" -version = "35.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf6b26f6a6f8410e3b9531cbd1886399b99842701da77d4b4cf2013f7708f20f" - [[package]] name = "arrow-select" -version = "34.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a513d89c2e1ac22b28380900036cf1f3992c6443efc5e079de631dcf83c6888" +checksum = "163e35de698098ff5f5f672ada9dc1f82533f10407c7a11e2cd09f3bcf31d18a" dependencies = [ - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "num", ] [[package]] name = "arrow-string" -version = "34.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5288979b2705dae1114c864d73150629add9153b9b8f1d7ee3963db94c372ba5" +checksum = "bfdfbed1b10209f0dc68e6aa4c43dc76079af65880965c7c3b73f641f23d4aba" dependencies = [ - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "arrow-select", "regex", "regex-syntax", @@ -1545,9 +1502,9 @@ dependencies = [ [[package]] name = "datafusion" -version = "21.0.0" +version = "22.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c187589ce9ddf0bbc90e2e3dc0a89b90cc3d4bfdeefc7cf2aaa8ac15f7725811" +checksum = "9bdb93fee4f30368f1f71bfd5cd28882ec9fab0183db7924827b76129d33227c" dependencies = [ "ahash 0.8.3", "arrow", @@ -1593,12 +1550,12 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "21.0.0" +version = "22.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecbbfb88a799beca6a361c1282795f0f185b96201dab496d733a49bdf4684f7f" +checksum = "e82401ce129e601d406012b6d718f8978ba84c386e1c342fa155877120d68824" dependencies = [ "arrow", - "arrow-array 35.0.0", + "arrow-array", "chrono", "num_cpus", "object_store", @@ -1608,9 +1565,9 @@ dependencies = [ [[package]] name = 
"datafusion-execution" -version = "21.0.0" +version = "22.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73a38825b879024a87937b3b5ea8e43287ab3432db8786a2839dcbf141b6d938" +checksum = "b08b2078aed21a27239cd93f3015e492a58b0d50ebeeaf8d2236cf108ef583ce" dependencies = [ "dashmap", "datafusion-common", @@ -1626,9 +1583,9 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "21.0.0" +version = "22.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05454741d8496faf9f433a666e97ce693807e8374e0fd513eda5a8218ba8456d" +checksum = "16b5b977ce9695fb4c67614266ec57f384fc11e9a9f9b3e6d0e62b9c5a9f2c1f" dependencies = [ "ahash 0.8.3", "arrow", @@ -1638,9 +1595,9 @@ dependencies = [ [[package]] name = "datafusion-optimizer" -version = "21.0.0" +version = "22.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5d551c428b8557790cceecb59615f624b24dddf60b4d843c5994f8120b48c7f" +checksum = "a0b2bb9e73ed778d1bc5af63a270f0154bf6eab5099c77668a6362296888e46b" dependencies = [ "arrow", "async-trait", @@ -1656,14 +1613,15 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "21.0.0" +version = "22.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08aa1047edf92d59f97b18dfbb1cade176a970b1a98b0a27f909409ceb05906e" +checksum = "80cd8ea5ab0a07b1b2a3e17d5909f1b1035bd129ffeeb5c66842a32e682f8f79" dependencies = [ "ahash 0.8.3", "arrow", - "arrow-buffer 34.0.0", - "arrow-schema 34.0.0", + "arrow-array", + "arrow-buffer", + "arrow-schema", "blake2", "blake3", "chrono", @@ -1687,9 +1645,9 @@ dependencies = [ [[package]] name = "datafusion-row" -version = "21.0.0" +version = "22.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fc83ac8761c251617c1b7e1122adf79ebbf215ecabc4e2346cda1c4307d5152" +checksum = "2a95d6badab19fd6e9195fdc5209ac0a7e5ce9bcdedc67767b9ffc1b4e645760" dependencies = [ "arrow", "datafusion-common", @@ -1699,11 +1657,12 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "21.0.0" +version = "22.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46d6cbfa8c6ac06202badbac6e4675c33b91d299f711a4fee23327b83906e2ee" +checksum = "37a78f8fc67123c4357e63bc0c87622a2a663d26f074958d749a633d0ecde90f" dependencies = [ - "arrow-schema 34.0.0", + "arrow", + "arrow-schema", "datafusion-common", "datafusion-expr", "log", @@ -2969,17 +2928,17 @@ dependencies = [ [[package]] name = "parquet" -version = "34.0.0" +version = "36.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ac135ecf63ebb5f53dda0921b0b76d6048b3ef631a5f4760b9e8f863ff00cfa" +checksum = "321a15f8332645759f29875b07f8233d16ed8ec1b3582223de81625a9f8506b7" dependencies = [ "ahash 0.8.3", - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", + "arrow-array", + "arrow-buffer", "arrow-cast", - "arrow-data 34.0.0", + "arrow-data", "arrow-ipc", - "arrow-schema 34.0.0", + "arrow-schema", "arrow-select", "base64 0.21.0", "brotli", @@ -3019,10 +2978,10 @@ dependencies = [ "actix-web-prometheus", "actix-web-static-files", "anyhow", - "arrow-array 34.0.0", + "arrow-array", "arrow-ipc", "arrow-json", - "arrow-schema 34.0.0", + "arrow-schema", "async-trait", "base64 0.21.0", "bytes", diff --git a/server/Cargo.toml b/server/Cargo.toml index faa543e0f..1bded2ebe 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -13,10 +13,10 @@ actix-cors = "0.6" actix-web-prometheus = { version = "0.1" } prometheus = { 
version = "0.13", features = ["process"] } anyhow = { version = "1.0", features = ["backtrace"] } -arrow-schema = { version = "34.0.0", features = ["serde"] } -arrow-array = { version = "34.0.0" } -arrow-json = "34.0.0" -arrow-ipc = "34.0.0" +arrow-schema = { version = "36.0.0", features = ["serde"] } +arrow-array = { version = "36.0.0" } +arrow-json = "36.0.0" +arrow-ipc = "36.0.0" async-trait = "0.1" base64 = "0.21" bytes = "1.4" @@ -32,7 +32,7 @@ clap = { version = "4.1", default-features = false, features = [ "error-context", ] } crossterm = "0.26" -datafusion = "21.0.0" +datafusion = "22.0.0" object_store = { version = "0.5.6", features = ["aws", "aws_profile"] } derive_more = "0.99" env_logger = "0.10" @@ -70,7 +70,7 @@ xxhash-rust = { version = "0.8", features = ["xxh3"] } xz2 = { version = "*", features=["static"] } bzip2 = { version = "*", features=["static"] } once_cell = "1.17.1" -parquet = "34.0.0" +parquet = "36.0.0" pyroscope = { version = "0.5.3", optional = true } pyroscope_pprofrs = { version = "0.2", optional = true } uptime_lib = "0.2.2" diff --git a/server/src/alerts/mod.rs b/server/src/alerts/mod.rs index e62269c21..c0c1c65a3 100644 --- a/server/src/alerts/mod.rs +++ b/server/src/alerts/mod.rs @@ -64,7 +64,7 @@ pub struct Alert { } impl Alert { - pub fn check_alert(&self, stream_name: String, events: RecordBatch) { + pub fn check_alert(&self, stream_name: &str, events: RecordBatch) { let resolves = self.rule.resolves(events.clone()); for (index, state) in resolves.into_iter().enumerate() { @@ -72,7 +72,7 @@ impl Alert { AlertState::Listening | AlertState::Firing => (), alert_state @ (AlertState::SetToFiring | AlertState::Resolved) => { let context = self.get_context( - stream_name.clone(), + stream_name.to_owned(), alert_state, &self.rule, events.slice(index, 1), diff --git a/server/src/event.rs b/server/src/event.rs index 64494d98b..dcc061c7a 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -48,21 +48,25 @@ pub struct Event { impl Event { pub async fn process(self) -> Result<(), EventError> { let key = get_schema_key(&self.rb.schema().fields); + let num_rows = self.rb.num_rows() as u64; if self.is_first_event(metadata::STREAM_INFO.schema(&self.stream_name)?.as_ref()) { commit_schema(&self.stream_name, self.rb.schema())?; } - self.process_event(&key)?; + Self::process_event(&self.stream_name, &key, self.rb.clone())?; metadata::STREAM_INFO.update_stats( &self.stream_name, self.origin_format, self.origin_size, - self.rb.num_rows() as u64, + num_rows, )?; - if let Err(e) = metadata::STREAM_INFO.check_alerts(&self).await { + if let Err(e) = metadata::STREAM_INFO + .check_alerts(&self.stream_name, self.rb) + .await + { log::error!("Error checking for alerts. {:?}", e); } @@ -90,8 +94,12 @@ impl Event { // event process all events after the 1st event. Concatenates record batches // and puts them in memory store for each event. 
- fn process_event(&self, schema_key: &str) -> Result<(), EventError> { - STREAM_WRITERS.append_to_local(&self.stream_name, schema_key, &self.rb)?; + fn process_event( + stream_name: &str, + schema_key: &str, + rb: RecordBatch, + ) -> Result<(), EventError> { + STREAM_WRITERS.append_to_local(stream_name, schema_key, rb)?; Ok(()) } } diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 05bdbbf0c..2a1f39868 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -17,211 +17,150 @@ * */ +mod file_writer; +mod mem_writer; +mod mutable; + +use std::{ + collections::HashMap, + sync::{Mutex, RwLock}, +}; + +use crate::storage::staging::{self, ReadBuf}; + +use self::{errors::StreamWriterError, file_writer::FileWriter}; use arrow_array::RecordBatch; -use arrow_ipc::writer::StreamWriter; +use chrono::{NaiveDateTime, Utc}; +use derive_more::{Deref, DerefMut}; +use mem_writer::MemWriter; use once_cell::sync::Lazy; -use std::borrow::Borrow; -use std::collections::HashMap; -use std::fmt::{self, Debug, Formatter}; -use std::fs::{File, OpenOptions}; -use std::io::Write; -use std::ops::{Deref, DerefMut}; -use std::sync::{Mutex, RwLock}; - -use crate::storage::StorageDir; -use self::errors::StreamWriterError; +type InMemWriter = MemWriter<8192>; -type ArrowWriter = StreamWriter; -type LocalWriter = Mutex>>; +pub static STREAM_WRITERS: Lazy = Lazy::new(WriterTable::default); -pub static STREAM_WRITERS: Lazy = - Lazy::new(|| InnerStreamWriter(RwLock::new(WriterTable::new()))); +pub enum StreamWriter { + Mem(InMemWriter), + Disk(FileWriter), +} -/* - A wrapper type for global struct to implement methods over -*/ -pub struct InnerStreamWriter(RwLock>); - -impl Deref for InnerStreamWriter { - type Target = RwLock>; - fn deref(&self) -> &Self::Target { - &self.0 +impl Default for StreamWriter { + fn default() -> Self { + StreamWriter::Mem(MemWriter::default()) } } -impl DerefMut for InnerStreamWriter { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 + +impl StreamWriter { + pub fn push( + &mut self, + stream_name: &str, + schema_key: &str, + rb: RecordBatch, + ) -> Result<(), StreamWriterError> { + match self { + StreamWriter::Mem(mem) => { + mem.push(rb); + } + StreamWriter::Disk(disk) => { + disk.push(stream_name, schema_key, &rb)?; + } + } + Ok(()) } } -/* - Manually implmenting for the Type - since it depends on the types which are missing it -*/ -impl Debug for InnerStreamWriter { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.write_str("InnerStreamWriter { __private_field: () }") - } + +// Each entry in writer table is initialized with some context +// This is helpful for generating prefix when writer is finalized +pub struct WriterContext { + stream_name: String, + time: NaiveDateTime, } -impl InnerStreamWriter { +#[derive(Deref, DerefMut, Default)] +pub struct WriterTable(RwLock, WriterContext)>>); + +impl WriterTable { // append to a existing stream pub fn append_to_local( &self, - stream: &str, + stream_name: &str, schema_key: &str, - record: &RecordBatch, + record: RecordBatch, ) -> Result<(), StreamWriterError> { - let hashmap_guard = self.read().map_err(|_| StreamWriterError::RwPoisoned)?; + let hashmap_guard = self.read().unwrap(); - match hashmap_guard.get(stream, schema_key) { - Some(localwriter) => { - let mut writer_guard = localwriter + match hashmap_guard.get(stream_name) { + Some((stream_writer, _)) => { + stream_writer .lock() - .map_err(|_| StreamWriterError::MutexPoisoned)?; - - // if it's some writer then we write 
without dropping any lock - // hashmap cannot be brought mutably at any point until this finishes - if let Some(ref mut writer) = *writer_guard { - writer.write(record).map_err(StreamWriterError::Writer)?; - } else { - // pass on this mutex to set entry so that it can be reused - // we have a guard for underlying entry thus - // hashmap must not be availible as mutable to any other thread - let writer = init_new_stream_writer_file(stream, schema_key, record)?; - writer_guard.replace(writer); // replace the stream writer behind this mutex - } + .unwrap() + .push(stream_name, schema_key, record)?; } - // entry is not present thus we create it None => { - // this requires mutable borrow of the map so we drop this read lock and wait for write lock drop(hashmap_guard); - self.create_entry(stream.to_owned(), schema_key.to_owned(), record)?; + let mut map = self.write().unwrap(); + // check for race condition + // if map contains entry then just + if let Some((writer, _)) = map.get(stream_name) { + writer + .lock() + .unwrap() + .push(stream_name, schema_key, record)?; + } else { + // there is no entry so this can be inserted safely + let context = WriterContext { + stream_name: stream_name.to_owned(), + time: Utc::now().naive_utc(), + }; + let mut writer = StreamWriter::default(); + writer.push(stream_name, schema_key, record)?; + map.insert(stream_name.to_owned(), (Mutex::new(writer), context)); + } } }; Ok(()) } - // create a new entry with new stream_writer - // Only create entry for valid streams - fn create_entry( - &self, - stream: String, - schema_key: String, - record: &RecordBatch, - ) -> Result<(), StreamWriterError> { - let mut hashmap_guard = self.write().map_err(|_| StreamWriterError::RwPoisoned)?; - - let writer = init_new_stream_writer_file(&stream, &schema_key, record)?; - - hashmap_guard.insert(stream, schema_key, Mutex::new(Some(writer))); - - Ok(()) - } - - pub fn delete_stream(&self, stream: &str) { - self.write().unwrap().delete_stream(stream); + pub fn delete_stream(&self, stream_name: &str) { + self.write().unwrap().remove(stream_name); } - pub fn unset_all(&self) -> Result<(), StreamWriterError> { - let table = self.read().map_err(|_| StreamWriterError::RwPoisoned)?; - - for writer in table.iter() { - if let Some(mut streamwriter) = writer - .lock() - .map_err(|_| StreamWriterError::MutexPoisoned)? 
- .take() - { - let _ = streamwriter.finish(); + pub fn unset_all(&self) { + let mut table = self.write().unwrap(); + let map = std::mem::take(&mut *table); + drop(table); + dbg!(map.len()); + for (writer, context) in map.into_values() { + let writer = writer.into_inner().unwrap(); + match writer { + StreamWriter::Mem(mem) => { + let rb = mem.finalize(); + dbg!(rb.len()); + let mut read_bufs = staging::MEMORY_READ_BUFFERS.write().unwrap(); + + read_bufs + .entry(context.stream_name) + .or_insert(Vec::default()) + .push(ReadBuf { + time: context.time, + buf: rb, + }); + + dbg!(read_bufs.len()); + } + StreamWriter::Disk(disk) => disk.close_all(), } } - - Ok(()) - } -} - -pub struct WriterTable -where - A: Eq + std::hash::Hash, - B: Eq + std::hash::Hash, - T: Write, -{ - table: HashMap>>, -} - -impl WriterTable -where - A: Eq + std::hash::Hash, - B: Eq + std::hash::Hash, - T: Write, -{ - pub fn new() -> Self { - let table = HashMap::new(); - Self { table } - } - - fn get(&self, a: &X, b: &Y) -> Option<&LocalWriter> - where - A: Borrow, - B: Borrow, - X: Eq + std::hash::Hash + ?Sized, - Y: Eq + std::hash::Hash + ?Sized, - { - self.table.get(a)?.get(b) } - - fn insert(&mut self, a: A, b: B, v: LocalWriter) { - let inner = self.table.entry(a).or_default(); - inner.insert(b, v); - } - - pub fn delete_stream(&mut self, stream: &X) - where - A: Borrow, - X: Eq + std::hash::Hash + ?Sized, - { - self.table.remove(stream); - } - - fn iter(&self) -> impl Iterator> { - self.table.values().flat_map(|inner| inner.values()) - } -} - -fn init_new_stream_writer_file( - stream_name: &str, - schema_key: &str, - record: &RecordBatch, -) -> Result, StreamWriterError> { - let dir = StorageDir::new(stream_name); - let path = dir.path_by_current_time(schema_key); - - std::fs::create_dir_all(dir.data_path)?; - - let file = OpenOptions::new().create(true).append(true).open(path)?; - - let mut stream_writer = StreamWriter::try_new(file, &record.schema()) - .expect("File and RecordBatch both are checked"); - - stream_writer - .write(record) - .map_err(StreamWriterError::Writer)?; - - Ok(stream_writer) } pub mod errors { - use arrow_schema::ArrowError; #[derive(Debug, thiserror::Error)] pub enum StreamWriterError { #[error("Arrow writer failed: {0}")] - Writer(#[from] ArrowError), + Writer(#[from] arrow_schema::ArrowError), #[error("Io Error when creating new file: {0}")] Io(#[from] std::io::Error), - #[error("RwLock was poisoned")] - RwPoisoned, - #[error("Mutex was poisoned")] - MutexPoisoned, } } diff --git a/server/src/event/writer/file_writer.rs b/server/src/event/writer/file_writer.rs new file mode 100644 index 000000000..b4645aa2c --- /dev/null +++ b/server/src/event/writer/file_writer.rs @@ -0,0 +1,79 @@ +use arrow_array::RecordBatch; +use arrow_ipc::writer::StreamWriter; +use derive_more::{Deref, DerefMut}; +use std::collections::HashMap; +use std::fs::{File, OpenOptions}; +use std::path::PathBuf; + +use crate::storage::staging::StorageDir; + +use super::errors::StreamWriterError; + +pub struct ArrowWriter { + pub file_path: PathBuf, + pub writer: StreamWriter, +} + +#[derive(Deref, DerefMut, Default)] +pub struct FileWriter(HashMap); + +impl FileWriter { + // append to a existing stream + pub fn push( + &mut self, + stream_name: &str, + schema_key: &str, + record: &RecordBatch, + ) -> Result<(), StreamWriterError> { + match self.get_mut(schema_key) { + Some(writer) => { + writer + .writer + .write(record) + .map_err(StreamWriterError::Writer)?; + } + // entry is not present thus we create it + None => { + // 
this requires mutable borrow of the map so we drop this read lock and wait for write lock + let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record)?; + self.insert( + schema_key.to_owned(), + ArrowWriter { + file_path: path, + writer, + }, + ); + } + }; + + Ok(()) + } + + pub fn close_all(self) { + for mut writer in self.0.into_values() { + _ = writer.writer.finish(); + } + } +} + +fn init_new_stream_writer_file( + stream_name: &str, + schema_key: &str, + record: &RecordBatch, +) -> Result<(PathBuf, StreamWriter), StreamWriterError> { + let dir = StorageDir::new(stream_name); + let path = dir.path_by_current_time(schema_key); + + std::fs::create_dir_all(dir.data_path)?; + + let file = OpenOptions::new().create(true).append(true).open(&path)?; + + let mut stream_writer = StreamWriter::try_new(file, &record.schema()) + .expect("File and RecordBatch both are checked"); + + stream_writer + .write(record) + .map_err(StreamWriterError::Writer)?; + + Ok((path, stream_writer)) +} diff --git a/server/src/event/writer/mem_writer.rs b/server/src/event/writer/mem_writer.rs new file mode 100644 index 000000000..1409d9d50 --- /dev/null +++ b/server/src/event/writer/mem_writer.rs @@ -0,0 +1,57 @@ +use std::sync::Arc; + +use arrow_array::RecordBatch; + +use crate::utils::arrow::adapt_batch; + +use super::mutable::MutableColumns; + +#[derive(Default)] +pub struct MemWriter { + read_buffer: Vec, + mutable_buffer: MutableColumns, +} + +impl MemWriter { + pub fn push(&mut self, rb: RecordBatch) { + if self.mutable_buffer.len() + rb.num_rows() < N { + self.mutable_buffer.push(rb) + } else { + let schema = self.mutable_buffer.current_schema(); + let mut new_mutable_buffer = MutableColumns::default(); + new_mutable_buffer.push(RecordBatch::new_empty(Arc::new(schema))); + let mutable_buffer = std::mem::replace(&mut self.mutable_buffer, new_mutable_buffer); + let rb = mutable_buffer.into_recordbatch(); + self.read_buffer.push(rb); + } + } + + #[allow(unused)] + pub fn recordbatch_cloned(&self) -> Vec { + let mut read_buffer = self.read_buffer.clone(); + let rb = self.mutable_buffer.recordbatch_cloned(); + let schema = rb.schema(); + if rb.num_rows() > 0 { + read_buffer.push(rb) + } + + read_buffer + .into_iter() + .map(|rb| adapt_batch(&schema, rb)) + .collect() + } + + pub fn finalize(self) -> Vec { + let mut read_buffer = self.read_buffer; + let rb = self.mutable_buffer.into_recordbatch(); + let schema = rb.schema(); + if rb.num_rows() > 0 { + read_buffer.push(rb) + } + + read_buffer + .into_iter() + .map(|rb| adapt_batch(&schema, rb)) + .collect() + } +} diff --git a/server/src/event/writer/mutable.rs b/server/src/event/writer/mutable.rs new file mode 100644 index 000000000..716f9e446 --- /dev/null +++ b/server/src/event/writer/mutable.rs @@ -0,0 +1,817 @@ +use std::{cmp::Ordering, sync::Arc}; + +use arrow_array::{ + builder::{ + BooleanBuilder, Float64Builder, Int64Builder, ListBuilder, StringBuilder, + TimestampMillisecondBuilder, UInt64Builder, + }, + new_null_array, Array, BooleanArray, Float64Array, Int64Array, ListArray, RecordBatch, + StringArray, TimestampMillisecondArray, UInt64Array, +}; +use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use itertools::Itertools; + +macro_rules! nested_list { + ($t:expr) => { + DataType::List(Box::new(Field::new( + "item", + DataType::List(Box::new(Field::new("item", $t, true))), + true, + ))) + }; +} + +macro_rules! 
unit_list { + ($t:expr) => { + DataType::List(Box::new(Field::new("item", $t, true))) + }; +} + +#[derive(Debug)] +pub enum NestedListBuilder { + Utf8(ListBuilder>), + Boolean(ListBuilder>), + Int64(ListBuilder>), + UInt64(ListBuilder>), + Timestamp(ListBuilder>), + Float64(ListBuilder>), +} + +impl NestedListBuilder { + pub fn new(data_type: &DataType) -> Self { + match data_type { + DataType::Boolean => NestedListBuilder::Boolean(ListBuilder::new(ListBuilder::new( + BooleanBuilder::new(), + ))), + DataType::Int64 => { + NestedListBuilder::Int64(ListBuilder::new(ListBuilder::new(Int64Builder::new()))) + } + DataType::UInt64 => { + NestedListBuilder::UInt64(ListBuilder::new(ListBuilder::new(UInt64Builder::new()))) + } + DataType::Float64 => NestedListBuilder::Float64(ListBuilder::new(ListBuilder::new( + Float64Builder::new(), + ))), + DataType::Timestamp(_, _) => NestedListBuilder::Timestamp(ListBuilder::new( + ListBuilder::new(TimestampMillisecondBuilder::new()), + )), + DataType::Utf8 => { + NestedListBuilder::Utf8(ListBuilder::new(ListBuilder::new(StringBuilder::new()))) + } + _ => unreachable!(), + } + } + + pub fn data_type(&self) -> DataType { + match self { + NestedListBuilder::Utf8(_) => nested_list!(DataType::Utf8), + NestedListBuilder::Boolean(_) => nested_list!(DataType::Boolean), + NestedListBuilder::Int64(_) => nested_list!(DataType::Int64), + NestedListBuilder::UInt64(_) => nested_list!(DataType::UInt64), + NestedListBuilder::Timestamp(_) => { + nested_list!(DataType::Timestamp(TimeUnit::Millisecond, None)) + } + NestedListBuilder::Float64(_) => nested_list!(DataType::Float64), + } + } +} + +#[derive(Debug)] +pub enum UnitListBuilder { + Utf8(ListBuilder), + Boolean(ListBuilder), + Int64(ListBuilder), + UInt64(ListBuilder), + Timestamp(ListBuilder), + Float64(ListBuilder), + List(NestedListBuilder), +} + +impl UnitListBuilder { + pub fn new(data_type: &DataType) -> Self { + match data_type { + DataType::Boolean => UnitListBuilder::Boolean(ListBuilder::new(BooleanBuilder::new())), + DataType::Int64 => UnitListBuilder::Int64(ListBuilder::new(Int64Builder::new())), + DataType::UInt64 => UnitListBuilder::UInt64(ListBuilder::new(UInt64Builder::new())), + DataType::Float64 => UnitListBuilder::Float64(ListBuilder::new(Float64Builder::new())), + DataType::Timestamp(_, _) => { + UnitListBuilder::Timestamp(ListBuilder::new(TimestampMillisecondBuilder::new())) + } + DataType::Utf8 => UnitListBuilder::Utf8(ListBuilder::new(StringBuilder::new())), + DataType::List(field) => { + UnitListBuilder::List(NestedListBuilder::new(field.data_type())) + } + _ => unreachable!(), + } + } + + pub fn data_type(&self) -> DataType { + match self { + UnitListBuilder::Utf8(_) => unit_list!(DataType::Utf8), + UnitListBuilder::Boolean(_) => unit_list!(DataType::Boolean), + UnitListBuilder::Int64(_) => unit_list!(DataType::Int64), + UnitListBuilder::UInt64(_) => unit_list!(DataType::UInt64), + UnitListBuilder::Timestamp(_) => { + unit_list!(DataType::Timestamp(TimeUnit::Millisecond, None)) + } + UnitListBuilder::Float64(_) => unit_list!(DataType::Float64), + UnitListBuilder::List(inner) => inner.data_type(), + } + } +} + +#[derive(Debug)] +pub enum MutableColumnArray { + Utf8(StringBuilder), + Boolean(BooleanBuilder), + Int64(Int64Builder), + UInt64(UInt64Builder), + Timestamp(TimestampMillisecondBuilder), + Float64(Float64Builder), + List(UnitListBuilder), +} + +impl MutableColumnArray { + pub fn new(data_type: &DataType) -> Self { + match data_type { + DataType::Boolean => 
MutableColumnArray::Boolean(BooleanBuilder::new()), + DataType::Int64 => MutableColumnArray::Int64(Int64Builder::new()), + DataType::UInt64 => MutableColumnArray::UInt64(UInt64Builder::new()), + DataType::Float64 => MutableColumnArray::Float64(Float64Builder::new()), + DataType::Timestamp(_, _) => { + MutableColumnArray::Timestamp(TimestampMillisecondBuilder::new()) + } + DataType::Utf8 => MutableColumnArray::Utf8(StringBuilder::new()), + DataType::List(field) => { + MutableColumnArray::List(UnitListBuilder::new(field.data_type())) + } + _ => unreachable!(), + } + } + + pub fn data_type(&self) -> DataType { + match self { + MutableColumnArray::Utf8(_) => DataType::Utf8, + MutableColumnArray::Boolean(_) => DataType::Boolean, + MutableColumnArray::Int64(_) => DataType::Int64, + MutableColumnArray::UInt64(_) => DataType::UInt64, + MutableColumnArray::Timestamp(_) => DataType::Timestamp(TimeUnit::Millisecond, None), + MutableColumnArray::Float64(_) => DataType::Float64, + MutableColumnArray::List(inner) => inner.data_type(), + } + } + + fn push_nulls(&mut self, n: usize) { + match self { + MutableColumnArray::Utf8(col) => (0..n).for_each(|_| col.append_null()), + MutableColumnArray::Boolean(col) => col.append_nulls(n), + MutableColumnArray::Int64(col) => col.append_nulls(n), + MutableColumnArray::UInt64(col) => col.append_nulls(n), + MutableColumnArray::Timestamp(col) => col.append_nulls(n), + MutableColumnArray::Float64(col) => col.append_nulls(n), + MutableColumnArray::List(col) => match col { + UnitListBuilder::Utf8(col) => (0..n).for_each(|_| col.append(false)), + UnitListBuilder::Boolean(col) => (0..n).for_each(|_| col.append(false)), + UnitListBuilder::Int64(col) => (0..n).for_each(|_| col.append(false)), + UnitListBuilder::UInt64(col) => (0..n).for_each(|_| col.append(false)), + UnitListBuilder::Timestamp(col) => (0..n).for_each(|_| col.append(false)), + UnitListBuilder::Float64(col) => (0..n).for_each(|_| col.append(false)), + UnitListBuilder::List(col) => match col { + NestedListBuilder::Utf8(col) => (0..n).for_each(|_| col.append(false)), + NestedListBuilder::Boolean(col) => (0..n).for_each(|_| col.append(false)), + NestedListBuilder::Int64(col) => (0..n).for_each(|_| col.append(false)), + NestedListBuilder::UInt64(col) => (0..n).for_each(|_| col.append(false)), + NestedListBuilder::Timestamp(col) => (0..n).for_each(|_| col.append(false)), + NestedListBuilder::Float64(col) => (0..n).for_each(|_| col.append(false)), + }, + }, + } + } + + fn cloned_array(&self) -> Arc { + match self { + MutableColumnArray::Utf8(col) => Arc::new(col.finish_cloned()), + MutableColumnArray::Boolean(col) => Arc::new(col.finish_cloned()), + MutableColumnArray::Int64(col) => Arc::new(col.finish_cloned()), + MutableColumnArray::UInt64(col) => Arc::new(col.finish_cloned()), + MutableColumnArray::Timestamp(col) => Arc::new(col.finish_cloned()), + MutableColumnArray::Float64(col) => Arc::new(col.finish_cloned()), + MutableColumnArray::List(col) => match col { + UnitListBuilder::Utf8(col) => Arc::new(col.finish_cloned()), + UnitListBuilder::Boolean(col) => Arc::new(col.finish_cloned()), + UnitListBuilder::Int64(col) => Arc::new(col.finish_cloned()), + UnitListBuilder::UInt64(col) => Arc::new(col.finish_cloned()), + UnitListBuilder::Timestamp(col) => Arc::new(col.finish_cloned()), + UnitListBuilder::Float64(col) => Arc::new(col.finish_cloned()), + UnitListBuilder::List(col) => match col { + NestedListBuilder::Utf8(col) => Arc::new(col.finish_cloned()), + NestedListBuilder::Boolean(col) => 
Arc::new(col.finish_cloned()), + NestedListBuilder::Int64(col) => Arc::new(col.finish_cloned()), + NestedListBuilder::UInt64(col) => Arc::new(col.finish_cloned()), + NestedListBuilder::Timestamp(col) => Arc::new(col.finish_cloned()), + NestedListBuilder::Float64(col) => Arc::new(col.finish_cloned()), + }, + }, + } + } + + fn into_array(mut self) -> Arc { + match &mut self { + MutableColumnArray::Utf8(col) => Arc::new(col.finish()), + MutableColumnArray::Boolean(col) => Arc::new(col.finish()), + MutableColumnArray::Int64(col) => Arc::new(col.finish()), + MutableColumnArray::UInt64(col) => Arc::new(col.finish()), + MutableColumnArray::Timestamp(col) => Arc::new(col.finish()), + MutableColumnArray::Float64(col) => Arc::new(col.finish()), + MutableColumnArray::List(col) => match col { + UnitListBuilder::Utf8(col) => Arc::new(col.finish()), + UnitListBuilder::Boolean(col) => Arc::new(col.finish()), + UnitListBuilder::Int64(col) => Arc::new(col.finish()), + UnitListBuilder::UInt64(col) => Arc::new(col.finish()), + UnitListBuilder::Timestamp(col) => Arc::new(col.finish()), + UnitListBuilder::Float64(col) => Arc::new(col.finish()), + UnitListBuilder::List(col) => match col { + NestedListBuilder::Utf8(col) => Arc::new(col.finish()), + NestedListBuilder::Boolean(col) => Arc::new(col.finish()), + NestedListBuilder::Int64(col) => Arc::new(col.finish()), + NestedListBuilder::UInt64(col) => Arc::new(col.finish()), + NestedListBuilder::Timestamp(col) => Arc::new(col.finish()), + NestedListBuilder::Float64(col) => Arc::new(col.finish()), + }, + }, + } + } +} + +#[derive(Debug)] +pub struct MutableColumn { + name: String, + column: MutableColumnArray, +} + +impl MutableColumn { + pub fn new(name: String, column: MutableColumnArray) -> Self { + Self { name, column } + } + + pub fn name(&self) -> &str { + &self.name + } + + pub fn feild(&self) -> Field { + Field::new(&self.name, self.column.data_type(), true) + } +} + +#[derive(Debug, Default)] +pub struct MutableColumns { + columns: Vec, + len: usize, +} + +impl MutableColumns { + pub fn push(&mut self, rb: RecordBatch) { + let num_rows = rb.num_rows(); + let schema = rb.schema(); + let rb = schema.fields().iter().zip(rb.columns().iter()); + + // start index map to next location in self columns + let mut index = 0; + 'rb: for (field, arr) in rb { + // for field in rb look at same field in columns or insert. 
+ // fill with null while traversing if rb field name is greater than column name + while let Some(col) = self.columns.get_mut(index) { + match col.name().cmp(field.name()) { + Ordering::Equal => { + update_column(&mut col.column, Arc::clone(arr)); + // goto next field in rb + index += 1; + continue 'rb; + } + Ordering::Greater => { + let mut new_column = MutableColumn::new( + field.name().to_owned(), + MutableColumnArray::new(field.data_type()), + ); + update_column( + &mut new_column.column, + new_null_array(field.data_type(), self.len), + ); + update_column(&mut new_column.column, Arc::clone(arr)); + self.columns.insert(index, new_column); + index += 1; + continue 'rb; + } + Ordering::Less => { + col.column.push_nulls(num_rows); + index += 1; + } + } + } + + // if inner loop finishes this means this column is suppose to be at the end of columns + let mut new_column = MutableColumn::new( + field.name().to_owned(), + MutableColumnArray::new(field.data_type()), + ); + update_column( + &mut new_column.column, + new_null_array(field.data_type(), self.len), + ); + update_column(&mut new_column.column, Arc::clone(arr)); + self.columns.push(new_column); + index += 1; + } + + // fill any columns yet to be updated with nulls + for col in self.columns[index..].iter_mut() { + col.column.push_nulls(num_rows) + } + + self.len += num_rows + } + + pub fn into_recordbatch(self) -> RecordBatch { + let mut fields = Vec::with_capacity(self.columns.len()); + let mut arrays = Vec::with_capacity(self.columns.len()); + + for MutableColumn { name, column } in self.columns { + let field = Field::new(name, column.data_type(), true); + fields.push(field); + arrays.push(column.into_array()); + } + + RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).unwrap() + } + + pub fn recordbatch_cloned(&self) -> RecordBatch { + let mut fields = Vec::with_capacity(self.columns.len()); + let mut arrays = Vec::with_capacity(self.columns.len()); + + for MutableColumn { name, column } in &self.columns { + let field = Field::new(name, column.data_type(), true); + fields.push(field); + arrays.push(column.cloned_array()); + } + + RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).unwrap() + } + + pub fn len(&self) -> usize { + self.len + } + + pub fn current_schema(&self) -> Schema { + Schema::new(self.columns.iter().map(|x| x.feild()).collect_vec()) + } +} + +fn update_column(col: &mut MutableColumnArray, arr: Arc) { + match col { + MutableColumnArray::Utf8(col) => downcast::(&arr) + .iter() + .for_each(|v| col.append_option(v)), + MutableColumnArray::Boolean(col) => downcast::(&arr) + .iter() + .for_each(|v| col.append_option(v)), + MutableColumnArray::Int64(col) => downcast::(&arr) + .iter() + .for_each(|v| col.append_option(v)), + MutableColumnArray::UInt64(col) => downcast::(&arr) + .iter() + .for_each(|v| col.append_option(v)), + MutableColumnArray::Timestamp(col) => downcast::(&arr) + .iter() + .for_each(|v| col.append_option(v)), + MutableColumnArray::Float64(col) => downcast::(&arr) + .iter() + .for_each(|v| col.append_option(v)), + MutableColumnArray::List(col) => match col { + UnitListBuilder::Utf8(col) => { + let arr = into_vec_array(&arr); + let iter = arr + .iter() + .map(|x| x.as_ref().map(|x| downcast::(x).iter())); + col.extend(iter); + } + UnitListBuilder::Boolean(col) => { + let arr = into_vec_array(&arr); + let iter = arr + .iter() + .map(|x| x.as_ref().map(|x| downcast::(x).iter())); + col.extend(iter); + } + UnitListBuilder::Int64(col) => { + let arr = into_vec_array(&arr); + let iter = arr + 
.iter() + .map(|x| x.as_ref().map(|x| downcast::(x).iter())); + col.extend(iter); + } + UnitListBuilder::UInt64(col) => { + let arr = into_vec_array(&arr); + let iter = arr + .iter() + .map(|x| x.as_ref().map(|x| downcast::(x).iter())); + col.extend(iter); + } + UnitListBuilder::Timestamp(col) => { + let arr = into_vec_array(&arr); + let iter = arr.iter().map(|x| { + x.as_ref() + .map(|x| downcast::(x).iter()) + }); + col.extend(iter); + } + UnitListBuilder::Float64(col) => { + let arr = into_vec_array(&arr); + let iter = arr + .iter() + .map(|x| x.as_ref().map(|x| downcast::(x).iter())); + col.extend(iter); + } + UnitListBuilder::List(col) => match col { + NestedListBuilder::Utf8(col) => { + let arr = into_vec_vec_array(&arr); + let iter = arr.iter().map(|x| { + x.as_ref().map(|arr| { + arr.iter() + .map(|x| x.as_ref().map(|x| downcast::(x).iter())) + }) + }); + + col.extend(iter) + } + NestedListBuilder::Boolean(col) => { + let arr = into_vec_vec_array(&arr); + let iter = arr.iter().map(|x| { + x.as_ref().map(|arr| { + arr.iter() + .map(|x| x.as_ref().map(|x| downcast::(x).iter())) + }) + }); + + col.extend(iter) + } + NestedListBuilder::Int64(col) => { + let arr = into_vec_vec_array(&arr); + + let iter = arr.iter().map(|x| { + x.as_ref().map(|arr| { + arr.iter() + .map(|x| x.as_ref().map(|x| downcast::(x).iter())) + }) + }); + + col.extend(iter) + } + NestedListBuilder::UInt64(col) => { + let arr = into_vec_vec_array(&arr); + + let iter = arr.iter().map(|x| { + x.as_ref().map(|arr| { + arr.iter() + .map(|x| x.as_ref().map(|x| downcast::(x).iter())) + }) + }); + + col.extend(iter) + } + NestedListBuilder::Timestamp(col) => { + let arr = into_vec_vec_array(&arr); + + let iter = arr.iter().map(|x| { + x.as_ref().map(|arr| { + arr.iter().map(|x| { + x.as_ref() + .map(|x| downcast::(x).iter()) + }) + }) + }); + + col.extend(iter) + } + NestedListBuilder::Float64(col) => { + let arr = into_vec_vec_array(&arr); + + let iter = arr.iter().map(|x| { + x.as_ref().map(|arr| { + arr.iter() + .map(|x| x.as_ref().map(|x| downcast::(x).iter())) + }) + }); + + col.extend(iter) + } + }, + }, + }; +} + +fn downcast(arr: &dyn Array) -> &T { + arr.as_any().downcast_ref::().unwrap() +} + +type VecArray = Vec>>; + +fn into_vec_array(arr: &dyn Array) -> VecArray { + arr.as_any() + .downcast_ref::() + .unwrap() + .iter() + .collect() +} + +fn into_vec_vec_array(arr: &dyn Array) -> Vec> { + arr.as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|arr| { + arr.map(|arr| { + arr.as_any() + .downcast_ref::() + .unwrap() + .iter() + .collect() + }) + }) + .collect() +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_array::{BooleanArray, RecordBatch}; + use arrow_schema::{DataType, Field, Schema}; + + use super::{MutableColumnArray, MutableColumns}; + + #[test] + fn empty_columns_push_single_col() { + let mut columns = MutableColumns::default(); + + let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]); + let col1 = Arc::new(BooleanArray::from(vec![true, false, true])); + let rb = RecordBatch::try_new(Arc::new(schema), vec![col1]).unwrap(); + + columns.push(rb); + + assert_eq!(columns.columns.len(), 1) + } + + #[test] + fn empty_columns_push_empty_rb() { + let mut columns = MutableColumns::default(); + + let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]); + let rb = RecordBatch::new_empty(Arc::new(schema)); + + columns.push(rb); + + assert_eq!(columns.columns.len(), 1); + assert_eq!(columns.len, 0); + } + + #[test] + fn 
one_empty_column_push_new_empty_column_before() { + let mut columns = MutableColumns::default(); + + let schema = Schema::new(vec![Field::new("b", DataType::Boolean, true)]); + let rb = RecordBatch::new_empty(Arc::new(schema)); + columns.push(rb); + + let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]); + let rb = RecordBatch::new_empty(Arc::new(schema)); + columns.push(rb); + + assert_eq!(columns.columns.len(), 2); + assert_eq!(columns.len, 0); + } + + #[test] + fn one_column_push_new_column_before() { + let mut columns = MutableColumns::default(); + + let schema = Schema::new(vec![Field::new("b", DataType::Boolean, true)]); + let col2 = Arc::new(BooleanArray::from(vec![true, false, true])); + let rb = RecordBatch::try_new(Arc::new(schema), vec![col2]).unwrap(); + columns.push(rb); + + assert_eq!(columns.columns.len(), 1); + assert_eq!(columns.len, 3); + + let MutableColumnArray::Boolean(builder) = &columns.columns[0].column else {unreachable!()}; + { + let arr = builder.finish_cloned(); + assert_eq!( + arr.iter().collect::>(), + vec![Some(true), Some(false), Some(true)] + ) + } + + let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]); + let col1 = Arc::new(BooleanArray::from(vec![true, true, true])); + let rb = RecordBatch::try_new(Arc::new(schema), vec![col1]).unwrap(); + columns.push(rb); + + assert_eq!(columns.columns.len(), 2); + assert_eq!(columns.len, 6); + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[0].column else {unreachable!()}; + { + let arr = builder.finish(); + assert_eq!( + arr.iter().collect::>(), + vec![None, None, None, Some(true), Some(true), Some(true)] + ) + } + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[1].column else {unreachable!()}; + { + let arr = builder.finish(); + assert_eq!( + arr.iter().collect::>(), + vec![Some(true), Some(false), Some(true), None, None, None] + ) + } + } + + #[test] + fn two_column_push_new_column_before() { + let mut columns = MutableColumns::default(); + let schema = Schema::new(vec![ + Field::new("b", DataType::Boolean, true), + Field::new("c", DataType::Boolean, true), + ]); + let rb = RecordBatch::try_new( + Arc::new(schema), + vec![ + Arc::new(BooleanArray::from(vec![false, true, false])), + Arc::new(BooleanArray::from(vec![false, false, true])), + ], + ) + .unwrap(); + columns.push(rb); + + assert_eq!(columns.columns.len(), 2); + assert_eq!(columns.len, 3); + + let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]); + let rb = RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(BooleanArray::from(vec![true, false, false]))], + ) + .unwrap(); + columns.push(rb); + + assert_eq!(columns.columns.len(), 3); + assert_eq!(columns.len, 6); + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[0].column else {unreachable!()}; + { + let arr = builder.finish(); + assert_eq!( + arr.iter().collect::>(), + vec![None, None, None, Some(true), Some(false), Some(false)] + ) + } + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[1].column else {unreachable!()}; + { + let arr = builder.finish(); + assert_eq!( + arr.iter().collect::>(), + vec![Some(false), Some(true), Some(false), None, None, None] + ) + } + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[2].column else {unreachable!()}; + { + let arr = builder.finish(); + assert_eq!( + arr.iter().collect::>(), + vec![Some(false), Some(false), Some(true), None, None, None] + ) + } + } + + #[test] + fn two_column_push_new_column_middle() { + let 
mut columns = MutableColumns::default(); + let schema = Schema::new(vec![ + Field::new("a", DataType::Boolean, true), + Field::new("c", DataType::Boolean, true), + ]); + let rb = RecordBatch::try_new( + Arc::new(schema), + vec![ + Arc::new(BooleanArray::from(vec![true, false, false])), + Arc::new(BooleanArray::from(vec![false, false, true])), + ], + ) + .unwrap(); + columns.push(rb); + + assert_eq!(columns.columns.len(), 2); + assert_eq!(columns.len, 3); + + let schema = Schema::new(vec![Field::new("b", DataType::Boolean, true)]); + let rb = RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(BooleanArray::from(vec![false, true, false]))], + ) + .unwrap(); + columns.push(rb); + + assert_eq!(columns.columns.len(), 3); + assert_eq!(columns.len, 6); + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[0].column else {unreachable!()}; + { + let arr = builder.finish(); + assert_eq!( + arr.iter().collect::>(), + vec![Some(true), Some(false), Some(false), None, None, None] + ) + } + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[1].column else {unreachable!()}; + { + let arr = builder.finish(); + assert_eq!( + arr.iter().collect::>(), + vec![None, None, None, Some(false), Some(true), Some(false)] + ) + } + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[2].column else {unreachable!()}; + { + let arr = builder.finish(); + assert_eq!( + arr.iter().collect::>(), + vec![Some(false), Some(false), Some(true), None, None, None] + ) + } + } + + #[test] + fn two_column_push_new_column_after() { + let mut columns = MutableColumns::default(); + let schema = Schema::new(vec![ + Field::new("a", DataType::Boolean, true), + Field::new("b", DataType::Boolean, true), + ]); + let rb = RecordBatch::try_new( + Arc::new(schema), + vec![ + Arc::new(BooleanArray::from(vec![true, false, false])), + Arc::new(BooleanArray::from(vec![false, true, false])), + ], + ) + .unwrap(); + columns.push(rb); + + assert_eq!(columns.columns.len(), 2); + assert_eq!(columns.len, 3); + + let schema = Schema::new(vec![Field::new("c", DataType::Boolean, true)]); + let rb = RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(BooleanArray::from(vec![false, false, true]))], + ) + .unwrap(); + columns.push(rb); + + assert_eq!(columns.columns.len(), 3); + assert_eq!(columns.len, 6); + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[0].column else {unreachable!()}; + { + let arr = builder.finish(); + assert_eq!( + arr.iter().collect::>(), + vec![Some(true), Some(false), Some(false), None, None, None] + ) + } + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[1].column else {unreachable!()}; + { + let arr = builder.finish(); + assert_eq!( + arr.iter().collect::>(), + vec![Some(false), Some(true), Some(false), None, None, None] + ) + } + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[2].column else {unreachable!()}; + { + let arr = builder.finish(); + assert_eq!( + arr.iter().collect::>(), + vec![None, None, None, Some(false), Some(false), Some(true)] + ) + } + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 90762cb8e..64d80feb3 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -81,7 +81,6 @@ async fn main() -> anyhow::Result<()> { } // track all parquet files already in the data directory - storage::CACHED_FILES.track_parquet(); storage::retention::load_retention_from_global().await; // load data from stats back to prometheus metrics metrics::load_from_global_stats(); @@ -193,11 +192,7 @@ fn 
run_local_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<( let mut scheduler = Scheduler::new(); scheduler .every((storage::LOCAL_SYNC_INTERVAL as u32).seconds()) - .run(move || { - if let Err(e) = crate::event::STREAM_WRITERS.unset_all() { - log::warn!("failed to sync local data. {:?}", e); - } - }); + .run(move || crate::event::STREAM_WRITERS.unset_all()); loop { thread::sleep(Duration::from_millis(50)); diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 2306ce6e9..aec6375da 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -16,16 +16,17 @@ * */ +use arrow_array::RecordBatch; use arrow_schema::Schema; use once_cell::sync::Lazy; use std::collections::HashMap; use std::sync::{Arc, RwLock}; use crate::alerts::Alerts; -use crate::event::Event; use crate::metrics::{EVENTS_INGESTED, EVENTS_INGESTED_SIZE}; use crate::stats::{Stats, StatsCounter}; -use crate::storage::{MergedRecordReader, ObjectStorage, StorageDir}; +use crate::storage::{ObjectStorage, StorageDir}; +use crate::utils::arrow::MergedRecordReader; use self::error::stream_info::{CheckAlertError, LoadError, MetadataError}; use derive_more::{Deref, DerefMut}; @@ -64,16 +65,18 @@ pub const LOCK_EXPECT: &str = "no method in metadata should panic while holding // 4. When first event is sent to stream (update the schema) // 5. When set alert API is called (update the alert) impl StreamInfo { - pub async fn check_alerts(&self, event: &Event) -> Result<(), CheckAlertError> { + pub async fn check_alerts( + &self, + stream_name: &str, + rb: RecordBatch, + ) -> Result<(), CheckAlertError> { let map = self.read().expect(LOCK_EXPECT); let meta = map - .get(&event.stream_name) - .ok_or(MetadataError::StreamMetaNotFound( - event.stream_name.to_owned(), - ))?; + .get(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_owned()))?; for alert in &meta.alerts.alerts { - alert.check_alert(event.stream_name.clone(), event.rb.clone()) + alert.check_alert(stream_name, rb.clone()) } Ok(()) diff --git a/server/src/option.rs b/server/src/option.rs index f87c68d48..585e29f4d 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -20,6 +20,7 @@ use clap::error::ErrorKind; use clap::{command, value_parser, Arg, Args, Command, FromArgMatches}; use once_cell::sync::Lazy; +use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel}; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -409,11 +410,11 @@ impl From for parquet::basic::Compression { match value { Compression::UNCOMPRESSED => parquet::basic::Compression::UNCOMPRESSED, Compression::SNAPPY => parquet::basic::Compression::SNAPPY, - Compression::GZIP => parquet::basic::Compression::GZIP, + Compression::GZIP => parquet::basic::Compression::GZIP(GzipLevel::default()), Compression::LZO => parquet::basic::Compression::LZO, - Compression::BROTLI => parquet::basic::Compression::BROTLI, + Compression::BROTLI => parquet::basic::Compression::BROTLI(BrotliLevel::default()), Compression::LZ4 => parquet::basic::Compression::LZ4, - Compression::ZSTD => parquet::basic::Compression::ZSTD, + Compression::ZSTD => parquet::basic::Compression::ZSTD(ZstdLevel::default()), } } } diff --git a/server/src/query.rs b/server/src/query.rs index 897b2699b..270fa388b 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -16,8 +16,6 @@ * */ -mod table_provider; - use chrono::TimeZone; use chrono::{DateTime, Utc}; use datafusion::arrow::datatypes::Schema; diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs deleted file 
mode 100644 index 7dcb19c5e..000000000 --- a/server/src/query/table_provider.rs +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2023 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -#![allow(unused)] - -use async_trait::async_trait; -use datafusion::arrow::datatypes::{Schema, SchemaRef}; -use datafusion::arrow::ipc::reader::StreamReader; -use datafusion::arrow::record_batch::RecordBatch; -use datafusion::datasource::file_format::parquet::ParquetFormat; -use datafusion::datasource::listing::{ - ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, -}; -use datafusion::datasource::{MemTable, TableProvider}; -use datafusion::error::DataFusionError; -use datafusion::execution::context::SessionState; -use datafusion::logical_expr::TableType; -use datafusion::physical_plan::union::UnionExec; -use datafusion::physical_plan::ExecutionPlan; -use datafusion::prelude::Expr; -use itertools::Itertools; -use std::any::Any; -use std::fs::File; -use std::path::PathBuf; -use std::sync::Arc; - -use crate::storage::ObjectStorage; - -pub struct QueryTableProvider { - // parquet - ( arrow files ) - staging_arrows: Vec<(PathBuf, Vec)>, - other_staging_parquet: Vec, - storage_prefixes: Vec, - storage: Arc, - schema: Arc, -} - -impl QueryTableProvider { - pub fn new( - staging_arrows: Vec<(PathBuf, Vec)>, - other_staging_parquet: Vec, - storage_prefixes: Vec, - storage: Arc, - schema: Arc, - ) -> Self { - // By the time this query executes the arrow files could be converted to parquet files - // we want to preserve these files as well in case - let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning"); - - for file in staging_arrows - .iter() - .map(|(p, _)| p) - .chain(other_staging_parquet.iter()) - { - parquet_cached.upsert(file) - } - - Self { - staging_arrows, - other_staging_parquet, - storage_prefixes, - storage, - schema, - } - } - - async fn create_physical_plan( - &self, - ctx: &SessionState, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - ) -> Result, DataFusionError> { - let mut mem_records: Vec> = Vec::new(); - let mut parquet_files = Vec::new(); - - for (staging_parquet, arrow_files) in &self.staging_arrows { - if !load_arrows(arrow_files, &self.schema, &mut mem_records) { - parquet_files.push(staging_parquet.clone()) - } - } - - parquet_files.extend(self.other_staging_parquet.clone()); - - let memtable = MemTable::try_new(Arc::clone(&self.schema), mem_records)?; - let memexec = memtable.scan(ctx, projection, filters, limit).await?; - - let cache_exec = if parquet_files.is_empty() { - memexec - } else { - match local_parquet_table(&parquet_files, &self.schema) { - Some(table) => { - let listexec = table.scan(ctx, projection, filters, limit).await?; - Arc::new(UnionExec::new(vec![memexec, listexec])) - } - None => memexec, - } - }; - - let mut exec = vec![cache_exec]; - - let table = self - .storage - 
.query_table(self.storage_prefixes.clone(), Arc::clone(&self.schema))?; - - if let Some(ref storage_listing) = table { - exec.push( - storage_listing - .scan(ctx, projection, filters, limit) - .await?, - ); - } - - Ok(Arc::new(UnionExec::new(exec))) - } -} - -impl Drop for QueryTableProvider { - fn drop(&mut self) { - let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning"); - for file in self - .staging_arrows - .iter() - .map(|(p, _)| p) - .chain(self.other_staging_parquet.iter()) - { - parquet_cached.remove(file) - } - } -} - -#[async_trait] -impl TableProvider for QueryTableProvider { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } - - fn table_type(&self) -> TableType { - TableType::Base - } - - async fn scan( - &self, - ctx: &SessionState, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - ) -> datafusion::error::Result> { - self.create_physical_plan(ctx, projection, filters, limit) - .await - } -} - -fn local_parquet_table(parquet_files: &[PathBuf], schema: &SchemaRef) -> Option { - let listing_options = ListingOptions { - file_extension: ".parquet".to_owned(), - format: Arc::new(ParquetFormat::default().with_enable_pruning(Some(true))), - file_sort_order: None, - infinite_source: false, - table_partition_cols: vec![], - collect_stat: true, - target_partitions: 1, - }; - - let paths = parquet_files - .iter() - .flat_map(|path| { - ListingTableUrl::parse(path.to_str().expect("path should is valid unicode")) - }) - .collect_vec(); - - if paths.is_empty() { - return None; - } - - let config = ListingTableConfig::new_with_multi_paths(paths) - .with_listing_options(listing_options) - .with_schema(Arc::clone(schema)); - - match ListingTable::try_new(config) { - Ok(table) => Some(table), - Err(err) => { - log::error!("Local parquet query failed due to err: {err}"); - None - } - } -} - -fn load_arrows( - files: &[PathBuf], - schema: &Schema, - mem_records: &mut Vec>, -) -> bool { - let mut stream_readers = Vec::with_capacity(files.len()); - - for file in files { - let Ok(arrow_file) = File::open(file) else { return false; }; - let Ok(reader)= StreamReader::try_new(arrow_file, None) else { return false; }; - stream_readers.push(reader.into()); - } - - let reader = crate::storage::MergedRecordReader { - readers: stream_readers, - }; - let records = reader.merged_iter(schema).collect(); - mem_records.push(records); - true -} diff --git a/server/src/storage.rs b/server/src/storage.rs index f0bf16e64..46f80b979 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -16,38 +16,27 @@ * */ -use crate::metadata::STREAM_INFO; use crate::option::CONFIG; - use crate::stats::Stats; -use crate::storage::file_link::{FileLink, FileTable}; -use crate::utils; -use chrono::{Local, NaiveDateTime, Timelike, Utc}; -use datafusion::arrow::error::ArrowError; -use datafusion::parquet::errors::ParquetError; -use derive_more::{Deref, DerefMut}; -use once_cell::sync::Lazy; +use chrono::Local; -use std::collections::HashMap; use std::fmt::Debug; use std::fs::create_dir_all; -use std::path::{Path, PathBuf}; -use std::sync::Mutex; -mod file_link; mod localfs; mod object_storage; pub mod retention; mod s3; +pub mod staging; mod store_metadata; pub use localfs::{FSConfig, LocalFS}; -pub use object_storage::MergedRecordReader; pub use object_storage::{ObjectStorage, ObjectStorageProvider}; pub use s3::{S3Config, S3}; pub use store_metadata::StorageMetadata; +pub use self::staging::StorageDir; use 
self::store_metadata::{put_staging_metadata, EnvChange}; /// local sync interval to move data.records to /tmp dir of that stream. @@ -183,151 +172,11 @@ async fn create_remote_metadata(metadata: &StorageMetadata) -> Result<(), Object client.put_metadata(metadata).await } -pub static CACHED_FILES: Lazy = - Lazy::new(|| CachedFiles(Mutex::new(FileTable::new()))); - -#[derive(Debug, Deref, DerefMut)] -pub struct CachedFiles(Mutex>); - -impl CachedFiles { - pub fn track_parquet(&self) { - let mut table = self.lock().expect("no poisoning"); - STREAM_INFO - .list_streams() - .into_iter() - .flat_map(|ref stream_name| StorageDir::new(stream_name).parquet_files().into_iter()) - .for_each(|ref path| table.upsert(path)) - } -} - #[derive(serde::Serialize)] pub struct LogStream { pub name: String, } -#[derive(Debug)] -pub struct StorageDir { - pub data_path: PathBuf, -} - -impl StorageDir { - pub fn new(stream_name: &str) -> Self { - let data_path = CONFIG.parseable.local_stream_data_path(stream_name); - - Self { data_path } - } - - fn file_time_suffix(time: NaiveDateTime) -> String { - let uri = utils::date_to_prefix(time.date()) - + &utils::hour_to_prefix(time.hour()) - + &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(); - let local_uri = str::replace(&uri, "/", "."); - let hostname = utils::hostname_unchecked(); - format!("{local_uri}{hostname}.data.arrows") - } - - fn filename_by_time(stream_hash: &str, time: NaiveDateTime) -> String { - format!("{}.{}", stream_hash, Self::file_time_suffix(time)) - } - - fn filename_by_current_time(stream_hash: &str) -> String { - let datetime = Utc::now(); - Self::filename_by_time(stream_hash, datetime.naive_utc()) - } - - pub fn path_by_current_time(&self, stream_hash: &str) -> PathBuf { - self.data_path - .join(Self::filename_by_current_time(stream_hash)) - } - - pub fn arrow_files(&self) -> Vec { - let Ok(dir) = self.data_path - .read_dir() else { return vec![] }; - - let paths: Vec = dir - .flatten() - .map(|file| file.path()) - .filter(|file| file.extension().map_or(false, |ext| ext.eq("arrows"))) - .collect(); - - paths - } - - #[allow(unused)] - pub fn arrow_files_grouped_by_time(&self) -> HashMap> { - // hashmap - let mut grouped_arrow_file: HashMap> = HashMap::new(); - let arrow_files = self.arrow_files(); - for arrow_file_path in arrow_files { - let key = Self::arrow_path_to_parquet(&arrow_file_path); - grouped_arrow_file - .entry(key) - .or_default() - .push(arrow_file_path); - } - - grouped_arrow_file - } - - pub fn arrow_files_grouped_exclude_time( - &self, - exclude: NaiveDateTime, - ) -> HashMap> { - let hot_filename = StorageDir::file_time_suffix(exclude); - // hashmap but exclude where hotfilename matches - let mut grouped_arrow_file: HashMap> = HashMap::new(); - let mut arrow_files = self.arrow_files(); - arrow_files.retain(|path| { - !path - .file_name() - .unwrap() - .to_str() - .unwrap() - .ends_with(&hot_filename) - }); - for arrow_file_path in arrow_files { - let key = Self::arrow_path_to_parquet(&arrow_file_path); - grouped_arrow_file - .entry(key) - .or_default() - .push(arrow_file_path); - } - - grouped_arrow_file - } - - pub fn parquet_files(&self) -> Vec { - let Ok(dir) = self.data_path - .read_dir() else { return vec![] }; - - dir.flatten() - .map(|file| file.path()) - .filter(|file| file.extension().map_or(false, |ext| ext.eq("parquet"))) - .collect() - } - - fn arrow_path_to_parquet(path: &Path) -> PathBuf { - let filename = path.file_name().unwrap().to_str().unwrap(); - let (_, filename) = 
filename.split_once('.').unwrap(); - let mut parquet_path = path.to_owned(); - parquet_path.set_file_name(filename); - parquet_path.set_extension("parquet"); - parquet_path - } -} - -#[derive(Debug, thiserror::Error)] -pub enum MoveDataError { - #[error("Unable to create recordbatch stream")] - Arrow(#[from] ArrowError), - #[error("Could not generate parquet file")] - Parquet(#[from] ParquetError), - #[error("Object Storage Error {0}")] - ObjectStorage(#[from] ObjectStorageError), - #[error("Could not generate parquet file")] - Create, -} - #[derive(Debug, thiserror::Error)] pub enum ObjectStorageError { // no such key inside the object storage diff --git a/server/src/storage/file_link.rs b/server/src/storage/file_link.rs deleted file mode 100644 index ebf38977f..000000000 --- a/server/src/storage/file_link.rs +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2023 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -use std::{ - collections::HashMap, - path::{Path, PathBuf}, -}; - -pub trait Link { - fn links(&self) -> usize; - fn increase_link_count(&mut self) -> usize; - fn decreate_link_count(&mut self) -> usize; -} - -#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] -pub enum CacheState { - #[default] - Idle, - Uploading, - Uploaded, -} - -#[derive(Debug)] -pub struct FileLink { - link: usize, - pub metadata: CacheState, -} - -impl Default for FileLink { - fn default() -> Self { - Self { - link: 1, - metadata: CacheState::Idle, - } - } -} - -impl FileLink { - pub fn set_metadata(&mut self, state: CacheState) { - self.metadata = state - } -} - -impl Link for FileLink { - fn links(&self) -> usize { - self.link - } - - fn increase_link_count(&mut self) -> usize { - self.link.saturating_add(1) - } - - fn decreate_link_count(&mut self) -> usize { - self.link.saturating_sub(1) - } -} - -#[derive(Debug)] -pub struct FileTable { - inner: HashMap, -} - -impl FileTable { - pub fn new() -> Self { - Self { - inner: HashMap::default(), - } - } - - pub fn upsert(&mut self, path: &Path) { - if let Some(entry) = self.inner.get_mut(path) { - entry.increase_link_count(); - } else { - self.inner.insert(path.to_path_buf(), L::default()); - } - } - - pub fn remove(&mut self, path: &Path) { - let Some(link_count) = self.inner.get_mut(path).map(|entry| entry.decreate_link_count()) else { return }; - if link_count == 0 { - let _ = std::fs::remove_file(path); - self.inner.remove(path); - } - } - - pub fn get_mut(&mut self, path: &Path) -> Option<&mut L> { - self.inner.get_mut(path) - } -} diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index e498d960a..892f4902f 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -17,43 +17,30 @@ */ use super::{ - file_link::CacheState, retention::Retention, LogStream, MoveDataError, ObjectStorageError, - ObjectStoreFormat, Permisssion, StorageDir, 
StorageMetadata, CACHED_FILES, + retention::Retention, + staging::{self, convert_disk_files_to_parquet, convert_mem_to_parquet}, + LogStream, ObjectStorageError, ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata, }; use crate::{ alerts::Alerts, metadata::STREAM_INFO, - metrics::{storage::StorageMetrics, STAGING_FILES, STORAGE_SIZE}, + metrics::{storage::StorageMetrics, STORAGE_SIZE}, option::CONFIG, stats::Stats, - utils::batch_adapter::adapt_batch, }; use actix_web_prometheus::PrometheusMetrics; -use arrow_array::{RecordBatch, TimestampMillisecondArray}; -use arrow_ipc::reader::StreamReader; use arrow_schema::Schema; use async_trait::async_trait; use bytes::Bytes; use datafusion::{ datasource::listing::ListingTable, error::DataFusionError, execution::runtime_env::RuntimeEnv, }; -use itertools::kmerge_by; -use parquet::{ - arrow::ArrowWriter, basic::Encoding, file::properties::WriterProperties, - schema::types::ColumnPath, -}; use relative_path::RelativePath; use relative_path::RelativePathBuf; use serde_json::Value; -use std::{ - collections::HashMap, - fs::{self, File}, - path::{Path, PathBuf}, - process, - sync::Arc, -}; +use std::{collections::HashMap, path::Path, sync::Arc}; // metadata file names in a Stream prefix pub(super) const STREAM_METADATA_FILE_NAME: &str = ".stream.json"; @@ -250,7 +237,7 @@ pub trait ObjectStorage: Sync + 'static { } } - async fn sync(&self) -> Result<(), MoveDataError> { + async fn sync(&self) -> Result<(), ObjectStorageError> { if !Path::new(&CONFIG.staging_dir()).exists() { return Ok(()); } @@ -259,76 +246,28 @@ pub trait ObjectStorage: Sync + 'static { let mut stream_stats = HashMap::new(); - for stream in &streams { - let dir = StorageDir::new(stream); - // walk dir, find all .arrows files and convert to parquet - // Do not include file which is being written to - let time = chrono::Utc::now().naive_utc(); - let staging_files = dir.arrow_files_grouped_exclude_time(time); - if staging_files.is_empty() { - STAGING_FILES.with_label_values(&[stream]).set(0); - } - - for (parquet_path, files) in staging_files { - STAGING_FILES - .with_label_values(&[stream]) - .set(files.len() as i64); - - for file in &files { - let file_size = file.metadata().unwrap().len(); - let file_type = file.extension().unwrap().to_str().unwrap(); - - STORAGE_SIZE - .with_label_values(&["staging", stream, file_type]) - .add(file_size as i64); - } - - let record_reader = MergedRecordReader::try_new(&files).unwrap(); - let parquet_file = { - let mut parquet_table = CACHED_FILES.lock().unwrap(); - let parquet_file = - fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; - parquet_table.upsert(&parquet_path); - parquet_file - }; - let props = WriterProperties::builder() - .set_max_row_group_size(CONFIG.parseable.row_group_size) - .set_compression(CONFIG.parseable.parquet_compression.into()) - .set_column_encoding( - ColumnPath::new(vec!["p_timestamp".to_string()]), - Encoding::DELTA_BINARY_PACKED, - ) - .build(); - let merged_schema = record_reader.merged_schema(); - commit_schema_to_storage(stream, merged_schema.clone()).await?; - let schema = Arc::new(merged_schema); - let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?; - - for ref record in record_reader.merged_iter(&schema) { - writer.write(record)?; + for (stream_name, bufs) in staging::take_all_read_bufs() { + dbg!(&stream_name, bufs.len()); + for buf in bufs { + dbg!(&buf.time); + let schema = dbg!(convert_mem_to_parquet(&stream_name, buf)) + .map_err(|err| 
ObjectStorageError::UnhandledError(Box::new(err)))?; + if let Some(schema) = schema { + commit_schema_to_storage(&stream_name, schema).await?; } + } + } - writer.close()?; + for stream in &streams { + let dir = StorageDir::new(stream); + let schema = convert_disk_files_to_parquet(stream, &dir) + .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; - for file in files { - if fs::remove_file(file).is_err() { - log::error!("Failed to delete file. Unstable state"); - process::abort() - } - } + if let Some(schema) = schema { + commit_schema_to_storage(stream, schema).await?; } for file in dir.parquet_files() { - let Some(metadata) = CACHED_FILES - .lock() - .unwrap() - .get_mut(&file) - .map(|fl| fl.metadata) else { continue }; - - if metadata != CacheState::Idle { - continue; - } - let filename = file .file_name() .expect("only parquet files are returned by iterator") @@ -337,42 +276,14 @@ pub trait ObjectStorage: Sync + 'static { let file_suffix = str::replacen(filename, ".", "/", 3); let objectstore_path = format!("{stream}/{file_suffix}"); - CACHED_FILES - .lock() - .unwrap() - .get_mut(&file) - .expect("entry checked at the start") - .set_metadata(CacheState::Uploading); - let compressed_size = file.metadata().map_or(0, |meta| meta.len()); - match self.upload_file(&objectstore_path, &file).await { - Ok(()) => { - CACHED_FILES - .lock() - .unwrap() - .get_mut(&file) - .expect("entry checked at the start") - .set_metadata(CacheState::Uploaded); - } - Err(e) => { - CACHED_FILES - .lock() - .unwrap() - .get_mut(&file) - .expect("entry checked at the start") - .set_metadata(CacheState::Idle); - - return Err(e.into()); - } - } + self.upload_file(&objectstore_path, &file).await?; stream_stats .entry(stream) .and_modify(|size| *size += compressed_size) .or_insert_with(|| compressed_size); - - CACHED_FILES.lock().unwrap().remove(&file); } } @@ -406,83 +317,6 @@ async fn commit_schema_to_storage( storage.put_schema(stream_name, &new_schema).await } -#[derive(Debug)] -pub struct Reader { - reader: StreamReader, - timestamp_col_index: usize, -} - -impl From> for Reader { - fn from(reader: StreamReader) -> Self { - let timestamp_col_index = reader - .schema() - .all_fields() - .binary_search_by(|field| field.name().as_str().cmp("p_timestamp")) - .expect("schema should have this field"); - - Self { - reader, - timestamp_col_index, - } - } -} - -#[derive(Debug)] -pub struct MergedRecordReader { - pub readers: Vec, -} - -impl MergedRecordReader { - pub fn try_new(files: &[PathBuf]) -> Result { - let mut readers = Vec::with_capacity(files.len()); - - for file in files { - let reader = StreamReader::try_new(File::open(file).unwrap(), None)?; - readers.push(reader.into()); - } - - Ok(Self { readers }) - } - - pub fn merged_iter(self, schema: &Schema) -> impl Iterator + '_ { - let adapted_readers = self.readers.into_iter().map(move |reader| { - reader - .reader - .flatten() - .zip(std::iter::repeat(reader.timestamp_col_index)) - }); - - kmerge_by( - adapted_readers, - |(a, a_col): &(RecordBatch, usize), (b, b_col): &(RecordBatch, usize)| { - let a: &TimestampMillisecondArray = a - .column(*a_col) - .as_any() - .downcast_ref::() - .unwrap(); - - let b: &TimestampMillisecondArray = b - .column(*b_col) - .as_any() - .downcast_ref::() - .unwrap(); - - a.value(0) < b.value(0) - }, - ) - .map(|(batch, _)| adapt_batch(schema, batch)) - } - - pub fn merged_schema(&self) -> Schema { - Schema::try_merge( - self.readers - .iter() - .map(|reader| reader.reader.schema().as_ref().clone()), - ) - .unwrap() - 
} -} - #[inline(always)] fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes { serde_json::to_vec(any) diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs new file mode 100644 index 000000000..6411e62e5 --- /dev/null +++ b/server/src/storage/staging.rs @@ -0,0 +1,282 @@ +use std::{ + collections::HashMap, + fs, + path::{Path, PathBuf}, + process, + sync::{Arc, RwLock}, +}; + +use arrow_array::RecordBatch; +use arrow_schema::{ArrowError, Schema}; +use chrono::{NaiveDateTime, Timelike, Utc}; +use once_cell::sync::Lazy; +use parquet::{ + arrow::ArrowWriter, + basic::Encoding, + errors::ParquetError, + file::properties::{WriterProperties, WriterPropertiesBuilder}, + schema::types::ColumnPath, +}; + +use crate::{ + metrics, + option::CONFIG, + storage::OBJECT_STORE_DATA_GRANULARITY, + utils::{ + self, + arrow::{adapt_batch, MergedRecordReader}, + }, +}; + +// in-memory global that holds all the in-memory buffers that are ready to convert +pub static MEMORY_READ_BUFFERS: Lazy<RwLock<HashMap<String, Vec<ReadBuf>>>> = + Lazy::new(RwLock::default); + +// this function takes all the read bufs per stream +pub fn take_all_read_bufs() -> Vec<(String, Vec<ReadBuf>)> { + let mut res = Vec::new(); + for (stream_name, bufs) in MEMORY_READ_BUFFERS.write().unwrap().iter_mut() { + let stream_name = stream_name.to_owned(); + let bufs = std::mem::take(bufs); + res.push((stream_name, bufs)); + } + res +} + +pub struct ReadBuf { + pub time: NaiveDateTime, + pub buf: Vec<RecordBatch>, +} + +#[derive(Debug)] +pub struct StorageDir { + pub data_path: PathBuf, +} + +impl StorageDir { + pub fn new(stream_name: &str) -> Self { + let data_path = CONFIG.parseable.local_stream_data_path(stream_name); + + Self { data_path } + } + + pub fn file_time_suffix(time: NaiveDateTime, extension: &str) -> String { + let uri = utils::date_to_prefix(time.date()) + + &utils::hour_to_prefix(time.hour()) + + &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(); + let local_uri = str::replace(&uri, "/", "."); + let hostname = utils::hostname_unchecked(); + format!("{local_uri}{hostname}.{extension}") + } + + fn filename_by_time(stream_hash: &str, time: NaiveDateTime) -> String { + format!( + "{}.{}", + stream_hash, + Self::file_time_suffix(time, "data.arrows") + ) + } + + fn filename_by_current_time(stream_hash: &str) -> String { + let datetime = Utc::now(); + Self::filename_by_time(stream_hash, datetime.naive_utc()) + } + + pub fn path_by_current_time(&self, stream_hash: &str) -> PathBuf { + self.data_path + .join(Self::filename_by_current_time(stream_hash)) + } + + pub fn arrow_files(&self) -> Vec<PathBuf> { + let Ok(dir) = self.data_path + .read_dir() else { return vec![] }; + + let paths: Vec<PathBuf> = dir + .flatten() + .map(|file| file.path()) + .filter(|file| file.extension().map_or(false, |ext| ext.eq("arrows"))) + .collect(); + + paths + } + + #[allow(unused)] + pub fn arrow_files_grouped_by_time(&self) -> HashMap<PathBuf, Vec<PathBuf>> { + // group arrow files by the parquet file they will be converted into + let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new(); + let arrow_files = self.arrow_files(); + for arrow_file_path in arrow_files { + let key = Self::arrow_path_to_parquet(&arrow_file_path); + grouped_arrow_file + .entry(key) + .or_default() + .push(arrow_file_path); + } + + grouped_arrow_file + } + + pub fn arrow_files_grouped_exclude_time( + &self, + exclude: NaiveDateTime, + ) -> HashMap<PathBuf, Vec<PathBuf>> { + let hot_filename = StorageDir::file_time_suffix(exclude, "data.arrows"); + // group by target parquet file, but exclude files whose name matches the hot filename + let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new(); + let mut arrow_files = self.arrow_files(); + 
arrow_files.retain(|path| { + !path + .file_name() + .unwrap() + .to_str() + .unwrap() + .ends_with(&hot_filename) + }); + for arrow_file_path in arrow_files { + let key = Self::arrow_path_to_parquet(&arrow_file_path); + grouped_arrow_file + .entry(key) + .or_default() + .push(arrow_file_path); + } + + grouped_arrow_file + } + + pub fn parquet_files(&self) -> Vec<PathBuf> { + let Ok(dir) = self.data_path + .read_dir() else { return vec![] }; + + dir.flatten() + .map(|file| file.path()) + .filter(|file| file.extension().map_or(false, |ext| ext.eq("parquet"))) + .collect() + } + + fn arrow_path_to_parquet(path: &Path) -> PathBuf { + let filename = path.file_name().unwrap().to_str().unwrap(); + let (_, filename) = filename.split_once('.').unwrap(); + let mut parquet_path = path.to_owned(); + parquet_path.set_file_name(filename); + parquet_path.set_extension("parquet"); + parquet_path + } +} + +pub fn to_parquet_path(stream_name: &str, time: NaiveDateTime) -> PathBuf { + let data_path = CONFIG.parseable.local_stream_data_path(stream_name); + let dir = StorageDir::file_time_suffix(time, "data.parquet"); + + data_path.join(dir) +} + +pub fn convert_disk_files_to_parquet( + stream: &str, + dir: &StorageDir, +) -> Result<Option<Schema>, MoveDataError> { + let mut schemas = Vec::new(); + + let time = chrono::Utc::now().naive_utc(); + let staging_files = dir.arrow_files_grouped_exclude_time(time); + if staging_files.is_empty() { + metrics::STAGING_FILES.with_label_values(&[stream]).set(0); + } + + for (parquet_path, files) in staging_files { + metrics::STAGING_FILES + .with_label_values(&[stream]) + .set(files.len() as i64); + + for file in &files { + let file_size = file.metadata().unwrap().len(); + let file_type = file.extension().unwrap().to_str().unwrap(); + + metrics::STORAGE_SIZE + .with_label_values(&["staging", stream, file_type]) + .add(file_size as i64); + } + + let record_reader = MergedRecordReader::try_new(&files).unwrap(); + + let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; + + let props = parquet_writer_props().build(); + let merged_schema = record_reader.merged_schema(); + schemas.push(merged_schema.clone()); + let schema = Arc::new(merged_schema); + let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?; + + for ref record in record_reader.merged_iter(&schema) { + writer.write(record)?; + } + + writer.close()?; + + for file in files { + if fs::remove_file(file).is_err() { + log::error!("Failed to delete file. 
Unstable state"); + process::abort() + } + } + } + + if !schemas.is_empty() { + Ok(Some(Schema::try_merge(schemas).unwrap())) + } else { + Ok(None) + } +} + +pub fn convert_mem_to_parquet( + stream: &str, + read_buf: ReadBuf, +) -> Result, MoveDataError> { + let mut schemas = Vec::new(); + let ReadBuf { time, buf } = read_buf; + let Some(last_schema) = dbg!(buf.last().map(|last| last.schema())) else { return Ok(None) }; + schemas.push(last_schema.as_ref().clone()); + let record_reader = buf.into_iter().map(|rb| adapt_batch(&last_schema, rb)); + + let parquet_path = dbg!(to_parquet_path(stream, time)); + if let Some(path) = parquet_path.parent() { + fs::create_dir_all(path)?; + } + let parquet_file = dbg!(fs::File::create(&parquet_path)).map_err(|_| MoveDataError::Create)?; + + let props = parquet_writer_props().build(); + let mut writer = ArrowWriter::try_new(parquet_file, last_schema.clone(), Some(props))?; + + for ref record in record_reader { + writer.write(record)?; + } + + writer.close()?; + + if !schemas.is_empty() { + Ok(Some(Schema::try_merge(schemas).unwrap())) + } else { + Ok(None) + } +} + +fn parquet_writer_props() -> WriterPropertiesBuilder { + WriterProperties::builder() + .set_max_row_group_size(CONFIG.parseable.row_group_size) + .set_compression(CONFIG.parseable.parquet_compression.into()) + .set_column_encoding( + ColumnPath::new(vec!["p_timestamp".to_string()]), + Encoding::DELTA_BINARY_PACKED, + ) +} + +#[derive(Debug, thiserror::Error)] +pub enum MoveDataError { + #[error("Unable to create recordbatch stream")] + Arrow(#[from] ArrowError), + #[error("Could not generate parquet file")] + Parquet(#[from] ParquetError), + #[error("IO Error {0}")] + ObjectStorage(#[from] std::io::Error), + #[error("Could not generate parquet file")] + Create, +} diff --git a/server/src/utils.rs b/server/src/utils.rs index ed5213b79..29cf982d3 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -17,7 +17,6 @@ */ pub mod arrow; -pub mod batch_adapter; pub mod header_parsing; pub mod json; pub mod uid; diff --git a/server/src/utils/arrow.rs b/server/src/utils/arrow.rs index d9db95b1e..6b8dff675 100644 --- a/server/src/utils/arrow.rs +++ b/server/src/utils/arrow.rs @@ -23,6 +23,12 @@ use arrow_array::{Array, RecordBatch}; use arrow_schema::Schema; use itertools::Itertools; +pub mod batch_adapter; +pub mod merged_reader; + +pub use batch_adapter::adapt_batch; +pub use merged_reader::MergedRecordReader; + pub fn replace_columns( schema: Arc, batch: RecordBatch, diff --git a/server/src/utils/batch_adapter.rs b/server/src/utils/arrow/batch_adapter.rs similarity index 100% rename from server/src/utils/batch_adapter.rs rename to server/src/utils/arrow/batch_adapter.rs diff --git a/server/src/utils/arrow/merged_reader.rs b/server/src/utils/arrow/merged_reader.rs new file mode 100644 index 000000000..84b19c961 --- /dev/null +++ b/server/src/utils/arrow/merged_reader.rs @@ -0,0 +1,85 @@ +use std::{fs::File, path::PathBuf}; + +use arrow_array::{RecordBatch, TimestampMillisecondArray}; +use arrow_ipc::reader::StreamReader; +use arrow_schema::Schema; +use itertools::kmerge_by; + +use super::adapt_batch; + +#[derive(Debug)] +pub struct Reader { + reader: StreamReader, + timestamp_col_index: usize, +} + +impl From> for Reader { + fn from(reader: StreamReader) -> Self { + let timestamp_col_index = reader + .schema() + .all_fields() + .binary_search_by(|field| field.name().as_str().cmp("p_timestamp")) + .expect("schema should have this field"); + + Self { + reader, + timestamp_col_index, + } + } +} + 
+#[derive(Debug)] +pub struct MergedRecordReader { + pub readers: Vec, +} + +impl MergedRecordReader { + pub fn try_new(files: &[PathBuf]) -> Result { + let mut readers = Vec::with_capacity(files.len()); + + for file in files { + let reader = StreamReader::try_new(File::open(file).unwrap(), None).map_err(|_| ())?; + readers.push(reader.into()); + } + + Ok(Self { readers }) + } + + pub fn merged_iter(self, schema: &Schema) -> impl Iterator + '_ { + let adapted_readers = self.readers.into_iter().map(move |reader| { + reader + .reader + .flatten() + .zip(std::iter::repeat(reader.timestamp_col_index)) + }); + + kmerge_by( + adapted_readers, + |(a, a_col): &(RecordBatch, usize), (b, b_col): &(RecordBatch, usize)| { + let a: &TimestampMillisecondArray = a + .column(*a_col) + .as_any() + .downcast_ref::() + .unwrap(); + + let b: &TimestampMillisecondArray = b + .column(*b_col) + .as_any() + .downcast_ref::() + .unwrap(); + + a.value(0) < b.value(0) + }, + ) + .map(|(batch, _)| adapt_batch(schema, batch)) + } + + pub fn merged_schema(&self) -> Schema { + Schema::try_merge( + self.readers + .iter() + .map(|reader| reader.reader.schema().as_ref().clone()), + ) + .unwrap() + } +} From 8e55756c01ccd01df52489aa0e4b6bfe845fb62b Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 19 Apr 2023 19:42:45 +0530 Subject: [PATCH 2/8] add test --- server/src/event/writer/mutable.rs | 196 ++++++++++++++++++++++++++++- 1 file changed, 195 insertions(+), 1 deletion(-) diff --git a/server/src/event/writer/mutable.rs b/server/src/event/writer/mutable.rs index 716f9e446..51a54e749 100644 --- a/server/src/event/writer/mutable.rs +++ b/server/src/event/writer/mutable.rs @@ -544,10 +544,66 @@ mod tests { use std::sync::Arc; use arrow_array::{BooleanArray, RecordBatch}; - use arrow_schema::{DataType, Field, Schema}; + use arrow_schema::{DataType, Field, Schema, TimeUnit}; use super::{MutableColumnArray, MutableColumns}; + macro_rules! check_array_builder { + ($t:expr) => { + assert_eq!(MutableColumnArray::new(&$t).data_type(), $t) + }; + } + + macro_rules! check_unit_list_builder { + ($t:expr) => { + assert_eq!( + MutableColumnArray::new(&DataType::List(Box::new(Field::new("item", $t, true)))) + .data_type(), + DataType::List(Box::new(Field::new("item", $t, true))) + ) + }; + } + + macro_rules! 
check_nested_list_builder { + ($t:expr) => { + assert_eq!( + MutableColumnArray::new(&DataType::List(Box::new(Field::new( + "item", + DataType::List(Box::new(Field::new("item", $t, true))), + true + )))) + .data_type(), + DataType::List(Box::new(Field::new( + "item", + DataType::List(Box::new(Field::new("item", $t, true))), + true + ))) + ) + }; + } + + #[test] + fn create_mutable_col_and_check_datatype() { + check_array_builder!(DataType::Boolean); + check_array_builder!(DataType::Int64); + check_array_builder!(DataType::UInt64); + check_array_builder!(DataType::Float64); + check_array_builder!(DataType::Utf8); + check_array_builder!(DataType::Timestamp(TimeUnit::Millisecond, None)); + check_unit_list_builder!(DataType::Boolean); + check_unit_list_builder!(DataType::Int64); + check_unit_list_builder!(DataType::UInt64); + check_unit_list_builder!(DataType::Float64); + check_unit_list_builder!(DataType::Utf8); + check_unit_list_builder!(DataType::Timestamp(TimeUnit::Millisecond, None)); + check_nested_list_builder!(DataType::Boolean); + check_nested_list_builder!(DataType::Int64); + check_nested_list_builder!(DataType::UInt64); + check_nested_list_builder!(DataType::Float64); + check_nested_list_builder!(DataType::Utf8); + check_nested_list_builder!(DataType::Timestamp(TimeUnit::Millisecond, None)); + } + #[test] fn empty_columns_push_single_col() { let mut columns = MutableColumns::default(); @@ -814,4 +870,142 @@ mod tests { ) } } + + #[test] + fn two_empty_column_push_new_column_before() { + let mut columns = MutableColumns::default(); + let schema = Schema::new(vec![ + Field::new("b", DataType::Boolean, true), + Field::new("c", DataType::Boolean, true), + ]); + let rb = RecordBatch::new_empty(Arc::new(schema)); + columns.push(rb); + + assert_eq!(columns.columns.len(), 2); + assert_eq!(columns.len, 0); + + let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]); + let rb = RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(BooleanArray::from(vec![true, false, false]))], + ) + .unwrap(); + columns.push(rb); + + assert_eq!(columns.columns.len(), 3); + assert_eq!(columns.len, 3); + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[0].column else {unreachable!()}; + { + let arr = builder.finish(); + assert_eq!( + arr.iter().collect::>(), + vec![Some(true), Some(false), Some(false)] + ) + } + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[1].column else {unreachable!()}; + { + let arr = builder.finish(); + assert_eq!(arr.iter().collect::>(), vec![None, None, None]) + } + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[2].column else {unreachable!()}; + { + let arr = builder.finish(); + assert_eq!(arr.iter().collect::>(), vec![None, None, None]) + } + } + + #[test] + fn two_empty_column_push_new_column_middle() { + let mut columns = MutableColumns::default(); + let schema = Schema::new(vec![ + Field::new("a", DataType::Boolean, true), + Field::new("c", DataType::Boolean, true), + ]); + let rb = RecordBatch::new_empty(Arc::new(schema)); + columns.push(rb); + + assert_eq!(columns.columns.len(), 2); + assert_eq!(columns.len, 0); + + let schema = Schema::new(vec![Field::new("b", DataType::Boolean, true)]); + let rb = RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(BooleanArray::from(vec![false, true, false]))], + ) + .unwrap(); + columns.push(rb); + + assert_eq!(columns.columns.len(), 3); + assert_eq!(columns.len, 3); + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[0].column else 
{unreachable!()}; + { + let arr = builder.finish(); + assert_eq!(arr.iter().collect::>(), vec![None, None, None]) + } + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[1].column else {unreachable!()}; + { + let arr = builder.finish(); + assert_eq!( + arr.iter().collect::>(), + vec![Some(false), Some(true), Some(false)] + ) + } + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[2].column else {unreachable!()}; + { + let arr = builder.finish(); + assert_eq!(arr.iter().collect::>(), vec![None, None, None]) + } + } + + #[test] + fn two_empty_column_push_new_column_after() { + let mut columns = MutableColumns::default(); + let schema = Schema::new(vec![ + Field::new("a", DataType::Boolean, true), + Field::new("b", DataType::Boolean, true), + ]); + let rb = RecordBatch::new_empty(Arc::new(schema)); + columns.push(rb); + + assert_eq!(columns.columns.len(), 2); + assert_eq!(columns.len, 0); + + let schema = Schema::new(vec![Field::new("c", DataType::Boolean, true)]); + let rb = RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(BooleanArray::from(vec![false, false, true]))], + ) + .unwrap(); + columns.push(rb); + + assert_eq!(columns.columns.len(), 3); + assert_eq!(columns.len, 3); + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[0].column else {unreachable!()}; + { + let arr = builder.finish(); + assert_eq!(arr.iter().collect::>(), vec![None, None, None]) + } + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[1].column else {unreachable!()}; + { + let arr = builder.finish(); + assert_eq!(arr.iter().collect::>(), vec![None, None, None]) + } + + let MutableColumnArray::Boolean(builder) = &mut columns.columns[2].column else {unreachable!()}; + { + let arr = builder.finish(); + assert_eq!( + arr.iter().collect::>(), + vec![Some(false), Some(false), Some(true)] + ) + } + } } From 51b045ae152d63f1e1278fa3e1d2056ce67243bd Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 19 Apr 2023 20:30:42 +0530 Subject: [PATCH 3/8] Add in memory configuration option --- server/src/event/writer.rs | 18 ++++++++++-------- server/src/option.rs | 18 ++++++++++++++++++ 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 2a1f39868..3bf0fdc09 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -26,7 +26,10 @@ use std::{ sync::{Mutex, RwLock}, }; -use crate::storage::staging::{self, ReadBuf}; +use crate::{ + option::CONFIG, + storage::staging::{self, ReadBuf}, +}; use self::{errors::StreamWriterError, file_writer::FileWriter}; use arrow_array::RecordBatch; @@ -44,12 +47,6 @@ pub enum StreamWriter { Disk(FileWriter), } -impl Default for StreamWriter { - fn default() -> Self { - StreamWriter::Mem(MemWriter::default()) - } -} - impl StreamWriter { pub fn push( &mut self, @@ -112,7 +109,12 @@ impl WriterTable { stream_name: stream_name.to_owned(), time: Utc::now().naive_utc(), }; - let mut writer = StreamWriter::default(); + let mut writer = if CONFIG.parseable.in_mem_ingestion { + StreamWriter::Mem(InMemWriter::default()) + } else { + StreamWriter::Disk(FileWriter::default()) + }; + writer.push(stream_name, schema_key, record)?; map.insert(stream_name.to_owned(), (Mutex::new(writer), context)); } diff --git a/server/src/option.rs b/server/src/option.rs index 585e29f4d..c735e4bfa 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -178,6 +178,9 @@ pub struct Server { /// Password for the basic authentication on the server pub password: 
String, + /// Server in memory ingestion stratergy + pub in_mem_ingestion: bool, + /// Server should check for update or not pub check_update: bool, @@ -221,6 +224,10 @@ impl FromArgMatches for Server { .get_one::(Self::PASSWORD) .cloned() .expect("default for password"); + self.in_mem_ingestion = m + .get_one::(Self::IN_MEM) + .cloned() + .expect("default for in memory ingestion"); self.check_update = m .get_one::(Self::CHECK_UPDATE) .cloned() @@ -261,6 +268,7 @@ impl Server { pub const UPLOAD_INTERVAL: &str = "upload-interval"; pub const USERNAME: &str = "username"; pub const PASSWORD: &str = "password"; + pub const IN_MEM: &str = "in-memory-ingestion"; pub const CHECK_UPDATE: &str = "check-update"; pub const SEND_ANALYTICS: &str = "send-analytics"; pub const ROW_GROUP_SIZE: &str = "row-group-size"; @@ -342,6 +350,16 @@ impl Server { .required(true) .help("Password for the basic authentication on the server"), ) + .arg( + Arg::new(Self::IN_MEM) + .long(Self::IN_MEM) + .env("P_IN_MEMORY_INGESITON") + .value_name("BOOL") + .required(false) + .default_value("false") + .value_parser(value_parser!(bool)) + .help("Disable/Enable in memory ingestion strategy"), + ) .arg( Arg::new(Self::CHECK_UPDATE) .long(Self::CHECK_UPDATE) From 58bc66ded76bb0f86d5aef47a4d38c62ca18ab6b Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 19 Apr 2023 20:36:09 +0530 Subject: [PATCH 4/8] Add banner --- server/src/event/writer/file_writer.rs | 19 +++++++++++++++++++ server/src/event/writer/mem_writer.rs | 19 +++++++++++++++++++ server/src/event/writer/mutable.rs | 19 +++++++++++++++++++ server/src/storage/staging.rs | 19 +++++++++++++++++++ server/src/utils/arrow/merged_reader.rs | 19 +++++++++++++++++++ 5 files changed, 95 insertions(+) diff --git a/server/src/event/writer/file_writer.rs b/server/src/event/writer/file_writer.rs index b4645aa2c..616ca0d10 100644 --- a/server/src/event/writer/file_writer.rs +++ b/server/src/event/writer/file_writer.rs @@ -1,3 +1,22 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * + */ + use arrow_array::RecordBatch; use arrow_ipc::writer::StreamWriter; use derive_more::{Deref, DerefMut}; diff --git a/server/src/event/writer/mem_writer.rs b/server/src/event/writer/mem_writer.rs index 1409d9d50..40eb68db9 100644 --- a/server/src/event/writer/mem_writer.rs +++ b/server/src/event/writer/mem_writer.rs @@ -1,3 +1,22 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * + */ + use std::sync::Arc; use arrow_array::RecordBatch; diff --git a/server/src/event/writer/mutable.rs b/server/src/event/writer/mutable.rs index 51a54e749..7fc1b9fcc 100644 --- a/server/src/event/writer/mutable.rs +++ b/server/src/event/writer/mutable.rs @@ -1,3 +1,22 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * + */ + use std::{cmp::Ordering, sync::Arc}; use arrow_array::{ diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 6411e62e5..15f6a1cfc 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -1,3 +1,22 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * + */ + use std::{ collections::HashMap, fs, diff --git a/server/src/utils/arrow/merged_reader.rs b/server/src/utils/arrow/merged_reader.rs index 84b19c961..4c9feb5f7 100644 --- a/server/src/utils/arrow/merged_reader.rs +++ b/server/src/utils/arrow/merged_reader.rs @@ -1,3 +1,22 @@ +/* + * Parseable Server (C) 2022 - 2023 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + * + */ + use std::{fs::File, path::PathBuf}; use arrow_array::{RecordBatch, TimestampMillisecondArray}; From 70aa3eabfaf9aa798854863a498029115333e39b Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 19 Apr 2023 21:54:10 +0530 Subject: [PATCH 5/8] Remove dbg --- server/src/event/writer.rs | 4 ---- server/src/storage/object_storage.rs | 4 +--- server/src/storage/staging.rs | 6 +++--- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 3bf0fdc09..ea36b5e03 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -131,13 +131,11 @@ impl WriterTable { let mut table = self.write().unwrap(); let map = std::mem::take(&mut *table); drop(table); - dbg!(map.len()); for (writer, context) in map.into_values() { let writer = writer.into_inner().unwrap(); match writer { StreamWriter::Mem(mem) => { let rb = mem.finalize(); - dbg!(rb.len()); let mut read_bufs = staging::MEMORY_READ_BUFFERS.write().unwrap(); read_bufs @@ -147,8 +145,6 @@ impl WriterTable { time: context.time, buf: rb, }); - - dbg!(read_bufs.len()); } StreamWriter::Disk(disk) => disk.close_all(), } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 892f4902f..1849cb579 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -247,10 +247,8 @@ pub trait ObjectStorage: Sync + 'static { let mut stream_stats = HashMap::new(); for (stream_name, bufs) in staging::take_all_read_bufs() { - dbg!(&stream_name, bufs.len()); for buf in bufs { - dbg!(&buf.time); - let schema = dbg!(convert_mem_to_parquet(&stream_name, buf)) + let schema = convert_mem_to_parquet(&stream_name, buf) .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; if let Some(schema) = schema { commit_schema_to_storage(&stream_name, schema).await?; diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 15f6a1cfc..0b0538fff 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -252,15 +252,15 @@ pub fn convert_mem_to_parquet( ) -> Result, MoveDataError> { let mut schemas = Vec::new(); let ReadBuf { time, buf } = read_buf; - let Some(last_schema) = dbg!(buf.last().map(|last| last.schema())) else { return Ok(None) }; + let Some(last_schema) = buf.last().map(|last| last.schema()) else { return Ok(None) }; schemas.push(last_schema.as_ref().clone()); let record_reader = buf.into_iter().map(|rb| adapt_batch(&last_schema, rb)); - let parquet_path = dbg!(to_parquet_path(stream, time)); + let parquet_path = to_parquet_path(stream, time); if let Some(path) = parquet_path.parent() { fs::create_dir_all(path)?; } - let parquet_file = dbg!(fs::File::create(&parquet_path)).map_err(|_| MoveDataError::Create)?; + let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; let props = parquet_writer_props().build(); let mut writer = ArrowWriter::try_new(parquet_file, last_schema.clone(), Some(props))?; From 8ebe2dd5f8061f2c1d1a625f355f8a51fa150712 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 20 Apr 2023 18:37:22 +0530 Subject: [PATCH 6/8] Fix --- server/src/event/writer/mem_writer.rs | 12 ++++++------ server/src/storage/staging.rs | 8 +------- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/server/src/event/writer/mem_writer.rs b/server/src/event/writer/mem_writer.rs index 40eb68db9..c661fc427 100644 --- a/server/src/event/writer/mem_writer.rs +++ b/server/src/event/writer/mem_writer.rs @@ -33,16 
+33,17 @@ pub struct MemWriter { impl MemWriter { pub fn push(&mut self, rb: RecordBatch) { - if self.mutable_buffer.len() + rb.num_rows() < N { - self.mutable_buffer.push(rb) - } else { + if self.mutable_buffer.len() + rb.num_rows() > N { + // init new mutable columns with schema of current let schema = self.mutable_buffer.current_schema(); let mut new_mutable_buffer = MutableColumns::default(); new_mutable_buffer.push(RecordBatch::new_empty(Arc::new(schema))); + // replace new mutable buffer with current one as that is full let mutable_buffer = std::mem::replace(&mut self.mutable_buffer, new_mutable_buffer); - let rb = mutable_buffer.into_recordbatch(); - self.read_buffer.push(rb); + let filled_rb = mutable_buffer.into_recordbatch(); + self.read_buffer.push(filled_rb); } + self.mutable_buffer.push(rb) } #[allow(unused)] @@ -67,7 +68,6 @@ impl MemWriter { if rb.num_rows() > 0 { read_buffer.push(rb) } - read_buffer .into_iter() .map(|rb| adapt_batch(&schema, rb)) diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 0b0538fff..a140488f3 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -250,10 +250,8 @@ pub fn convert_mem_to_parquet( stream: &str, read_buf: ReadBuf, ) -> Result, MoveDataError> { - let mut schemas = Vec::new(); let ReadBuf { time, buf } = read_buf; let Some(last_schema) = buf.last().map(|last| last.schema()) else { return Ok(None) }; - schemas.push(last_schema.as_ref().clone()); let record_reader = buf.into_iter().map(|rb| adapt_batch(&last_schema, rb)); let parquet_path = to_parquet_path(stream, time); @@ -271,11 +269,7 @@ pub fn convert_mem_to_parquet( writer.close()?; - if !schemas.is_empty() { - Ok(Some(Schema::try_merge(schemas).unwrap())) - } else { - Ok(None) - } + Ok(Some(last_schema.as_ref().clone())) } fn parquet_writer_props() -> WriterPropertiesBuilder { From 56ecd5f9bae2adffaddfc50efd733005008f9cad Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Fri, 21 Apr 2023 13:02:51 +0530 Subject: [PATCH 7/8] Fix --- server/src/event/format/json.rs | 1 + server/src/stats.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/event/format/json.rs b/server/src/event/format/json.rs index 523ffeaca..af9c1c874 100644 --- a/server/src/event/format/json.rs +++ b/server/src/event/format/json.rs @@ -214,6 +214,7 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool { false } } + DataType::Timestamp(_, _) => value.is_string() || value.is_number(), _ => unreachable!(), } } diff --git a/server/src/stats.rs b/server/src/stats.rs index 945f83831..63b9e790e 100644 --- a/server/src/stats.rs +++ b/server/src/stats.rs @@ -72,7 +72,7 @@ impl StatsCounter { } pub fn increase_event_by_n(&self, n: u64) { - self.events_ingested.fetch_add(n, Ordering::Relaxed); + self.events_ingested.fetch_add(n, Ordering::AcqRel); } } From eef30edbb931a30ed0ff73829d71ae15b90e253e Mon Sep 17 00:00:00 2001 From: Nitish Tiwari Date: Fri, 21 Apr 2023 14:32:17 +0530 Subject: [PATCH 8/8] Fix names (#2) --- server/src/option.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/option.rs b/server/src/option.rs index c735e4bfa..a3a01c3d9 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -353,12 +353,12 @@ impl Server { .arg( Arg::new(Self::IN_MEM) .long(Self::IN_MEM) - .env("P_IN_MEMORY_INGESITON") + .env("P_MEMORY_STAGING") .value_name("BOOL") .required(false) .default_value("false") .value_parser(value_parser!(bool)) - .help("Disable/Enable in memory ingestion strategy"), + 
.help("Disable/Enable memory based data staging"), ) .arg( Arg::new(Self::CHECK_UPDATE)