1 change: 1 addition & 0 deletions Cargo.lock


3 changes: 2 additions & 1 deletion src/ore/src/bytes.rs
@@ -29,6 +29,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
use internal::SegmentedReader;
#[cfg(feature = "parquet")]
use parquet::errors::ParquetError;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;

#[cfg(feature = "parquet")]
@@ -46,7 +47,7 @@ use crate::cast::CastFrom;
/// [`smallvec::SmallVec`] to store our [`Bytes`] segments, and `N` is how many `Bytes` we'll
/// store inline before spilling to the heap. We default `N = 1`, so in the case of a single
/// `Bytes` segment, we avoid one layer of indirection.
-#[derive(Clone, Debug, PartialEq, Eq)]
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SegmentedBytes<const N: usize = 1> {
/// Collection of non-contiguous segments, each segment is guaranteed to be non-empty.
segments: SmallVec<[(Bytes, Padding); N]>,
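The Serialize/Deserialize derive above is what lets a SegmentedBytes value cross a serialization boundary, e.g. turmoil's simulated network. Below is a minimal round-trip sketch of the idea using a deliberately simplified stand-in type (plain Vec<Vec<u8>> instead of SmallVec<[(Bytes, Padding); N]>), with serde_json chosen purely for illustration; the real derive additionally presumes serde support in the bytes and smallvec crates.

    use serde::{Deserialize, Serialize};

    // Simplified stand-in for SegmentedBytes: the real type stores Bytes segments
    // in a SmallVec; plain vectors keep this sketch dependency-light.
    #[derive(Debug, PartialEq, Serialize, Deserialize)]
    struct SegmentsLike {
        segments: Vec<Vec<u8>>,
    }

    fn main() {
        let value = SegmentsLike {
            segments: vec![b"hello".to_vec(), b"world".to_vec()],
        };
        // Round-trip through an arbitrary serde format (JSON here, as an example).
        let encoded = serde_json::to_vec(&value).expect("encode");
        let decoded: SegmentsLike = serde_json::from_slice(&encoded).expect("decode");
        assert_eq!(value, decoded);
    }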
3 changes: 3 additions & 0 deletions src/persist/Cargo.toml
@@ -60,10 +60,12 @@ prost = { version = "0.13.5", features = ["no-recursion-limit"] }
rand = { version = "0.8.5", features = ["small_rng"] }
reqwest = { version = "0.12", features = ["blocking", "json", "default-tls", "charset", "http2"], default-features = false }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = { version = "1.0.145", optional = true }
timely = "0.25.1"
tokio = { version = "1.44.1", default-features = false, features = ["fs", "macros", "sync", "rt", "rt-multi-thread"] }
tokio-postgres = { version = "0.7.8" }
tracing = "0.1.37"
turmoil = { version = "0.7.0", optional = true }
url = "2.3.1"
urlencoding = "2.1.3"
uuid = { version = "1.18.1", features = ["v4"] }
@@ -88,6 +90,7 @@ prost-build = "0.13.5"

[features]
default = ["mz-build-tools/default", "workspace-hack"]
turmoil = ["dep:serde_json", "dep:turmoil"]

[package.metadata.cargo-udeps.ignore]
normal = ["workspace-hack", "sha2"]
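Because both new dependencies are optional and only activated by the turmoil feature, ordinary builds are unaffected; a consumer opts in explicitly. A hedged sketch of that opt-in is below: the package name and path are assumptions, only the feature name comes from the diff.

    [dev-dependencies]
    # Hypothetical consumer: enable the simulation backends only for tests.
    mz-persist = { path = "../persist", features = ["turmoil"] }

For a one-off run, something like cargo test -p mz-persist --features turmoil (again assuming the package name) would have the same effect.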
22 changes: 22 additions & 0 deletions src/persist/src/cfg.rs
@@ -51,6 +51,9 @@ pub enum BlobConfig {
Mem(bool),
/// Config for [AzureBlob].
Azure(AzureBlobConfig),
#[cfg(feature = "turmoil")]
/// Config for [crate::turmoil::TurmoilBlob].
Turmoil(crate::turmoil::BlobConfig),
}

/// Configuration knobs for [Blob].
@@ -77,6 +80,8 @@ impl BlobConfig {
BlobConfig::Mem(tombstone) => {
Ok(Arc::new(MemBlob::open(MemBlobConfig::new(tombstone))))
}
#[cfg(feature = "turmoil")]
BlobConfig::Turmoil(config) => Ok(Arc::new(crate::turmoil::TurmoilBlob::open(config))),
}
}

@@ -186,6 +191,11 @@
}
_ => Err(anyhow!("unknown persist blob scheme: {}", url.as_str())),
},
#[cfg(feature = "turmoil")]
"turmoil" => {
let cfg = crate::turmoil::BlobConfig::new(url);
Ok(BlobConfig::Turmoil(cfg))
}
p => Err(anyhow!(
"unknown persist blob scheme {}: {}",
p,
@@ -216,6 +226,9 @@ pub enum ConsensusConfig {
Postgres(PostgresConsensusConfig),
/// Config for [MemConsensus], only available in testing.
Mem,
#[cfg(feature = "turmoil")]
/// Config for [crate::turmoil::TurmoilConsensus].
Turmoil(crate::turmoil::ConsensusConfig),
}

impl ConsensusConfig {
@@ -226,6 +239,10 @@ impl ConsensusConfig {
Ok(Arc::new(PostgresConsensus::open(config).await?))
}
ConsensusConfig::Mem => Ok(Arc::new(MemConsensus::default())),
#[cfg(feature = "turmoil")]
ConsensusConfig::Turmoil(config) => {
Ok(Arc::new(crate::turmoil::TurmoilConsensus::open(config)))
}
}
}

@@ -246,6 +263,11 @@
}
Ok(ConsensusConfig::Mem)
}
#[cfg(feature = "turmoil")]
"turmoil" => {
let cfg = crate::turmoil::ConsensusConfig::new(url);
Ok(ConsensusConfig::Turmoil(cfg))
}
p => Err(anyhow!(
"unknown persist consensus scheme {}: {}",
p,
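Both BlobConfig and ConsensusConfig gain the new backend the same way: a feature-gated variant plus a #[cfg(feature = "turmoil")] arm in the URL-scheme match. A self-contained sketch of that dispatch pattern follows; the type and function names are illustrative rather than the crate's actual API, and the sketch assumes a crate that declares a turmoil feature of its own.

    use anyhow::anyhow;
    use url::Url;

    #[allow(dead_code)]
    enum WhichBlob {
        Mem,
        #[cfg(feature = "turmoil")]
        Turmoil(Url),
    }

    fn parse_blob(url: &str) -> anyhow::Result<WhichBlob> {
        let url = Url::parse(url)?;
        match url.scheme() {
            "mem" => Ok(WhichBlob::Mem),
            // Compiled in only when the simulation backends are enabled.
            #[cfg(feature = "turmoil")]
            "turmoil" => Ok(WhichBlob::Turmoil(url)),
            p => Err(anyhow!("unknown persist blob scheme {}: {}", p, url)),
        }
    }

    fn main() -> anyhow::Result<()> {
        assert!(matches!(parse_blob("mem://localhost")?, WhichBlob::Mem));
        assert!(parse_blob("s4://bucket/prefix").is_err());
        Ok(())
    }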
2 changes: 2 additions & 0 deletions src/persist/src/lib.rs
@@ -30,5 +30,7 @@ pub mod metrics;
pub mod postgres;
pub mod retry;
pub mod s3;
#[cfg(feature = "turmoil")]
pub mod turmoil;
pub mod unreliable;
pub mod workload;
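lib.rs only registers the new module behind the feature; the intended consumer is a turmoil-based deterministic simulation test. A rough sketch of what such a harness looks like is below. It uses only turmoil's public API, and the host/client bodies are placeholders rather than the crate's actual TurmoilBlob/TurmoilConsensus wiring.

    fn main() {
        let mut sim = turmoil::Builder::new().build();

        // A host that would own blob/consensus state in a real test.
        sim.host("server", || async {
            // ... serve blob and consensus requests over the simulated network ...
            Ok(())
        });

        // A client that would open persist against a turmoil:// location.
        sim.client("client", async {
            // ... build BlobConfig / ConsensusConfig from a turmoil URL and use them ...
            Ok(())
        });

        sim.run().unwrap();
    }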
4 changes: 2 additions & 2 deletions src/persist/src/location.rs
@@ -334,7 +334,7 @@ impl From<tokio::task::JoinError> for ExternalError {

/// An abstraction for a single arbitrarily-sized binary blob and an associated
/// version number (sequence number).
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct VersionedData {
/// The sequence number of the data.
pub seqno: SeqNo,
@@ -352,7 +352,7 @@ pub const SCAN_ALL: usize = u64_to_usize(i64::MAX as u64);
pub const CONSENSUS_HEAD_LIVENESS_KEY: &str = "LIVENESS";

/// Return type to indicate whether [Consensus::compare_and_set] succeeded or failed.
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum CaSResult {
/// The compare-and-set succeeded and committed new state.
Committed,
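VersionedData and CaSResult are exactly the values a simulated consensus has to ship between hosts, which is why they pick up serde derives here. An illustrative round trip is sketched below; the types only mirror VersionedData and CaSResult in shape, and JSON is an arbitrary choice of encoding for the sketch.

    use serde::{Deserialize, Serialize};

    #[derive(Debug, PartialEq, Serialize, Deserialize)]
    struct VersionedDataLike {
        seqno: u64,
        data: Vec<u8>,
    }

    #[derive(Debug, PartialEq, Serialize, Deserialize)]
    enum CaSResultLike {
        Committed,
        ExpectationMismatch,
    }

    fn main() {
        // The kind of reply a compare-and-set RPC would send back to a client.
        let reply = (
            CaSResultLike::Committed,
            VersionedDataLike { seqno: 42, data: b"state".to_vec() },
        );
        let bytes = serde_json::to_vec(&reply).expect("encode");
        let decoded: (CaSResultLike, VersionedDataLike) =
            serde_json::from_slice(&bytes).expect("decode");
        assert_eq!(reply, decoded);
    }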
4 changes: 2 additions & 2 deletions src/persist/src/mem.rs
@@ -149,7 +149,7 @@ impl MemBlobConfig {
}

/// An in-memory implementation of [Blob].
-#[derive(Debug)]
+#[derive(Clone, Debug)]
pub struct MemBlob {
core: Arc<tokio::sync::Mutex<MemBlobCore>>,
}
@@ -201,7 +201,7 @@ impl Blob for MemBlob {
}

/// An in-memory implementation of [Consensus].
-#[derive(Debug)]
+#[derive(Clone, Debug)]
pub struct MemConsensus {
// TODO: This was intended to be a tokio::sync::Mutex but that seems to
// regularly deadlock in the `concurrency` test.
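The new Clone derives matter because MemBlob and MemConsensus are just handles around an Arc-wrapped core, so cloning them hands out shared views of the same state, e.g. to several simulated hosts. A std-only sketch of that property is below; the struct is a stand-in, not the crate's actual MemBlob.

    use std::collections::BTreeMap;
    use std::sync::{Arc, Mutex};

    #[derive(Clone, Debug)]
    struct MemBlobLike {
        core: Arc<Mutex<BTreeMap<String, Vec<u8>>>>,
    }

    fn main() {
        let blob = MemBlobLike { core: Arc::new(Mutex::new(BTreeMap::new())) };
        // Cloning copies only the Arc pointer, so both handles share one store.
        let other_host = blob.clone();

        blob.core.lock().unwrap().insert("key".into(), b"value".to_vec());
        assert!(other_host.core.lock().unwrap().contains_key("key"));
    }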