Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Initial journalling implementation #845

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ arrow-buffer = { version = "47.0.0" }
arrow-cast = { version = "47.0.0" }
arrow-csv = { version = "47.0.0" }
arrow-data = { version = "47.0.0" }
arrow-ipc = { version = "47.0.0" }
arrow-json = { version = "47.0.0" }
arrow-ord = { version = "47.0.0" }
arrow-schema = { version = "47.0.0", features = ["serde"] }
Expand Down
1 change: 1 addition & 0 deletions crates/sparrow-batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Defines the main struct for wrapping RecordBatches in execution.
"""

[features]
default = []
testing = ["proptest"]

[dependencies]
Expand Down
50 changes: 42 additions & 8 deletions crates/sparrow-batch/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#[derive(Clone, PartialEq, Debug)]
pub struct Batch {
/// The data associated with the batch.
pub(crate) data: Option<BatchInfo>,
pub data: Option<BatchInfo>,

/// An indication that the batch stream has completed up to the given time.
/// Any rows in future batches on this stream must have a time strictly
Expand Down Expand Up @@ -475,6 +475,35 @@
RowTime::from_timestamp_ns(up_to_time),
)
}

/// Builds a batch whose data column is entirely `null`.
///
/// Only compiled for tests or when the `testing` feature is enabled.
///
/// * `time` - converted into the batch's timestamp (nanosecond) column.
/// * `subsort` - converted into the batch's `u64` subsort column.
/// * `key_hash` - converted into the batch's `u64` key-hash column.
/// * `up_to_time` - nanosecond timestamp turned into the batch's "up to" `RowTime`.
///
/// The null data column is created with the same length as `time`.
#[cfg(any(test, feature = "testing"))]
pub fn null_from(
    time: impl Into<TimestampNanosecondArray>,
    subsort: impl Into<arrow_array::UInt64Array>,
    key_hash: impl Into<arrow_array::UInt64Array>,
    up_to_time: i64,
) -> Self {
    use arrow_array::NullArray;

    // Resolve the caller-supplied values into concrete arrow arrays first.
    let times: TimestampNanosecondArray = time.into();
    let subsorts: UInt64Array = subsort.into();
    let key_hashes: UInt64Array = key_hash.into();

    // Erase the concrete types so the columns can be stored as `ArrayRef`s.
    let time_column: ArrayRef = Arc::new(times);
    let subsort_column: ArrayRef = Arc::new(subsorts);
    let key_hash_column: ArrayRef = Arc::new(key_hashes);

    // One null entry per row of the time column.
    let null_column = Arc::new(NullArray::new(time_column.len()));
    let up_to = RowTime::from_timestamp_ns(up_to_time);

    Batch::new_with_data(
        null_column,
        time_column,
        subsort_column,
        key_hash_column,
        up_to,
    )
}
}

#[cfg(debug_assertions)]
Expand Down Expand Up @@ -602,13 +631,14 @@
}

#[derive(Clone, Debug)]
pub(crate) struct BatchInfo {
pub(crate) data: ArrayRef,
pub(crate) time: ArrayRef,
pub(crate) subsort: ArrayRef,
pub(crate) key_hash: ArrayRef,
min_present_time: RowTime,
max_present_time: RowTime,
#[non_exhaustive]
/// The columns (and time bounds) backing a non-empty `Batch`.
///
/// Marked `#[non_exhaustive]`, so code outside this crate cannot build it
/// with a struct literal even though the fields are public.
pub struct BatchInfo {
/// The data column of the batch.
pub data: ArrayRef,
/// The time column; its length is what `len()` reports.
/// NOTE(review): constructed from a `TimestampNanosecondArray` in `null_from` — confirm all producers do the same.
pub time: ArrayRef,
/// The subsort column.
/// NOTE(review): constructed from a `UInt64Array` in `null_from` — confirm all producers do the same.
pub subsort: ArrayRef,
/// The key-hash column; the `key_hash()` accessor views it as a `UInt64Array`.
pub key_hash: ArrayRef,
/// NOTE(review): presumably the smallest time present in `time` — confirm against the constructor.
pub min_present_time: RowTime,
/// NOTE(review): presumably the largest time present in `time` — confirm against the constructor.
pub max_present_time: RowTime,
}

impl PartialEq for BatchInfo {
Expand Down Expand Up @@ -709,6 +739,10 @@
pub(crate) fn key_hash(&self) -> &UInt64Array {
// View the type-erased `key_hash` column as a typed `UInt64Array`.
// NOTE(review): `as_primitive` presumably panics if the column is not u64 — confirm.
self.key_hash.as_primitive()
}

pub fn len(&self) -> usize {

Check failure on line 743 in crates/sparrow-batch/src/batch.rs

View workflow job for this annotation

GitHub Actions / clippy

struct `BatchInfo` has a public `len` method, but no `is_empty` method

error: struct `BatchInfo` has a public `len` method, but no `is_empty` method --> crates/sparrow-batch/src/batch.rs:743:5 | 743 | pub fn len(&self) -> usize { | ^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#len_without_is_empty = note: `-D clippy::len-without-is-empty` implied by `-D warnings`
self.time.len()
}
}

#[cfg(any(test, feature = "testing"))]
Expand Down
36 changes: 36 additions & 0 deletions crates/sparrow-batch/src/row_time.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::str::FromStr;

use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
use arrow_array::temporal_conversions::timestamp_ns_to_datetime;
use error_stack::{IntoReport, ResultExt};

/// Wrapper around the time of a row.
///
Expand Down Expand Up @@ -26,6 +30,15 @@ impl RowTime {
pub fn pred(&self) -> Self {
    // The predecessor is the wrapped timestamp value stepped back by one.
    let previous = self.0 - 1;
    Self(previous)
}

pub fn bytes(&self) -> [u8; 8] {
self.0.to_be_bytes()
}

/// Decodes a row time from 8 big-endian bytes holding an `i64` timestamp.
pub fn from_bytes(bytes: [u8; 8]) -> Self {
    Self(i64::from_be_bytes(bytes))
}
}

impl From<RowTime> for i64 {
Expand All @@ -39,3 +52,26 @@ impl From<i64> for RowTime {
RowTime(value)
}
}

/// Converts a `RowTime` into its `Display` representation.
impl From<RowTime> for String {
    fn from(val: RowTime) -> Self {
        // `to_string` renders through the same `Display` impl as `format!`.
        val.to_string()
    }
}

/// Error produced when a string cannot be parsed into a `RowTime`.
///
/// Wraps the offending input string, which is included in the displayed
/// message (`invalid row time: <input>`).
#[derive(derive_more::Display, Debug)]
#[display(fmt = "invalid row time: {_0}")]
pub struct ParseError(String);

// Lets `ParseError` serve as the context type of an `error_stack::Report`.
impl error_stack::Context for ParseError {}

/// Parses a `RowTime` from a timestamp string.
impl FromStr for RowTime {
    type Err = error_stack::Report<ParseError>;

    /// Delegates to arrow's `string_to_timestamp_nanos`.
    ///
    /// # Errors
    ///
    /// Returns a report whose context is a `ParseError` carrying the original
    /// input when the string cannot be converted to a timestamp.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let nanos = string_to_timestamp_nanos(s)
            .into_report()
            .change_context_lazy(|| ParseError(s.to_owned()))?;
        Ok(Self(nanos))
    }
}
32 changes: 32 additions & 0 deletions crates/sparrow-journal/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[package]
name = "sparrow-journal"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
publish = false
description = """
Durable journaling of batches.
"""

[dependencies]
arrow-array.workspace = true
arrow-buffer.workspace = true
arrow-ipc.workspace = true
arrow-schema.workspace = true
derive_more.workspace = true
error-stack.workspace = true
futures.workspace = true
itertools.workspace = true
okaywal = "0.3.0"
parking_lot.workspace = true
sparrow-batch = { path = "../sparrow-batch" }
tracing.workspace = true

[dev-dependencies]
sparrow-batch = { path = "../sparrow-batch", features = ["testing"] }
sparrow-testing = { path = "../sparrow-testing" }
tempfile.workspace = true

[lib]
doctest = false
Loading
Loading