diff --git a/rust/worker/src/execution/operators/pull_log.rs b/rust/worker/src/execution/operators/pull_log.rs index 5e628b5887a..f62e861f7e4 100644 --- a/rust/worker/src/execution/operators/pull_log.rs +++ b/rust/worker/src/execution/operators/pull_log.rs @@ -117,7 +117,8 @@ impl Operator for PullLogsOperator { } num_records_read += logs.len(); - offset += batch_size as i64; + // unwrap here is safe because we just checked if empty + offset = logs.last().unwrap().log_offset + 1; result.append(&mut logs); // We used a a timestamp and we didn't get a full batch, so we have retrieved