Skip to content

Commit

Permalink
[CLN] Make log use enum not dyn (#2351)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
	 - Same reasoning as #2350 
 - New functionality
	 - None

## Test plan
*How are these changes tested?*
existing
- [x] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
None
  • Loading branch information
HammadB authored Jun 17, 2024
1 parent aceffb4 commit aa9a3b7
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 106 deletions.
14 changes: 9 additions & 5 deletions rust/worker/src/compactor/compaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub(crate) struct CompactionManager {
system: Option<System>,
scheduler: Scheduler,
// Dependencies
log: Box<dyn Log>,
log: Box<Log>,
sysdb: Box<SysDb>,
storage: Storage,
blockfile_provider: BlockfileProvider,
Expand Down Expand Up @@ -65,7 +65,7 @@ impl ChromaError for CompactionError {
impl CompactionManager {
pub(crate) fn new(
scheduler: Scheduler,
log: Box<dyn Log>,
log: Box<Log>,
sysdb: Box<SysDb>,
storage: Storage,
blockfile_provider: BlockfileProvider,
Expand Down Expand Up @@ -311,12 +311,16 @@ mod tests {

#[tokio::test]
async fn test_compaction_manager() {
let mut log = Box::new(InMemoryLog::new());
let mut log = Box::new(Log::InMemory(InMemoryLog::new()));
let mut in_memory_log = match *log {
Log::InMemory(ref mut log) => log,
_ => panic!("Expected InMemoryLog"),
};
let tmpdir = tempfile::tempdir().unwrap();
let storage = Storage::Local(LocalStorage::new(tmpdir.path().to_str().unwrap()));

let collection_uuid_1 = Uuid::from_str("00000000-0000-0000-0000-000000000001").unwrap();
log.add_log(
in_memory_log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
Expand All @@ -337,7 +341,7 @@ mod tests {
);

let collection_uuid_2 = Uuid::from_str("00000000-0000-0000-0000-000000000002").unwrap();
log.add_log(
in_memory_log.add_log(
collection_uuid_2.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_2.clone(),
Expand Down
28 changes: 18 additions & 10 deletions rust/worker/src/compactor/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use uuid::Uuid;

pub(crate) struct Scheduler {
my_ip: String,
log: Box<dyn Log>,
log: Box<Log>,
sysdb: Box<SysDb>,
policy: Box<dyn SchedulerPolicy>,
job_queue: Vec<CompactionJob>,
Expand All @@ -22,7 +22,7 @@ pub(crate) struct Scheduler {
impl Scheduler {
pub(crate) fn new(
my_ip: String,
log: Box<dyn Log>,
log: Box<Log>,
sysdb: Box<SysDb>,
policy: Box<dyn SchedulerPolicy>,
max_concurrent_jobs: usize,
Expand Down Expand Up @@ -202,10 +202,14 @@ mod tests {

#[tokio::test]
async fn test_scheduler() {
let mut log = Box::new(InMemoryLog::new());
let mut log = Box::new(Log::InMemory(InMemoryLog::new()));
let mut in_memory_log = match *log {
Log::InMemory(ref mut in_memory_log) => in_memory_log,
_ => panic!("Invalid log type"),
};

let collection_uuid_1 = Uuid::from_str("00000000-0000-0000-0000-000000000001").unwrap();
log.add_log(
in_memory_log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
Expand All @@ -226,7 +230,7 @@ mod tests {
);

let collection_uuid_2 = Uuid::from_str("00000000-0000-0000-0000-000000000002").unwrap();
log.add_log(
in_memory_log.add_log(
collection_uuid_2.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_2.clone(),
Expand Down Expand Up @@ -349,10 +353,14 @@ mod tests {
expected = "offset in sysdb is less than offset in log, this should not happen!"
)]
async fn test_scheduler_panic() {
let mut log = Box::new(InMemoryLog::new());
let mut log = Box::new(Log::InMemory(InMemoryLog::new()));
let mut in_memory_log = match *log {
Log::InMemory(ref mut in_memory_log) => in_memory_log,
_ => panic!("Invalid log type"),
};

let collection_uuid_1 = Uuid::from_str("00000000-0000-0000-0000-000000000001").unwrap();
log.add_log(
in_memory_log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
Expand All @@ -371,7 +379,7 @@ mod tests {
},
}),
);
log.add_log(
in_memory_log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
Expand All @@ -390,7 +398,7 @@ mod tests {
},
}),
);
log.add_log(
in_memory_log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
Expand All @@ -409,7 +417,7 @@ mod tests {
},
}),
);
log.add_log(
in_memory_log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
Expand Down
90 changes: 47 additions & 43 deletions rust/worker/src/execution/operators/pull_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@ use crate::log::log::Log;
use crate::log::log::PullLogsError;
use crate::types::LogRecord;
use async_trait::async_trait;
use tracing::trace;
use uuid::Uuid;

/// The pull logs operator is responsible for reading logs from the log service.
#[derive(Debug)]
pub struct PullLogsOperator {
client: Box<dyn Log>,
client: Box<Log>,
}

impl PullLogsOperator {
/// Create a new pull logs operator.
/// # Parameters
/// * `client` - The log client to use for reading logs.
pub fn new(client: Box<dyn Log>) -> Box<Self> {
pub fn new(client: Box<Log>) -> Box<Self> {
Box::new(PullLogsOperator { client })
}
}
Expand Down Expand Up @@ -152,47 +151,52 @@ mod tests {

#[tokio::test]
async fn test_pull_logs() {
let mut log = Box::new(InMemoryLog::new());

let mut log = Box::new(Log::InMemory(InMemoryLog::new()));
let collection_uuid_1 = Uuid::from_str("00000000-0000-0000-0000-000000000001").unwrap();
log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
log_offset: 0,
log_ts: 1,
record: LogRecord {
log_offset: 0,
record: OperationRecord {
id: "embedding_id_1".to_string(),
embedding: None,
encoding: None,
metadata: None,
document: None,
operation: Operation::Add,
},
},
}),
);
log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
log_offset: 1,
log_ts: 2,
record: LogRecord {
log_offset: 1,
record: OperationRecord {
id: "embedding_id_2".to_string(),
embedding: None,
encoding: None,
metadata: None,
document: None,
operation: Operation::Add,
},
},
}),
);

match *log {
Log::InMemory(ref mut log) => {
log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
log_offset: 0,
log_ts: 1,
record: LogRecord {
log_offset: 0,
record: OperationRecord {
id: "embedding_id_1".to_string(),
embedding: None,
encoding: None,
metadata: None,
document: None,
operation: Operation::Add,
},
},
}),
);
log.add_log(
collection_uuid_1.clone(),
Box::new(InternalLogRecord {
collection_id: collection_uuid_1.clone(),
log_offset: 1,
log_ts: 2,
record: LogRecord {
log_offset: 1,
record: OperationRecord {
id: "embedding_id_2".to_string(),
embedding: None,
encoding: None,
metadata: None,
document: None,
operation: Operation::Add,
},
},
}),
);
}
_ => panic!("Expected InMemoryLog"),
}

let operator = PullLogsOperator::new(log);

Expand Down
6 changes: 3 additions & 3 deletions rust/worker/src/execution/operators/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct RegisterInput {
collection_version: i32,
segment_flush_info: Arc<[SegmentFlushInfo]>,
sysdb: Box<SysDb>,
log: Box<dyn Log>,
log: Box<Log>,
}

impl RegisterInput {
Expand All @@ -58,7 +58,7 @@ impl RegisterInput {
collection_version: i32,
segment_flush_info: Arc<[SegmentFlushInfo]>,
sysdb: Box<SysDb>,
log: Box<dyn Log>,
log: Box<Log>,
) -> Self {
RegisterInput {
tenant,
Expand Down Expand Up @@ -151,7 +151,7 @@ mod tests {
#[tokio::test]
async fn test_register_operator() {
let mut sysdb = Box::new(SysDb::Test(TestSysDb::new()));
let mut log = Box::new(InMemoryLog::new());
let mut log = Box::new(Log::InMemory(InMemoryLog::new()));
let collection_version = 0;
let collection_uuid_1 = Uuid::from_str("00000000-0000-0000-0000-000000000001").unwrap();
let tenant_1 = "tenant_1".to_string();
Expand Down
4 changes: 2 additions & 2 deletions rust/worker/src/execution/orchestration/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub struct CompactOrchestrator {
system: System,
collection_id: Uuid,
// Dependencies
log: Box<dyn Log>,
log: Box<Log>,
sysdb: Box<SysDb>,
blockfile_provider: BlockfileProvider,
hnsw_index_provider: HnswIndexProvider,
Expand Down Expand Up @@ -145,7 +145,7 @@ impl CompactOrchestrator {
compaction_job: CompactionJob,
system: System,
collection_id: Uuid,
log: Box<dyn Log>,
log: Box<Log>,
sysdb: Box<SysDb>,
blockfile_provider: BlockfileProvider,
hnsw_index_provider: HnswIndexProvider,
Expand Down
4 changes: 2 additions & 2 deletions rust/worker/src/execution/orchestration/hnsw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ pub(crate) struct HnswQueryOrchestrator {
merge_dependency_count: u32,
finish_dependency_count: u32,
// Services
log: Box<dyn Log>,
log: Box<Log>,
sysdb: Box<SysDb>,
dispatcher: Box<dyn Receiver<TaskMessage>>,
hnsw_index_provider: HnswIndexProvider,
Expand All @@ -148,7 +148,7 @@ impl HnswQueryOrchestrator {
allowed_ids: Vec<String>,
include_embeddings: bool,
segment_id: Uuid,
log: Box<dyn Log>,
log: Box<Log>,
sysdb: Box<SysDb>,
hnsw_index_provider: HnswIndexProvider,
blockfile_provider: BlockfileProvider,
Expand Down
8 changes: 4 additions & 4 deletions rust/worker/src/execution/orchestration/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub(crate) struct MetadataQueryOrchestrator {
// State machine management
merge_dependency_count: u32,
// Services
log: Box<dyn Log>,
log: Box<Log>,
sysdb: Box<SysDb>,
dispatcher: Box<dyn Receiver<TaskMessage>>,
blockfile_provider: BlockfileProvider,
Expand All @@ -83,7 +83,7 @@ pub(crate) struct CountQueryOrchestrator {
record_segment: Option<Segment>,
collection: Option<Collection>,
// Services
log: Box<dyn Log>,
log: Box<Log>,
sysdb: Box<SysDb>,
dispatcher: Box<dyn Receiver<TaskMessage>>,
blockfile_provider: BlockfileProvider,
Expand Down Expand Up @@ -129,7 +129,7 @@ impl CountQueryOrchestrator {
pub(crate) fn new(
system: System,
metadata_segment_id: &Uuid,
log: Box<dyn Log>,
log: Box<Log>,
sysdb: Box<SysDb>,
dispatcher: Box<dyn Receiver<TaskMessage>>,
blockfile_provider: BlockfileProvider,
Expand Down Expand Up @@ -450,7 +450,7 @@ impl MetadataQueryOrchestrator {
system: System,
metadata_segment_id: &Uuid,
query_ids: Option<Vec<String>>,
log: Box<dyn Log>,
log: Box<Log>,
sysdb: Box<SysDb>,
dispatcher: Box<dyn Receiver<TaskMessage>>,
blockfile_provider: BlockfileProvider,
Expand Down
Loading

0 comments on commit aa9a3b7

Please sign in to comment.