Append arrow directly to local data #100

Merged 1 commit on Sep 9, 2022
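
This PR switches local staging from rewriting a Parquet file on every event to keeping one long-lived Arrow IPC `StreamWriter` per log stream and appending record batches to it. A minimal sketch of that mechanism (not the PR's code — the schema, field name, and file name below are invented for illustration):

```rust
// Sketch only — not code from this PR. Field/file names are invented.
use std::fs::File;
use std::sync::Arc;

use datafusion::arrow::array::StringArray;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::ipc::writer::StreamWriter;
use datafusion::arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("log", DataType::Utf8, false)]));

    // One writer per stream, created when the first event arrives.
    let file = File::create("data.records")?;
    let mut writer = StreamWriter::try_new(file, &schema)?;

    // Every later event becomes a RecordBatch appended to the same open stream.
    for line in ["first event", "second event"] {
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(StringArray::from(vec![line]))],
        )?;
        writer.write(&batch)?;
    }

    // finish() writes the end-of-stream marker; the PR instead keeps writers open
    // in STREAM_WRITERS and only drops them on sync or delete.
    writer.finish()?;
    Ok(())
}
```

Keeping the writer alive in `STREAM_WRITERS` means each new batch is appended to the open IPC stream instead of re-reading and rewriting a Parquet file for every event, which is what the removed `convert_arrow_parquet`/`convert_parquet_rb_reader` path did.
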
212 changes: 136 additions & 76 deletions server/src/event.rs
@@ -18,23 +18,143 @@
*/
use datafusion::arrow;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::ipc::writer::StreamWriter;
use datafusion::arrow::json;
use datafusion::arrow::json::reader::infer_json_schema;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader};
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::parquet::file::serialized_reader::SerializedFileReader;
use lazy_static::lazy_static;
use log::error;
use std::fs;
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::BufReader;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;

use crate::metadata;
use crate::metadata::STREAM_INFO;
use crate::option::CONFIG;
use crate::response;
use crate::storage::ObjectStorage;
use crate::Error;

type LocalWriter = Mutex<Option<StreamWriter<std::fs::File>>>;

lazy_static! {
#[derive(Default)]
pub static ref STREAM_WRITERS: RwLock<HashMap<String, LocalWriter>> = RwLock::new(HashMap::new());
}

impl STREAM_WRITERS {
// append to an existing stream
fn append_to_local(stream: &str, record: &RecordBatch) -> Result<(), ()> {
let hashmap_guard = STREAM_WRITERS.read().unwrap();
match hashmap_guard.get(stream) {
Some(localwriter) => {
let mut writer_guard = localwriter.lock().unwrap();
if let Some(ref mut writer) = *writer_guard {
writer.write(record).map_err(|_| ())?;
} else {
drop(writer_guard);
drop(hashmap_guard);
STREAM_WRITERS::set_entry(stream, record).unwrap();
}
}
None => {
drop(hashmap_guard);
STREAM_WRITERS::create_entry(stream.to_string(), record).unwrap();
}
};
Ok(())
}

// create a new entry with new stream_writer
// todo: error type
// Only create entry for valid streams
fn create_entry(stream: String, record: &RecordBatch) -> Result<(), ()> {
let mut hashmap_guard = STREAM_WRITERS.write().unwrap();

if STREAM_INFO.schema(&stream).is_err() {
return Err(());
}

let file = OpenOptions::new()
.append(true)
.create_new(true)
.open(data_file_path(&stream))
.map_err(|_| ())?;

let mut stream_writer = StreamWriter::try_new(file, &record.schema()).map_err(|_| ())?;
stream_writer.write(record).map_err(|_| ())?;

hashmap_guard.insert(stream, Mutex::new(Some(stream_writer)));

Ok(())
}

// Deleting a logstream requires that metadata is deleted first
pub fn delete_entry(stream: &str) -> Result<(), ()> {
let mut hashmap_guard = STREAM_WRITERS.write().unwrap();

if STREAM_INFO.schema(stream).is_ok() {
return Err(());
}

hashmap_guard.remove(stream);

Ok(())
}

fn set_entry(stream: &str, record: &RecordBatch) -> Result<(), ()> {
let file = OpenOptions::new()
.append(true)
.create_new(true)
.open(data_file_path(stream))
.map_err(|_| ())?;

let mut stream_writer = StreamWriter::try_new(file, &record.schema()).map_err(|_| ())?;
stream_writer.write(record).map_err(|_| ())?;

STREAM_WRITERS
.read()
.expect("Current Thread should not hold any lock")
.get(stream)
.expect("set entry is only called on valid entries")
.lock()
.expect("Poisioning is not handled yet")
.replace(stream_writer); // replace the stream writer behind this mutex

Ok(())
}

// Unset the entry so that the underlying writer is dropped and a fresh one is created on the next append
pub fn unset_entry(stream: &str) {
let guard = STREAM_WRITERS.read().unwrap();
let stream_writer = match guard.get(stream) {
Some(writer) => writer,
None => return,
};
stream_writer
.lock()
.expect("Poisioning is not handled yet")
.take();
}
}

#[derive(Debug, thiserror::Error)]
enum StreamWriterError {}
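
The error enum above is still empty and the writer functions return `Result<(), ()>` (see the `// todo: error type` note). One possible way the enum might later be filled in with thiserror — the variants here are hypothetical, not part of this PR:

```rust
// Hypothetical only — these variants are not part of this PR.
#[derive(Debug, thiserror::Error)]
enum StreamWriterError {
    #[error("arrow writer failed: {0}")]
    Writer(#[from] datafusion::arrow::error::ArrowError),
    #[error("could not open local data file: {0}")]
    Io(#[from] std::io::Error),
    #[error("unknown stream {0}")]
    UnknownStream(String),
}
```
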

fn data_file_path(stream_name: &str) -> String {
format!(
"{}/{}",
CONFIG
.parseable
.local_stream_data_path(stream_name)
.to_string_lossy(),
"data.records"
)
}
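
For reference, a file written this way can be read back with Arrow's IPC `StreamReader`. A rough sketch, assuming the `data.records` path produced by `data_file_path`; note that a stream whose writer was never `finish()`ed lacks the end-of-stream marker, so reading to the very end may error:

```rust
// Sketch only — path and error handling are illustrative.
use std::fs::File;
use std::io::BufReader;

use datafusion::arrow::ipc::reader::StreamReader;

fn read_local_records(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let file = BufReader::new(File::open(path)?);
    // Some arrow versions take only the reader here, without the projection argument.
    let reader = StreamReader::try_new(file, None)?;
    for batch in reader {
        let batch = batch?;
        println!("read {} rows", batch.num_rows());
    }
    Ok(())
}
```
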

#[derive(Clone)]
pub struct Event {
pub body: String,
@@ -44,17 +44,6 @@ pub struct Event {
// Event holds the schema related to each event for a single log stream

impl Event {
fn data_file_path(&self) -> String {
format!(
"{}/{}",
CONFIG
.parseable
.local_stream_data_path(&self.stream_name)
.to_string_lossy(),
"data.parquet"
)
}

pub async fn process(
&self,
storage: &impl ObjectStorage,
@@ -65,12 +65,11 @@
})?;

let event = self.get_reader(inferred_schema.clone());
let size = self.body_size();

let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?;
let is_first_event = stream_schema.is_none();

let compressed_size = if let Some(existing_schema) = stream_schema {
if let Some(existing_schema) = stream_schema {
// validate schema before processing the event
if existing_schema != inferred_schema {
return Err(Error::SchemaMismatch(self.stream_name.clone()));
@@ -84,11 +84,6 @@
.await?
};

if let Err(e) = metadata::STREAM_INFO.update_stats(&self.stream_name, size, compressed_size)
{
error!("Couldn't update stream stats. {:?}", e);
}

if let Err(e) = metadata::STREAM_INFO.check_alerts(self).await {
error!("Error checking for alerts. {:?}", e);
}
Expand All @@ -115,59 +218,44 @@ impl Event {
storage: &impl ObjectStorage,
) -> Result<u64, Error> {
let rb = event.next()?.ok_or(Error::MissingRecord)?;
let stream_name = &self.stream_name;

// Store record batch to Parquet file on local cache
let compressed_size = self.convert_arrow_parquet(rb)?;
// Store record batch on local cache
STREAM_WRITERS::create_entry(stream_name.clone(), &rb).unwrap();

// Put the inferred schema to object store
let stream_name = &self.stream_name;

storage
.put_schema(stream_name.clone(), &schema)
.await
.map_err(|e| response::EventError {
msg: format!(
"Failed to upload schema for log stream {} due to err: {}",
self.stream_name, e
stream_name, e
),
})?;

// set the schema in memory for this stream
metadata::STREAM_INFO
.set_schema(&self.stream_name, schema)
.set_schema(stream_name, schema)
.map_err(|e| response::EventError {
msg: format!(
"Failed to set schema for log stream {} due to err: {}",
&self.stream_name, e
stream_name, e
),
})?;

Ok(compressed_size)
Ok(0)
}

// process_event handles all events after the first one. It appends the event's
// record batch to the in-memory stream writer for this log stream.
fn process_event<R: std::io::Read>(&self, mut event: json::Reader<R>) -> Result<u64, Error> {
let next_event_rb = event.next()?.ok_or(Error::MissingRecord)?;

let compressed_size = match self.convert_parquet_rb_reader() {
Ok(mut arrow_reader) => {
let mut total_size = 0;
let rb = arrow_reader.get_record_reader(2048).unwrap();
for prev_rb in rb {
let new_rb = RecordBatch::concat(
&std::sync::Arc::new(arrow_reader.get_schema().unwrap()),
&[next_event_rb.clone(), prev_rb.unwrap()],
)?;
total_size += self.convert_arrow_parquet(new_rb)?;
}
let rb = event.next()?.ok_or(Error::MissingRecord)?;
let stream_name = &self.stream_name;

total_size
}
Err(_) => self.convert_arrow_parquet(next_event_rb)?,
};
STREAM_WRITERS::append_to_local(stream_name, &rb).map_err(|_| Error::MissingRecord)?;

Ok(compressed_size)
Ok(0)
}

// inferSchema is a constructor to Schema
@@ -187,32 +275,4 @@ impl Event {
json::reader::DecoderOptions::new().with_batch_size(1024),
)
}

fn body_size(&self) -> u64 {
self.body.as_bytes().len() as u64
}

// convert arrow record batch to parquet
// and write it to local cache path as a data.parquet file.
fn convert_arrow_parquet(&self, rb: RecordBatch) -> Result<u64, Error> {
let parquet_path = self.data_file_path();
let parquet_file = fs::File::create(&parquet_path)?;
let props = WriterProperties::builder().build();
let mut writer =
ArrowWriter::try_new(parquet_file, Arc::new(self.infer_schema()?), Some(props))?;
writer.write(&rb)?;
writer.close()?;

let compressed_size = fs::metadata(parquet_path)?.len();

Ok(compressed_size)
}

pub fn convert_parquet_rb_reader(&self) -> Result<ParquetFileArrowReader, Error> {
let file = fs::File::open(&self.data_file_path())?;
let file_reader = SerializedFileReader::new(file)?;
let arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));

Ok(arrow_reader)
}
}
23 changes: 15 additions & 8 deletions server/src/handlers/logstream.rs
@@ -22,9 +22,9 @@ use actix_web::http::StatusCode;
use actix_web::{web, HttpRequest, HttpResponse, Responder};

use crate::alerts::Alerts;
use crate::response;
use crate::s3::S3;
use crate::storage::{ObjectStorage, StorageDir};
use crate::{event, response};
use crate::{metadata, validator};

pub async fn delete(req: HttpRequest) -> HttpResponse {
@@ -59,20 +59,27 @@ pub async fn delete(req: HttpRequest) -> HttpResponse {
.to_http();
}

let stream_dir = StorageDir::new(&stream_name);
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
if let Err(e) = metadata::STREAM_INFO.delete_stream(&stream_name) {
log::warn!(
"failed to delete local data for stream {}. Clean {} manually",
"failed to delete log stream {} from metadata due to err: {}",
stream_name,
stream_dir.data_path.to_string_lossy()
e
)
}

if let Err(e) = metadata::STREAM_INFO.delete_stream(&stream_name) {
if event::STREAM_WRITERS::delete_entry(&stream_name).is_err() {
log::warn!(
"failed to delete log stream {} from metadata due to err: {}",
"failed to delete log stream event writers for stream {}",
stream_name
)
}

let stream_dir = StorageDir::new(stream_name.clone());
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
log::warn!(
"failed to delete local data for stream {}. Clean {} manually",
stream_name,
e
stream_dir.data_path.to_string_lossy()
)
}

15 changes: 8 additions & 7 deletions server/src/main.rs
@@ -73,7 +73,7 @@ async fn main() -> anyhow::Result<()> {
warn!("could not populate local metadata. {:?}", e);
}

// Move all existing data.parquet files to their respective tmp directories
// Move all existing data.records files to their respective tmp directories
// they will be synced to object store on next s3 sync cycle
startup_sync();

Expand Down Expand Up @@ -112,11 +112,12 @@ fn startup_sync() {
}

for stream in metadata::STREAM_INFO.list_streams() {
let dir = StorageDir::new(&stream);
// if data.parquet file is not present then skip this stream
if !dir.parquet_path_exists() {
let dir = StorageDir::new(stream.clone());
// if data.records file is not present then skip this stream
if !dir.local_data_exists() {
continue;
}

if let Err(e) = dir.create_temp_dir() {
log::error!(
"Error creating tmp directory for {} due to error [{}]",
Expand All @@ -126,7 +127,7 @@ fn startup_sync() {
continue;
}
// create prefix for this file from its last modified time
let path = dir.data_path.join("data.parquet");
let path = dir.data_path.join("data.records");

// metadata.modified gives us system time
// This may not work on all platforms
@@ -156,7 +157,7 @@ fn startup_sync() {
let local_uri = str::replace(&uri, "/", ".");
let hostname = utils::hostname_unchecked();
let parquet_file_local = format!("{}{}.data.parquet", local_uri, hostname);
if let Err(err) = dir.move_parquet_to_temp(parquet_file_local) {
if let Err(err) = dir.move_local_to_temp(parquet_file_local) {
log::warn!(
"Failed to move parquet file at {} to tmp directory due to error {}",
path.display(),
@@ -269,7 +270,7 @@ async fn run_http() -> anyhow::Result<()> {
(_, _) => None,
};

let http_server = HttpServer::new(move || create_app!());
let http_server = HttpServer::new(move || create_app!()).workers(num_cpus::get() - 1);
if let Some(builder) = ssl_acceptor {
http_server
.bind_openssl(&CONFIG.parseable.address, builder)?
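
One thing not shown in this diff: the temp file handed to the S3 sync is still named `...data.parquet` (see the `parquet_file_local` format above), so the Arrow IPC file presumably gets converted to Parquet somewhere in the sync path. A rough sketch of what that conversion could look like, using the `ArrowWriter` import this repo already had; the function and path names are invented:

```rust
// Sketch only — this conversion is not part of the diff shown above.
use std::fs::File;
use std::io::BufReader;

use datafusion::arrow::ipc::reader::StreamReader;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::parquet::file::properties::WriterProperties;

fn records_to_parquet(records_path: &str, parquet_path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let reader = StreamReader::try_new(BufReader::new(File::open(records_path)?), None)?;
    let schema = reader.schema();

    let props = WriterProperties::builder().build();
    let mut writer = ArrowWriter::try_new(File::create(parquet_path)?, schema, Some(props))?;

    for batch in reader {
        writer.write(&batch?)?;
    }
    writer.close()?;
    Ok(())
}
```
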