Skip to content

Commit

Permalink
[BUG] InMemoryLog preserve offset orders
Browse files Browse the repository at this point in the history
  • Loading branch information
Ishiihara committed Apr 24, 2024
1 parent ed0d578 commit 0004f1b
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 5 deletions.
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operators/pull_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl Operator<PullLogsInput, PullLogsOutput> for PullLogsOperator {
}

num_records_read += logs.len();
offset += batch_size as i64;
offset = logs.last().unwrap().log_offset + 1;
result.append(&mut logs);

// We used a a timestamp and we didn't get a full batch, so we have retrieved
Expand Down
24 changes: 20 additions & 4 deletions rust/worker/src/log/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,17 @@ impl InMemoryLog {

pub fn add_log(&mut self, collection_id: String, log: Box<InternalLogRecord>) {
let logs = self.logs.entry(collection_id).or_insert(Vec::new());
logs.push(log);
if logs.is_empty() {
logs.push(log);
return;
} else {
let last_log = logs.last().unwrap();
if log.log_offset != last_log.log_offset + 1 {
panic!("log offset is not in order");
} else {
logs.push(log);
}
}
}
}

Expand All @@ -284,11 +294,17 @@ impl Log for InMemoryLog {
None => return Ok(Vec::new()),
};
let mut result = Vec::new();
for i in offset..(offset + batch_size as i64) {
if i < logs.len() as i64 && logs[i as usize].log_ts <= end_timestamp {
result.push(logs[i as usize].record.clone());
for i in 0..logs.len() as i64 {
if logs[i as usize].log_offset < offset {
continue;
}
if logs[i as usize].log_ts > end_timestamp {
continue;
}
result.push(logs[i as usize].record.clone());
}
result.sort_by(|a, b| a.log_offset.cmp(&b.log_offset));
result.truncate(batch_size as usize);
Ok(result)
}

Expand Down

0 comments on commit 0004f1b

Please sign in to comment.