diff --git a/Cargo.lock b/Cargo.lock index b7bf2737d57..9e862373306 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1314,6 +1314,7 @@ dependencies = [ "reqwest 0.12.9", "serde", "serde_json", + "siphasher", "tokio", "tower-http 0.6.2", "tracing", @@ -5314,6 +5315,12 @@ dependencies = [ "rand_core", ] +[[package]] +name = "siphasher" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" + [[package]] name = "sketches-ddsketch" version = "0.2.2" diff --git a/rust/load/Cargo.toml b/rust/load/Cargo.toml index e34d0bd9cb9..48e248cce67 100644 --- a/rust/load/Cargo.toml +++ b/rust/load/Cargo.toml @@ -28,3 +28,4 @@ chromadb = { git = "https://github.com/rescrv/chromadb-rs", rev = "e364e35c34c66 guacamole = { version = "0.9", default-features = false } tower-http = { version = "0.6.2", features = ["trace"] } reqwest = { version = "0.12", features = ["json"] } +siphasher = "1.0.1" diff --git a/rust/load/examples/workload-json.rs b/rust/load/examples/workload-json.rs index d350bddadb6..af379fff446 100644 --- a/rust/load/examples/workload-json.rs +++ b/rust/load/examples/workload-json.rs @@ -1,4 +1,4 @@ -use chroma_load::{Distribution, GetQuery, QueryQuery, Workload}; +use chroma_load::{Distribution, GetQuery, QueryQuery, Skew, Workload}; fn main() { let w = Workload::Hybrid(vec![ @@ -7,6 +7,7 @@ fn main() { ( 1.0, Workload::Get(GetQuery { + skew: Skew::Zipf { theta: 0.999 }, limit: Distribution::Constant(10), document: None, metadata: None, @@ -15,6 +16,7 @@ fn main() { ( 1.0, Workload::Query(QueryQuery { + skew: Skew::Zipf { theta: 0.999 }, limit: Distribution::Constant(10), document: None, metadata: None, diff --git a/rust/load/src/bit_difference.rs b/rust/load/src/bit_difference.rs new file mode 100644 index 00000000000..e7790d283b9 --- /dev/null +++ b/rust/load/src/bit_difference.rs @@ -0,0 +1,460 @@ +//! This is a synthetic data set that generates documents by using a sparse embedding over a fixed +//! set of words. Essentially, each dimension of the embedding corresponds to a word and is zero +//! when the word is absent and one when the word is present. In two dimensions, the space looks +//! like this: +//! +//! ```text +//! │ 01 11 +//! │ +//! │ 00 10 +//! └──────── +//! ``` +//! +//! The algorithm is simple: Pick a random point in space, which we shall call the center of a +//! cluster, and then generate a number of documents around that point. The number of documents is +//! parameterized. The idea being that these documents, by nature of the randomness of the center, +//! will be more alike to each other than to any random document in the space. Given a random +//! point, we can recall that point measure the recall of the documents in the cluster. +//! +//! To generate clusters randomly, we use the `guacamole` crate. Guacamole provides a fast way to +//! seed a random number generator, and provides predictability in the seed so that if you pick two +//! numbers that are far apart, the streams are predictably far apart. Given a guacamole stream, +//! we can generate the same clusters deterministically by using the same set of seeds, one per +//! cluster (this is why being quick to seek is important). +//! +//! Internally, guacamole provides primitives that make it easy to manage the set of seeds to get a +//! variety of data sets out of the synthetic data. + +use chromadb::v2::collection::{CollectionEntries, GetOptions, QueryOptions}; +use chromadb::v2::ChromaClient; +use guacamole::combinators::*; +use guacamole::{FromGuacamole, Guacamole, Zipf}; +use siphasher::sip::SipHasher24; +use tracing::Instrument; + +use crate::words::WORDS; +use crate::{DataSet, GetQuery, KeySelector, QueryQuery, Skew, UpsertQuery}; + +const EMBEDDING_BYTES: usize = 128; +const EMBEDDING_SIZE: usize = 8 * EMBEDDING_BYTES; + +/// This magic constant for hashing gives a consistent way to compute the document ID from the +/// document content when paired with siphasher. +pub const MAGIC_CONSTANT_FOR_HASHING: [u8; 16] = [ + 0x63, 0x68, 0x72, 0x6f, 0x6d, 0x61, 0x20, 0x62, 0x65, 0x6e, 0x63, 0x68, 0x6d, 0x61, 0x72, 0x6b, +]; + +fn embedding(embedding: [u8; EMBEDDING_BYTES]) -> Vec { + let mut result = vec![]; + for byte in embedding.into_iter() { + for j in 0..8 { + if byte & (1 << j) != 0 { + result.push(1.0); + } else { + result.push(0.0); + } + } + } + result +} + +/// Options for generating synthetic data. +#[derive(Clone, Debug)] +pub struct EmbeddingOptions { + /// The number of clusters to generate. + pub num_clusters: usize, + /// The seed for the random number generator for the clusters. Given the same seed, the + /// clusters will overlap. For example, if you generate 10 clusters with the same seed and + /// then update the num_clusters to 100, you will generate the same 10 clusters and 90 new + /// clusters. + pub seed_clusters: usize, + /// Clustering options. + pub clustering: ClusterOptions, +} + +/// Options for generating a single cluster. +#[derive(Clone, Debug)] +pub struct ClusterOptions { + /// The maximum number of adjacent documents. + pub max_adjacent: u32, + /// The theta of the zipf distribution for the number of adjacent documents. + pub adjacent_theta: f64, +} + +/// A document is a string of content. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct Document { + /// The content of the document. + pub content: String, +} + +impl Document { + /// The ID of the document. This ID is a deterministic function of document content. + pub fn id(&self) -> String { + let hasher = SipHasher24::new_with_key(&MAGIC_CONSTANT_FOR_HASHING); + let h = hasher.hash(self.content.as_bytes()); + format!("doc:{}", h) + } + + /// The embedding of the document. This embedding is a deterministic function of document + /// content. + pub fn embedding(&self) -> Vec { + let mut result = vec![]; + let words = self.content.split_whitespace().collect::>(); + for word in WORDS.iter() { + if words.contains(word) { + result.push(1.0); + } else { + result.push(0.0); + } + } + result + } +} + +impl From<[u8; EMBEDDING_BYTES]> for Document { + fn from(embedding: [u8; EMBEDDING_BYTES]) -> Document { + let document = WORDS + .iter() + .enumerate() + .filter_map(|(idx, word)| { + // If the idx'th bit is set... + if embedding[idx >> 3] & (1 << (idx & 0x7)) != 0 { + Some(*word) + } else { + None + } + }) + .collect::>() + .join(" "); + Document { content: document } + } +} + +/// The representation of a single cluster. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct Cluster { + pub cluster_id: u64, + pub center: [u8; EMBEDDING_BYTES], + pub docs: Vec, +} + +impl FromGuacamole for Cluster { + fn from_guacamole(co: &mut ClusterOptions, guac: &mut Guacamole) -> Cluster { + let cluster_id: u64 = any(guac); + let mut embedding = [0u8; EMBEDDING_BYTES]; + guac.generate(&mut embedding); + let center = embedding; + let num_adjacent = range_to(co.max_adjacent)(guac); + let mut bits_to_flip = vec![]; + while bits_to_flip.len() < num_adjacent as usize { + let bit_to_flip: u16 = any(guac); + let bit_to_flip = bit_to_flip as usize & (EMBEDDING_SIZE - 1); + if !bits_to_flip.contains(&bit_to_flip) { + bits_to_flip.push(bit_to_flip); + } + } + let mut docs = vec![]; + for bit_to_flip in bits_to_flip.into_iter() { + let mut embedding = embedding; + embedding[bit_to_flip >> 3] ^= 1 << (bit_to_flip & 0x7); + docs.push(Document::from(embedding)); + } + Cluster { + cluster_id, + center, + docs, + } + } +} + +/// A synthetic data set that generates documents and sparse embeddings. +#[derive(Clone, Debug)] +pub struct SyntheticDataSet { + collection: String, + embedding_options: EmbeddingOptions, +} + +impl SyntheticDataSet { + /// Create a new synthetic data set. The collection name will be the name of the document. + pub fn new(collection: String, embedding_options: EmbeddingOptions) -> SyntheticDataSet { + SyntheticDataSet { + collection, + embedding_options, + } + } + + /// Generate cluster by index. If the index is greater than the number of clusters, it will + /// wrap. + fn cluster_by_index(&self, idx: usize) -> Cluster { + let mut eo = self.embedding_options.clone(); + let unique_index = unique_set_index(eo.seed_clusters)(idx % eo.num_clusters); + let cluster: Cluster = from_seed(from(&mut eo.clustering))(unique_index); + cluster + } + + /// Generate a cluster according to the skew function. + fn cluster_by_skew(&self, skew: Skew, guac: &mut Guacamole) -> Cluster { + let eo = self.embedding_options.clone(); + match skew { + Skew::Uniform => { + set_element(range_to(eo.num_clusters), |idx| self.cluster_by_index(idx))(guac) + } + Skew::Zipf { theta } => { + let zipf = Zipf::from_theta(eo.num_clusters as u64, theta); + let cluster: Cluster = set_element( + |guac| zipf.next(guac) as usize, + |idx| self.cluster_by_index(idx), + )(guac); + cluster + } + } + } + + fn sample_ids(&self, skew: Skew, guac: &mut Guacamole, limit: usize) -> Vec { + let mut ids = vec![]; + for _ in 0..limit { + let cluster = self.cluster_by_skew(skew, guac); + let doc_idx = (any::(guac) as u64 * cluster.docs.len() as u64) >> 32; + ids.push(cluster.docs[doc_idx as usize].id()); + } + ids + } + + async fn upsert_sequential( + &self, + client: &ChromaClient, + _: UpsertQuery, + idx: usize, + _: &mut Guacamole, + ) -> Result<(), Box> { + let collection = client.get_or_create_collection(&self.name(), None).await?; + let mut ids = vec![]; + let mut embeddings = vec![]; + let mut docs = vec![]; + let cluster = self.cluster_by_index(idx); + for doc in cluster.docs.iter() { + ids.push(doc.id()); + embeddings.push(doc.embedding()); + docs.push(doc.content.as_str()); + } + let ids = ids.iter().map(String::as_str).collect(); + let entries = CollectionEntries { + ids, + embeddings: Some(embeddings), + metadatas: None, + documents: Some(docs), + }; + let results = collection + .upsert(entries, None) + .instrument(tracing::info_span!("upsert_sequential")) + .await; + let _results = results?; + Ok(()) + } + + async fn upsert_random( + &self, + client: &ChromaClient, + uq: UpsertQuery, + skew: Skew, + guac: &mut Guacamole, + ) -> Result<(), Box> { + let collection = client.get_or_create_collection(&self.name(), None).await?; + let mut ids = vec![]; + let mut embeddings = vec![]; + let mut docs = vec![]; + for _ in 0..uq.batch_size { + let cluster = self.cluster_by_skew(skew, guac); + let num_this_cluster = (cluster.docs.len() as f64 * uq.associativity).ceil() as usize; + for _ in 0..num_this_cluster { + let doc_idx = (any::(guac) as u64 * cluster.docs.len() as u64) >> 32; + ids.push(cluster.docs[doc_idx as usize].id()); + embeddings.push(cluster.docs[doc_idx as usize].embedding()); + docs.push(cluster.docs[doc_idx as usize].content.clone()); + } + } + let ids = ids.iter().map(String::as_str).collect(); + let docs = docs.iter().map(String::as_str).collect(); + let entries = CollectionEntries { + ids, + embeddings: Some(embeddings), + documents: Some(docs), + ..Default::default() + }; + let results = collection + .upsert(entries, None) + .instrument(tracing::info_span!("upsert_random")) + .await; + let _results = results?; + Ok(()) + } +} + +#[async_trait::async_trait] +impl DataSet for SyntheticDataSet { + fn name(&self) -> String { + self.collection.clone() + } + + fn description(&self) -> String { + "a synthetic data set".to_string() + } + + fn json(&self) -> serde_json::Value { + serde_json::json! {{}} + } + + async fn get( + &self, + client: &ChromaClient, + gq: GetQuery, + guac: &mut Guacamole, + ) -> Result<(), Box> { + let collection = client.get_or_create_collection(&self.name(), None).await?; + let limit = gq.limit.sample(guac); + let mut ids = self.sample_ids(gq.skew, guac, limit); + let where_metadata = gq.metadata.map(|m| m.into_where_metadata(guac)); + let where_document = gq.document.map(|m| m.into_where_document(guac)); + let results = collection + .get(GetOptions { + ids: ids.clone(), + where_metadata: where_metadata.clone(), + limit: Some(limit), + offset: None, + where_document: where_document.clone(), + include: None, + }) + .instrument(tracing::info_span!("get", limit = limit)) + .await; + let mut results = results?; + ids.sort(); + results.ids.sort(); + if where_metadata.is_none() && where_document.is_none() && results.ids != ids { + return Err(format!("expected {:?} but got {:?}", ids, results.ids).into()); + } + Ok(()) + } + + async fn query( + &self, + client: &ChromaClient, + vq: QueryQuery, + guac: &mut Guacamole, + ) -> Result<(), Box> { + let collection = client.get_or_create_collection(&self.name(), None).await?; + let cluster = self.cluster_by_skew(vq.skew, guac); + let where_metadata = vq.metadata.map(|m| m.into_where_metadata(guac)); + let where_document = vq.document.map(|m| m.into_where_document(guac)); + let results = collection + .query( + QueryOptions { + query_embeddings: Some(vec![embedding(cluster.center)]), + n_results: Some(cluster.docs.len()), + include: None, + query_texts: None, + where_metadata: where_metadata.clone(), + where_document: where_document.clone(), + }, + None, + ) + .instrument(tracing::info_span!("query")) + .await; + let results = results?; + if where_metadata.is_none() && where_document.is_none() { + println!("expected {:?} but got {:?}", cluster.docs, results.ids); + } + Ok(()) + } + + async fn upsert( + &self, + client: &ChromaClient, + uq: UpsertQuery, + guac: &mut Guacamole, + ) -> Result<(), Box> { + match uq.key { + KeySelector::Index(idx) => self.upsert_sequential(client, uq, idx, guac).await, + KeySelector::Random(skew) => self.upsert_random(client, uq, skew, guac).await, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn constants() { + assert_eq!(EMBEDDING_SIZE, WORDS.len()); + } + + mod synthethic { + use std::time::Instant; + + use super::*; + + /// This test generates a synthetic data set of one cluster. + /// + /// It tests that no matter the index or skew, we get that one cluster. + #[test] + fn one() { + let eo = EmbeddingOptions { + num_clusters: 1, + // Randomly chosen seed. + seed_clusters: 10570396521983666744, + clustering: ClusterOptions { + max_adjacent: 1, + adjacent_theta: 0.999, + }, + }; + let sd = SyntheticDataSet { + collection: "test".to_string(), + embedding_options: eo, + }; + let reference = sd.cluster_by_index(0); + for i in 0..10 { + let cluster = sd.cluster_by_index(i); + assert_eq!(reference, cluster); + } + for _ in 0..50 { + let cluster = sd.cluster_by_skew( + Skew::Uniform, + &mut Guacamole::new(Instant::now().elapsed().as_nanos() as u64), + ); + assert_eq!(reference, cluster); + } + } + + /// This test generates one hundred clusters. + /// + /// It tests that they are different from each other. + #[test] + fn hundred() { + let eo = EmbeddingOptions { + num_clusters: 100, + // Randomly chosen seed. + seed_clusters: 16252348095511272702, + clustering: ClusterOptions { + max_adjacent: 10, + adjacent_theta: 0.999, + }, + }; + let sd = SyntheticDataSet { + collection: "test".to_string(), + embedding_options: eo, + }; + let clusters = (0..100).map(|i| sd.cluster_by_index(i)).collect::>(); + for (i, c1) in clusters.iter().enumerate() { + println!("cluster {}", c1.cluster_id); + for (j, c2) in clusters.iter().enumerate() { + if i == j { + assert_eq!(c1, c2); + } else { + assert_ne!(c1, c2, "{} {}", i, j); + } + } + } + } + } +} diff --git a/rust/load/src/data_sets.rs b/rust/load/src/data_sets.rs index 66f836918ca..8c40a425dd7 100644 --- a/rust/load/src/data_sets.rs +++ b/rust/load/src/data_sets.rs @@ -6,7 +6,7 @@ use guacamole::combinators::*; use guacamole::Guacamole; use tracing::Instrument; -use crate::{DataSet, Error, GetQuery, QueryQuery}; +use crate::{bit_difference, DataSet, Error, GetQuery, QueryQuery, UpsertQuery}; //////////////////////////////////////////////// Nop /////////////////////////////////////////////// @@ -46,6 +46,16 @@ impl DataSet for NopDataSet { tracing::info!("nop query"); Ok(()) } + + async fn upsert( + &self, + _: &ChromaClient, + _: UpsertQuery, + _: &mut Guacamole, + ) -> Result<(), Box> { + tracing::info!("nop upsert"); + Ok(()) + } } /////////////////////////////////////////// Tiny Stories /////////////////////////////////////////// @@ -155,6 +165,15 @@ impl DataSet for TinyStoriesDataSet { let _results = results?; Ok(()) } + + async fn upsert( + &self, + _: &ChromaClient, + _: UpsertQuery, + _: &mut Guacamole, + ) -> Result<(), Box> { + Err(Error::InvalidRequest("Upsert not supported".into()).into()) + } } const ALL_MINILM_L6_V2: &str = "all-MiniLM-L6-v2"; @@ -249,5 +268,40 @@ pub fn all_data_sets() -> Vec> { for data_set in TINY_STORIES_DATA_SETS { data_sets.push(Arc::new(data_set.clone()) as _); } + for num_clusters in [10_000, 100_000] { + for (seed_idx, seed_clusters) in [ + 0xab1cd5b6a5173d40usize, + 0x415c2b5b6451416dusize, + 0x7bfbf398fb74d56usize, + 0xed11fe8e8655591eusize, + 0xcb86c32c95df5657usize, + 0xa869711d201b98a4usize, + 0xe2a276bde1c91d1ausize, + 0x866a7f8100ccf78usize, + 0xa23e0b862d45e227usize, + 0x59f651f54a5ffe1usize, + ] + .into_iter() + .enumerate() + { + for max_adjacent in [1, 10, 100] { + let adjacent_theta = 0.99; + let eo = bit_difference::EmbeddingOptions { + num_clusters, + seed_clusters, + clustering: bit_difference::ClusterOptions { + max_adjacent, + adjacent_theta, + }, + }; + let collection = format!( + "bit-difference-scale-{:e}-seed-{}-adj-{}", + num_clusters, seed_idx, max_adjacent + ); + let data_set = Arc::new(bit_difference::SyntheticDataSet::new(collection, eo)); + data_sets.push(data_set as _); + } + } + } data_sets } diff --git a/rust/load/src/lib.rs b/rust/load/src/lib.rs index d554cc97c6f..2495d94ca57 100644 --- a/rust/load/src/lib.rs +++ b/rust/load/src/lib.rs @@ -18,10 +18,12 @@ use tower_http::trace::TraceLayer; use tracing::Instrument; use uuid::Uuid; +pub mod bit_difference; pub mod config; pub mod data_sets; pub mod opentelemetry_config; pub mod rest; +pub mod words; pub mod workloads; const CONFIG_PATH_ENV_VAR: &str = "CONFIG_PATH"; @@ -104,16 +106,25 @@ pub trait DataSet: std::fmt::Debug + Send + Sync { gq: GetQuery, guac: &mut Guacamole, ) -> Result<(), Box>; + async fn query( &self, client: &ChromaClient, vq: QueryQuery, guac: &mut Guacamole, ) -> Result<(), Box>; + + async fn upsert( + &self, + client: &ChromaClient, + uq: UpsertQuery, + guac: &mut Guacamole, + ) -> Result<(), Box>; } /////////////////////////////////////////// Distribution /////////////////////////////////////////// +/// Distribution size and shape. #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub enum Distribution { Constant(usize), @@ -136,6 +147,17 @@ impl Distribution { } } +/////////////////////////////////////////////// Skew /////////////////////////////////////////////// + +/// Distribution shape, without size. +#[derive(Copy, Clone, Debug, serde::Deserialize, serde::Serialize)] +pub enum Skew { + #[serde(rename = "uniform")] + Uniform, + #[serde(rename = "zipf")] + Zipf { theta: f64 }, +} + /////////////////////////////////////////// MetadataQuery ////////////////////////////////////////// #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -172,6 +194,7 @@ impl DocumentQuery { #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct GetQuery { + pub skew: Skew, pub limit: Distribution, #[serde(skip_serializing_if = "Option::is_none")] pub metadata: Option, @@ -183,6 +206,7 @@ pub struct GetQuery { #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct QueryQuery { + pub skew: Skew, pub limit: Distribution, #[serde(skip_serializing_if = "Option::is_none")] pub metadata: Option, @@ -190,6 +214,34 @@ pub struct QueryQuery { pub document: Option, } +//////////////////////////////////////////// KeySelector /////////////////////////////////////////// + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +#[serde(tag = "type")] +pub enum KeySelector { + #[serde(rename = "index")] + Index(usize), + #[serde(rename = "random")] + Random(Skew), +} + +//////////////////////////////////////////// UpsertQuery /////////////////////////////////////////// + +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +pub struct UpsertQuery { + pub key: KeySelector, + pub batch_size: usize, + pub associativity: f64, +} + +/////////////////////////////////////////// WorkloadState ////////////////////////////////////////// + +#[derive(Clone)] +pub struct WorkloadState { + seq_no: u64, + guac: Guacamole, +} + ///////////////////////////////////////////// Workload ///////////////////////////////////////////// #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -209,6 +261,10 @@ pub enum Workload { after: chrono::DateTime, wrap: Box, }, + #[serde(rename = "load")] + Load, + #[serde(rename = "random")] + RandomUpsert(KeySelector), } impl Workload { @@ -234,6 +290,8 @@ impl Workload { } } Workload::Delay { after: _, wrap } => wrap.resolve_by_name(workloads)?, + Workload::Load => {} + Workload::RandomUpsert(_) => {} } Ok(()) } @@ -242,7 +300,7 @@ impl Workload { &self, client: &ChromaClient, data_set: &dyn DataSet, - guac: &mut Guacamole, + state: &mut WorkloadState, ) -> Result<(), Box> { match self { Workload::Nop => { @@ -257,18 +315,18 @@ impl Workload { } Workload::Get(get) => { data_set - .get(client, get.clone(), guac) + .get(client, get.clone(), &mut state.guac) .instrument(tracing::info_span!("get")) .await } Workload::Query(query) => { data_set - .query(client, query.clone(), guac) + .query(client, query.clone(), &mut state.guac) .instrument(tracing::info_span!("query")) .await } Workload::Hybrid(hybrid) => { - let scale: f64 = any(guac); + let scale: f64 = any(&mut state.guac); let mut total = scale * hybrid .iter() @@ -282,7 +340,7 @@ impl Workload { } if workload.is_active() { if *p >= total { - return Box::pin(workload.step(client, data_set, guac)).await; + return Box::pin(workload.step(client, data_set, state)).await; } total -= *p; } @@ -291,7 +349,41 @@ impl Workload { "miscalculation of total hybrid probabilities".to_string(), ))) } - Workload::Delay { after: _, wrap } => Box::pin(wrap.step(client, data_set, guac)).await, + Workload::Delay { after: _, wrap } => { + Box::pin(wrap.step(client, data_set, state)).await + } + Workload::Load => { + data_set + .upsert( + client, + UpsertQuery { + key: KeySelector::Index(state.seq_no as usize), + batch_size: 100, + // Associativity is the ratio of documents in a cluster to documents + // written by the workload. It is ignored for load. + associativity: 0.0, + }, + &mut state.guac, + ) + .instrument(tracing::info_span!("load")) + .await + } + Workload::RandomUpsert(key) => { + data_set + .upsert( + client, + UpsertQuery { + key: key.clone(), + batch_size: 100, + // Associativity is the ratio of documents in a cluster to documents + // written by the workload. It is ignored for load. + associativity: 0.0, + }, + &mut state.guac, + ) + .instrument(tracing::info_span!("load")) + .await + } } } @@ -303,6 +395,8 @@ impl Workload { Workload::Query(_) => true, Workload::Hybrid(hybrid) => hybrid.iter().any(|(_, w)| w.is_active()), Workload::Delay { after, wrap } => chrono::Utc::now() >= *after && wrap.is_active(), + Workload::Load => true, + Workload::RandomUpsert(_) => true, } } } @@ -545,7 +639,9 @@ impl LoadService { } } }); + let mut seq_no = 0u64; while !done.load(std::sync::atomic::Ordering::Relaxed) { + seq_no += 1; let delay = interarrival_duration(spec.throughput)(&mut guac); next_op += delay; let now = Instant::now(); @@ -560,10 +656,11 @@ impl LoadService { let workload = spec.workload.clone(); let client = Arc::clone(&client); let data_set = Arc::clone(&spec.data_set); - let mut guacamole = Guacamole::new(any(&mut guac)); + let guac = Guacamole::new(any(&mut guac)); + let mut state = WorkloadState { seq_no, guac }; let fut = async move { workload - .step(&client, &*data_set, &mut guacamole) + .step(&client, &*data_set, &mut state) .await .map_err(|err| { tracing::error!("workload failed: {err:?}"); @@ -812,6 +909,11 @@ mod tests { 1.0, { "get": { + "skew": { + "zipf": { + "theta": 0.999 + } + }, "limit": { "Constant": 10 } @@ -822,6 +924,11 @@ mod tests { 1.0, { "query": { + "skew": { + "zipf": { + "theta": 0.999 + } + }, "limit": { "Constant": 10 } @@ -845,6 +952,7 @@ mod tests { ( 1.0, Workload::Get(GetQuery { + skew: Skew::Zipf { theta: 0.999 }, limit: Distribution::Constant(10), document: None, metadata: None, @@ -853,6 +961,7 @@ mod tests { ( 1.0, Workload::Query(QueryQuery { + skew: Skew::Zipf { theta: 0.999 }, limit: Distribution::Constant(10), document: None, metadata: None, diff --git a/rust/load/src/words.rs b/rust/load/src/words.rs new file mode 100644 index 00000000000..f77f185c7b1 --- /dev/null +++ b/rust/load/src/words.rs @@ -0,0 +1,1026 @@ +pub const WORDS: &[&str] = &[ + "man’s", + "sought", + "touch", + "understood", + "watched", + "can't", + "distant", + "floor", + "hung", + "noise", + "not?”", + "opening", + "prepared", + "ancient", + "art", + "course,", + "debray", + "english", + "fields", + "mere", + "sherlock", + "spoke,", + "“for", + "air,", + "anxious", + "dropped", + "myself,", + "name,", + "slowly", + "awful", + "bore", + "broad", + "child,", + "escape", + "foe", + "forget", + "line", + "more,", + "powers", + "rue", + "sons", + "waited", + "afterwards", + "bound", + "court", + "distance", + "easily", + "entirely", + "figure", + "foundation", + "o’clock", + "presented", + "worthy", + "ah,", + "drawn", + "heroes", + "notice", + "presently", + "pretty", + "stay", + "'tis", + "glance", + "merely", + "mouth", + "outside", + "city", + "glad", + "morcerf,", + "shining", + "beauchamp", + "before,", + "chair", + "conversation", + "early", + "eight", + "except", + "impossible", + "terror", + "wind", + "words,", + "“there", + "advanced", + "happened", + "learn", + "persons", + "real", + "says", + "silver", + "speaking", + "today", + "tomorrow", + "unhappy", + "vengeance", + "view", + "glory", + "mother,", + "right,", + "step", + "think,", + "whilst", + "appearance", + "drove", + "forward", + "murmured", + "you;", + "“yes;", + "alone,", + "battle", + "danger", + "equal", + "formed", + "getting", + "story", + "“this", + "brother", + "captain", + "greek", + "host", + "rushed", + "sitting", + "twelve", + "year", + "for,", + "gloomy", + "presence", + "safe", + "seat", + "why,", + "generous", + "him”", + "island", + "lorry,", + "run", + "will,", + "“your", + "ali", + "loud", + "loved", + "perhaps,", + "that's", + "beyond", + "filled", + "lose", + "pleasure", + "started", + "third", + "count’s", + "fifty", + "grief", + "purpose", + "therefore", + "thick", + "uttered", + "clear", + "grew", + "kill", + "subject", + "town", + "trust", + "walls", + "wanted", + "“let", + "besides,", + "boys", + "common", + "covered", + "evidently", + "further", + "god,", + "passing", + "sea", + "trouble", + "work,", + "effect", + "fresh", + "iron", + "mine", + "other,", + "whatever", + "write", + "age", + "aid", + "darkness", + "dust", + "road", + "sweet", + "touched", + "window,", + "arthur", + "be,", + "cry", + "do,", + "mark", + "perfectly", + "saying", + "stern", + "thoughts", + "(the", + "ajax", + "easy", + "front", + "morning,", + "mortal", + "ought", + "pointed", + "receive", + "train", + "allow", + "back,", + "doing", + "huck", + "papers", + "paris,", + "single", + "worth", + "written", + "hero", + "immediately", + "silent", + "standing", + "bent", + "first,", + "future", + "homer", + "leaving", + "makes", + "pity", + "seek", + "tone", + "women", + "act", + "appear", + "box", + "broken", + "gold", + "mercédès", + "showed", + "account", + "book", + "charge", + "death,", + "died", + "field", + "hands,", + "men,", + "voice,", + "family", + "race", + "suppose", + "walked", + "bed,", + "chance", + "fury", + "joy", + "seven", + "expected", + "honor", + "in,", + "living", + "sudden", + "wild", + "cast", + "cut", + "mind,", + "st", + "“yes,”", + "ain't", + "desire", + "number", + "beautiful", + "hardly", + "master", + "moment,", + "neither", + "p", + "thank", + "waiting", + "heart,", + "sometimes", + "street", + "trembling", + "visit", + "wide", + "comes", + "me?”", + "place,", + "knows", + "pray", + "send", + "stand", + "french", + "monsieur", + "professor", + "water", + "blue", + "i'll", + "seized", + "“we", + "certainly", + "nearly", + "pay", + "rising", + "swift", + "anyone", + "holmes,", + "jonathan", + "nature", + "mina", + "across", + "bad", + "beside", + "different", + "foot", + "glass", + "hard", + "indeed", + "observed", + "repeated", + "scene", + "table", + "“why,", + "feeling", + "me;", + "question", + "remember", + "rich", + "franz,", + "past", + "rage", + "country", + "else", + "plain", + "sad", + "thee", + "wished", + "inquired", + "noble", + "son,", + "down,", + "hair", + "it?”", + "wall", + "added", + "caderousse,", + "change", + "possible", + "boy", + "fine", + "fixed", + "paid", + "remain", + "talk", + "bold", + "follow", + "gentleman", + "horses", + "itself", + "moved", + "often", + "short", + "true", + "form", + "hold", + "reached", + "watch", + "ye", + "gutenberg", + "promise", + "“to", + "dantès,", + "golden", + "greeks", + "hours", + "public", + "scarcely", + "tears", + "“in", + "paper", + "several", + "sun", + "twenty", + "away,", + "friends", + "prisoner", + "usual", + "electronic", + "free", + "lady", + "fortune", + "i,", + "la", + "mighty", + "proud", + "terms", + "cold", + "corner", + "following", + "life,", + "shook", + "heavy", + "live", + "stone", + "breast", + "late", + "grecian", + "herself", + "lucy", + "show", + "you?”", + "none", + "“yes”", + "it;", + "length", + "sign", + "steps", + "themselves", + "arrived", + "attention", + "edmond", + "himself,", + "husband", + "secret", + "die", + "greece", + "human", + "arms,", + "try", + "“is", + "interest", + "bright", + "defarge", + "instead", + "one,", + "valentine,", + "“how", + "exclaimed", + "mrs", + "need", + "“if", + "expression", + "indeed,", + "threw", + "tried", + "face,", + "mean", + "morcerf", + "although", + "fierce", + "fight", + "money", + "reason", + "head,", + "seem", + "wait", + "caderousse", + "girl", + "see,", + "together", + "“then", + "bear", + "come,", + "him;", + "led", + "noirtier", + "happy", + "smile", + "state", + "instant", + "mademoiselle", + "meet", + "paris", + "ran", + "earth", + "it's", + "low", + "seeing", + "silence", + "“a", + "carried", + "closed", + "andrea", + "child", + "evening", + "met", + "sacred", + "sir,”", + "too,", + "minutes", + "not,", + "answer", + "brave", + "making", + "fall", + "francs", + "on,", + "turning", + "up,", + "bring", + "daughter", + "strength", + "used", + "able", + "pass", + "which,", + "six", + "times", + "course", + "sort", + "lorry", + "longer", + "vain", + "others", + "above", + "certain", + "save", + "war", + "works", + "point", + "everything", + "forth", + "us,", + "abbé", + "me”", + "out,", + "troy", + "who,", + "father,", + "rose", + "suddenly", + "yourself", + "fire", + "ready", + "window", + "lord", + "spoke", + "know,", + "known", + "night,", + "doubt", + "gods", + "trojan", + "bed", + "lips", + "stopped", + "yes,", + "all,", + "idea", + "o", + "pale", + "fair", + "fate", + "friend,", + "way,", + "cause", + "here,", + "laid", + "means", + "dreadful", + "general", + "room,", + "taking", + "albert,", + "business", + "force", + "kept", + "whether", + "become", + "feet", + "terrible", + "home", + "kind", + "wife", + "soul", + "house,", + "it”", + "mother", + "care", + "ground", + "sight", + "turn", + "dr", + "woman", + "received", + "“no,", + "no,", + "seems", + "beneath", + "doctor", + "helsing", + "jove", + "morrel,", + "this,", + "chief", + "“ah,", + "body", + "manner", + "sound", + "remained", + "again,", + "either", + "gone", + "matter", + "person", + "raised", + "red", + "strange", + "eye", + "second", + "achilles", + "arm", + "drew", + "world", + "best", + "carriage", + "eyes,", + "“he", + "case", + "present", + "sleep", + "became", + "feel", + "sure", + "help", + "sent", + "answered", + "coming", + "deep", + "power", + "“my", + "letter", + "sir,", + "dark", + "followed", + "king", + "least", + "blood", + "heaven", + "struck", + "appeared", + "perhaps", + "use", + "say,", + "understand", + "close", + "strong", + "was,", + "large", + "near", + "placed", + "door,", + "thousand", + "cristo,", + "day,", + "gutenbergtm", + "air", + "order", + "you”", + "so,", + "danglars,", + "call", + "entered", + "among", + "holmes", + "black", + "enough", + "four", + "rest", + "given", + "is,", + "ten", + "continued", + "there,", + "hector", + "less", + "really", + "“do", + "along", + "called", + "read", + "white", + "also", + "five", + "return", + "rather", + "valentine", + "hundred", + "during", + "lost", + "thou", + "within", + "fear", + "hour", + "hand,", + "people", + "speak", + "keep", + "dead", + "oh,", + "villefort,", + "days", + "opened", + "held", + "van", + "hope", + "morning", + "set", + "open", + "sat", + "want", + "behind", + "said:", + "high", + "love", + "already", + "part", + "alone", + "o'er", + "thing", + "lay", + "well,", + "miss", + "next", + "her,", + "anything", + "ask", + "but,", + "until", + "because", + "believe", + "taken", + "things", + "word", + "brought", + "time,", + "why", + "count,", + "voice", + "fell", + "does", + "side", + "small", + "years", + "franz", + "end", + "son", + "half", + "hear", + "them,", + "looking", + "“that", + "better", + "around", + "death", + "new", + "however,", + "stood", + "“well,", + "albert", + "that,", + "felt", + "“yes,", + "chapter", + "cannot", + "began", + "he,", + "morrel", + "don't", + "friend", + "right", + "words", + "till", + "wish", + "soon", + "leave", + "passed", + "gave", + "done", + "myself", + "almost", + "dantès", + "cried", + "mind", + "full", + "something", + "both", + "god", + "round", + "far", + "going", + "arms", + "light", + "whole", + "between", + "name", + "few", + "“oh,", + "returned", + "danglars", + "man,", + "“but", + "“the", + "knew", + "since", + "heart", + "towards", + "get", + "room", + "place", + "turned", + "quite", + "put", + "moment", + "tom", + "off", + "told", + "got", + "“what", + "against", + "always", + "project", + "said,", + "another", + "men", + "“it", + "now,", + "find", + "having", + "life", + "villefort", + "father", + "hands", + "work", + "nor", + "house", + "many", + "night", + "poor", + "seen", + "under", + "face", + "look", + "give", + "whose", + "and,", + "ever", + "away", + "heard", + "being", + "head", + "thought", + "each", + "once", + "too", + "nothing", + "cristo", + "though", + "dear", + "madame", + "three", + "*", + "door", + "whom", + "well", + "again", + "found", + "way", + "left", + "seemed", + "say", + "replied", + "asked", + "here", + "thus", + "take", + "last", + "back", + "looked", + "make", + "same", + "then,", + "still", + "most", + "every", + "day", + "saw", + "“and", + "yet", + "good", + "took", + "think", + "those", + "went", + "long", + "own", + "just", + "tell", + "eyes", + "while", + "“you", + "came", + "go", + "might", + "without", + "let", + "count", + "even", + "come", + "never", + "thy", + "hand", + "it,", + "old", + "you,", + "how", + "much", + "first", + "mr", + "him,", + "after", + "m", + "himself", + "de", + "down", + "young", + "its", + "other", + "me,", + "monte", + "little", + "us", + "over", + "made", + "great", + "where", + "about", + "through", + "these", + "time", + "such", + "should", + "before", + "did", + "can", + "two", + "than", + "know", + "may", + "must", + "any", + "see", + "am", + "only", + "like", + "them", + "upon", + "very", + "now", + "man", + "up", + "our", + "shall", + "into", + "out", + "could", + "more", + "some", + "“i", + "has", + "then", + "do", + "been", + "would", + "their", + "there", + "what", + "no", + "an", + "one", + "will", + "if", + "when", + "who", + "were", + "are", + "your", + "she", + "or", + "they", + "we", + "me", + "so", + "him", + "all", + "said", + "by", + "which", + "this", + "her", + "from", + "be", + "but", + "have", + "on", + "my", + "at", + "not", + "had", + "for", + "as", + "is", + "it", + "with", + "you", + "was", + "that", + "his", + "he", + "i", + "in", + "a", + "to", + "of", + "and", + "the", +]; diff --git a/rust/load/src/workloads.rs b/rust/load/src/workloads.rs index c61b17014c0..22e67d699c9 100644 --- a/rust/load/src/workloads.rs +++ b/rust/load/src/workloads.rs @@ -1,12 +1,15 @@ use std::collections::HashMap; -use crate::{Distribution, DocumentQuery, GetQuery, MetadataQuery, QueryQuery, Workload}; +use crate::{ + Distribution, DocumentQuery, GetQuery, KeySelector, MetadataQuery, QueryQuery, Skew, Workload, +}; pub fn all_workloads() -> HashMap { HashMap::from_iter([ ( "get-no-filter".to_string(), Workload::Get(GetQuery { + skew: Skew::Zipf { theta: 0.999 }, limit: Distribution::Constant(10), metadata: None, document: None, @@ -15,6 +18,7 @@ pub fn all_workloads() -> HashMap { ( "get-document".to_string(), Workload::Get(GetQuery { + skew: Skew::Zipf { theta: 0.999 }, limit: Distribution::Constant(10), metadata: None, document: Some(DocumentQuery::Raw(serde_json::json!({"$contains": "the"}))), @@ -23,6 +27,7 @@ pub fn all_workloads() -> HashMap { ( "get-metadata".to_string(), Workload::Get(GetQuery { + skew: Skew::Zipf { theta: 0.999 }, limit: Distribution::Constant(10), metadata: Some(MetadataQuery::Raw(serde_json::json!({"i1": 1000}))), document: None, @@ -31,6 +36,7 @@ pub fn all_workloads() -> HashMap { ( "query-no-filter".to_string(), Workload::Query(QueryQuery { + skew: Skew::Zipf { theta: 0.999 }, limit: Distribution::Constant(10), metadata: None, document: None, @@ -42,6 +48,7 @@ pub fn all_workloads() -> HashMap { ( 0.3, Workload::Get(GetQuery { + skew: Skew::Zipf { theta: 0.999 }, limit: Distribution::Constant(10), metadata: None, document: Some(DocumentQuery::Raw(serde_json::json!({"$contains": "the"}))), @@ -50,6 +57,7 @@ pub fn all_workloads() -> HashMap { ( 0.7, Workload::Query(QueryQuery { + skew: Skew::Zipf { theta: 0.999 }, limit: Distribution::Constant(10), metadata: Some(MetadataQuery::Raw(serde_json::json!({"i1": 1000}))), document: None, @@ -63,6 +71,7 @@ pub fn all_workloads() -> HashMap { ( 0.5, Workload::Get(GetQuery { + skew: Skew::Zipf { theta: 0.999 }, limit: Distribution::Constant(10), metadata: None, document: Some(DocumentQuery::Raw(serde_json::json!({"$contains": "the"}))), @@ -71,6 +80,7 @@ pub fn all_workloads() -> HashMap { ( 0.25, Workload::Get(GetQuery { + skew: Skew::Zipf { theta: 0.999 }, limit: Distribution::Constant(10), metadata: Some(MetadataQuery::Raw(serde_json::json!({"i1": 1000}))), document: None, @@ -79,6 +89,7 @@ pub fn all_workloads() -> HashMap { ( 0.25, Workload::Query(QueryQuery { + skew: Skew::Zipf { theta: 0.999 }, limit: Distribution::Constant(10), metadata: None, document: None, @@ -86,5 +97,10 @@ pub fn all_workloads() -> HashMap { ), ]), ), + ("load".to_string(), Workload::Load), + ( + "random-upsert".to_string(), + Workload::RandomUpsert(KeySelector::Random(Skew::Zipf { theta: 0.999 })), + ), ]) }