diff --git a/quickwit/quickwit-indexing/src/source/ingest/mod.rs b/quickwit/quickwit-indexing/src/source/ingest/mod.rs
index 53aa50a4b1e..e36752f0632 100644
--- a/quickwit/quickwit-indexing/src/source/ingest/mod.rs
+++ b/quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -409,7 +409,6 @@ impl Source for IngestSource {
                 .publish_position_inclusive
                 .unwrap_or_default();
             let from_position_exclusive = current_position_inclusive.clone();
-            let to_position_inclusive = Position::Eof;

             let status = if from_position_exclusive == Position::Eof {
                 IndexingStatus::Complete
@@ -421,7 +420,6 @@ impl Source for IngestSource {
                     source_id,
                     shard_id,
                     from_position_exclusive,
-                    to_position_inclusive,
                 ))
                 .await
             {
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
index a26c2452d48..26653e32ecc 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
@@ -21,7 +21,6 @@ use std::borrow::Borrow;
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::fmt;
-use std::ops::{Bound, RangeBounds};
 use std::sync::Arc;

 use bytes::{BufMut, BytesMut};
@@ -38,17 +37,17 @@ use super::ingester::IngesterState;
 use crate::ingest_v2::mrecord::is_eof_mrecord;
 use crate::{ClientId, IngesterPool};

-/// A fetch task is responsible for waiting and pushing new records written to a shard's record log
-/// into a channel named `fetch_response_tx`.
-pub(super) struct FetchTask {
+/// A fetch stream task is responsible for waiting and pushing new records written to a shard's
+/// record log into a channel named `fetch_response_tx`.
+pub(super) struct FetchStreamTask {
     /// Uniquely identifies the consumer of the fetch task for logging and debugging purposes.
     client_id: ClientId,
     index_uid: IndexUid,
     source_id: SourceId,
     shard_id: ShardId,
     queue_id: QueueId,
-    /// Range of records to fetch.
-    fetch_range: FetchRange,
+    /// The position of the next record fetched.
+    from_position_inclusive: u64,
     state: Arc<RwLock<IngesterState>>,
     fetch_response_tx: mpsc::Sender<IngestV2Result<FetchResponseV2>>,
     /// This channel notifies the fetch task when new records are available. This way the fetch
@@ -57,9 +56,9 @@ pub(super) struct FetchTask {
     batch_num_bytes: usize,
 }

-impl fmt::Debug for FetchTask {
+impl fmt::Debug for FetchStreamTask {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("FetchTask")
+        f.debug_struct("FetchStreamTask")
             .field("client_id", &self.client_id)
             .field("index_uid", &self.index_uid)
             .field("source_id", &self.source_id)
@@ -68,9 +67,7 @@ impl FetchTask {
     }
 }

-type FetchTaskHandle = JoinHandle<(u64, Position)>;
-
-impl FetchTask {
+impl FetchStreamTask {
     pub const DEFAULT_BATCH_NUM_BYTES: usize = 1024 * 1024; // 1 MiB

     pub fn spawn(
@@ -80,48 +77,46 @@ impl FetchTask {
         batch_num_bytes: usize,
     ) -> (
         ServiceStream<IngestV2Result<FetchResponseV2>>,
-        FetchTaskHandle,
+        JoinHandle<()>,
     ) {
+        let from_position_inclusive = open_fetch_stream_request
+            .from_position_exclusive()
+            .as_u64()
+            .map(|offset| offset + 1)
+            .unwrap_or_default();
         let (fetch_response_tx, fetch_stream) = ServiceStream::new_bounded(3);
-        let fetch_range = FetchRange::new(
-            open_fetch_stream_request.from_position_exclusive(),
-            open_fetch_stream_request.to_position_inclusive(),
-        );
         let mut fetch_task = Self {
             queue_id: open_fetch_stream_request.queue_id(),
             client_id: open_fetch_stream_request.client_id,
             index_uid: open_fetch_stream_request.index_uid.into(),
             source_id: open_fetch_stream_request.source_id,
             shard_id: open_fetch_stream_request.shard_id,
-            fetch_range,
+            from_position_inclusive,
             state,
             fetch_response_tx,
             new_records_rx,
             batch_num_bytes,
         };
         let future = async move { fetch_task.run().await };
-        let fetch_task_handle: FetchTaskHandle = tokio::spawn(future);
+        let fetch_task_handle: JoinHandle<()> = tokio::spawn(future);
         (fetch_stream, fetch_task_handle)
     }

     /// Runs the fetch task. It waits for new records in the log and pushes them into the fetch
-    /// response channel until `to_position_inclusive` is reached, the shard is closed and
-    /// `to_position_inclusive` is reached, or the ingester is dropped. It returns the total number
-    /// of records fetched and the position of the last record fetched.
-    async fn run(&mut self) -> (u64, Position) {
+    /// response channel until it reaches the end of the shard marked by an EOF record.
+    async fn run(&mut self) {
         debug!(
             client_id=%self.client_id,
             index_uid=%self.index_uid,
             source_id=%self.source_id,
             shard_id=%self.shard_id,
-            fetch_range=?self.fetch_range,
+            from_position_inclusive=%self.from_position_inclusive,
             "spawning fetch task"
         );
         let mut has_drained_queue = false;
         let mut has_reached_eof = false;
-        let mut num_records_total = 0;

-        while !has_reached_eof && !self.fetch_range.is_empty() {
+        while !has_reached_eof {
             if has_drained_queue && self.new_records_rx.changed().await.is_err() {
                 // The shard was dropped.
                 break;
@@ -134,7 +129,7 @@ impl FetchTask {

             let Ok(mrecords) = state_guard
                 .mrecordlog
-                .range(&self.queue_id, self.fetch_range)
+                .range(&self.queue_id, self.from_position_inclusive..)
             else {
                 // The queue was dropped.
                 break;
@@ -153,29 +148,37 @@ impl FetchTask {
             if mrecord_buffer.is_empty() {
                 continue;
             }
+            let from_position_exclusive = if self.from_position_inclusive == 0 {
+                Position::Beginning
+            } else {
+                Position::from(self.from_position_inclusive - 1)
+            };
+            self.from_position_inclusive += mrecord_lengths.len() as u64;
+
             let last_mrecord_len = *mrecord_lengths
                 .last()
                 .expect("`mrecord_lengths` should not be empty") as usize;
             let last_mrecord = &mrecord_buffer[mrecord_buffer.len() - last_mrecord_len..];

-            has_reached_eof = is_eof_mrecord(last_mrecord);
-
+            let to_position_inclusive = if is_eof_mrecord(last_mrecord) {
+                debug!(
+                    client_id=%self.client_id,
+                    index_uid=%self.index_uid,
+                    source_id=%self.source_id,
+                    shard_id=%self.shard_id,
+                    to_position_inclusive=%self.from_position_inclusive - 1,
+                    "fetch stream reached end of shard"
+                );
+                has_reached_eof = true;
+                Position::Eof
+            } else {
+                Position::from(self.from_position_inclusive - 1)
+            };
             let mrecord_batch = MRecordBatch {
                 mrecord_buffer: mrecord_buffer.freeze(),
                 mrecord_lengths,
             };
-            let num_records = mrecord_batch.num_mrecords() as u64;
-            num_records_total += num_records;
-
-            let from_position_exclusive = self.fetch_range.from_position_exclusive();
-            self.fetch_range.advance_by(num_records);
-
-            let to_position_inclusive = if has_reached_eof {
-                Position::Eof
-            } else {
-                self.fetch_range.from_position_exclusive()
-            };
             let fetch_response = FetchResponseV2 {
                 index_uid: self.index_uid.clone().into(),
                 source_id: self.source_id.clone(),
@@ -194,25 +197,21 @@ impl FetchTask {
                 break;
             }
         }
-        if !has_reached_eof || !self.fetch_range.is_empty() {
+        if !has_reached_eof {
             error!(
                 client_id=%self.client_id,
                 index_uid=%self.index_uid,
                 source_id=%self.source_id,
                 shard_id=%self.shard_id,
-                "fetch stream ended unexpectedly"
+                "fetch stream ended before reaching end of shard"
             );
             let _ = self
                 .fetch_response_tx
                 .send(Err(IngestV2Error::Internal(
-                    "fetch stream ended unexpectedly".to_string(),
+                    "fetch stream ended before reaching end of shard".to_string(),
                 )))
                 .await;
         }
-        (
-            num_records_total,
-            self.fetch_range.from_position_exclusive(),
-        )
     }
 }

@@ -263,7 +262,6 @@ impl MultiFetchStream {
         source_id: SourceId,
         shard_id: ShardId,
         from_position_exclusive: Position,
-        to_position_inclusive: Position,
     ) -> IngestV2Result<()> {
         let queue_id = queue_id(index_uid.as_str(), &source_id, shard_id);
         let entry = self.fetch_task_handles.entry(queue_id.clone());
@@ -282,13 +280,12 @@ impl MultiFetchStream {
         if let Some(failover_ingester_id) = failover_ingester_id_opt {
             ingester_ids.push(failover_ingester_id);
         }
-        let fetch_task_future = fault_tolerant_fetch_task(
+        let fetch_task_future = fault_tolerant_fetch_stream_task(
             self.client_id.clone(),
             index_uid,
             source_id,
             shard_id,
             from_position_exclusive,
-            to_position_inclusive,
             ingester_ids,
             self.ingester_pool.clone(),
             self.fetch_response_tx.clone(),
@@ -370,13 +367,12 @@ fn select_preferred_and_failover_ingesters(
 /// Streams records from the preferred ingester and fails over to the other ingester if an error
 /// occurs.
 #[allow(clippy::too_many_arguments)]
-async fn fault_tolerant_fetch_task(
+async fn fault_tolerant_fetch_stream_task(
     client_id: String,
     index_uid: IndexUid,
     source_id: SourceId,
     shard_id: ShardId,
     mut from_position_exclusive: Position,
-    to_position_inclusive: Position,
     ingester_ids: Vec<NodeId>,
     ingester_pool: IngesterPool,
     fetch_response_tx: mpsc::Sender<IngestV2Result<FetchResponseV2>>,
@@ -427,7 +423,6 @@ async fn fault_tolerant_fetch_task(
             source_id: source_id.clone(),
             shard_id,
             from_position_exclusive: Some(from_position_exclusive.clone()),
-            to_position_inclusive: Some(to_position_inclusive.clone()),
         };
         let mut fetch_stream = match ingester.open_fetch_stream(open_fetch_stream_request).await {
             Ok(fetch_stream) => fetch_stream,
@@ -512,62 +507,6 @@ async fn fault_tolerant_fetch_task(
     }
 }

-#[derive(Debug, Clone, Copy)]
-pub(super) struct FetchRange {
-    from_position_exclusive_opt: Option<u64>,
-    to_position_inclusive_opt: Option<u64>,
-}
-
-impl FetchRange {
-    pub(super) fn new(from_position_exclusive: Position, to_position_inclusive: Position) -> Self {
-        Self {
-            from_position_exclusive_opt: from_position_exclusive.as_u64(),
-            to_position_inclusive_opt: to_position_inclusive.as_u64(),
-        }
-    }
-
-    #[allow(clippy::wrong_self_convention)]
-    fn from_position_exclusive(&self) -> Position {
-        Position::from(self.from_position_exclusive_opt)
-    }
-
-    fn is_empty(&self) -> bool {
-        match (
-            self.from_position_exclusive_opt,
-            self.to_position_inclusive_opt,
-        ) {
-            (Some(from_position_exclusive), Some(to_position_inclusive)) => {
-                from_position_exclusive >= to_position_inclusive
-            }
-            _ => false,
-        }
-    }
-
-    fn advance_by(&mut self, num_records: u64) {
-        if let Some(from_position_exclusive) = self.from_position_exclusive_opt {
-            self.from_position_exclusive_opt = Some(from_position_exclusive + num_records);
-        } else {
-            self.from_position_exclusive_opt = Some(num_records - 1);
-        }
-    }
-}
-
-impl RangeBounds<u64> for FetchRange {
-    fn start_bound(&self) -> std::ops::Bound<&u64> {
-        self.from_position_exclusive_opt
-            .as_ref()
-            .map(Bound::Excluded)
-            .unwrap_or(Bound::Unbounded)
-    }
-
-    fn end_bound(&self) -> std::ops::Bound<&u64> {
-        self.to_position_inclusive_opt
-            .as_ref()
-            .map(Bound::Included)
-            .unwrap_or(Bound::Unbounded)
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::time::Duration;
@@ -581,36 +520,8 @@ mod tests {
     use super::*;
     use crate::MRecord;

-    #[test]
-    fn test_fetch_range() {
-        let mut fetch_range = FetchRange::new(Position::Beginning, Position::Eof);
-        assert_eq!(fetch_range.start_bound(), Bound::Unbounded);
-        assert_eq!(fetch_range.end_bound(), Bound::Unbounded);
-        assert_eq!(fetch_range.from_position_exclusive(), Position::Beginning);
-        assert!(!fetch_range.is_empty());
-
-        fetch_range.advance_by(1);
-        assert_eq!(fetch_range.start_bound(), Bound::Excluded(&0));
-        assert_eq!(fetch_range.from_position_exclusive(), 0u64);
-        assert!(!fetch_range.is_empty());
-
-        fetch_range.advance_by(10);
-        assert_eq!(fetch_range.from_position_exclusive(), Position::from(10u64));
-
-        let mut fetch_range = FetchRange::new(Position::Beginning, Position::from(1u64));
-        assert!(!fetch_range.is_empty());
-
-        fetch_range.advance_by(1);
-        assert_eq!(fetch_range.from_position_exclusive(), 0u64);
-        assert!(!fetch_range.is_empty());
-
-        fetch_range.advance_by(1);
-        assert_eq!(fetch_range.from_position_exclusive(), 1u64);
-        assert!(fetch_range.is_empty());
-    }
-
     #[tokio::test]
-    async fn test_fetch_task() {
+    async fn test_fetch_task_happy_path() {
         let tempdir = tempfile::tempdir().unwrap();
         let mrecordlog = MultiRecordLog::open(tempdir.path()).await.unwrap();
         let client_id = "test-client".to_string();
@@ -622,7 +533,6 @@ mod tests {
             source_id: source_id.clone(),
             shard_id: 1,
             from_position_exclusive: None,
-            to_position_inclusive: None,
         };
         let (new_records_tx, new_records_rx) = watch::channel(());
         let state = Arc::new(RwLock::new(IngesterState {
@@ -632,7 +542,7 @@ mod tests {
             replication_streams: HashMap::new(),
             replication_tasks: HashMap::new(),
         }));
-        let (mut fetch_stream, fetch_task_handle) = FetchTask::spawn(
+        let (mut fetch_stream, fetch_task_handle) = FetchStreamTask::spawn(
            open_fetch_stream_request,
            state.clone(),
            new_records_rx,
@@ -773,14 +683,11 @@ mod tests {
                 .mrecord_buffer,
             "\0\0test-doc-baz\0\0test-doc-qux\0\x02"
         );
-
-        let (num_records, last_position) = fetch_task_handle.await.unwrap();
-        assert_eq!(num_records, 5);
-        assert_eq!(last_position, Position::from(4u64));
+        fetch_task_handle.await.unwrap();
     }

     #[tokio::test]
-    async fn test_fetch_task_error() {
+    async fn test_fetch_task_from_position_exclusive() {
         let tempdir = tempfile::tempdir().unwrap();
         let mrecordlog = MultiRecordLog::open(tempdir.path()).await.unwrap();
         let client_id = "test-client".to_string();
@@ -791,10 +698,9 @@ mod tests {
             index_uid: index_uid.clone(),
             source_id: source_id.clone(),
             shard_id: 1,
-            from_position_exclusive: None,
-            to_position_inclusive: None,
+            from_position_exclusive: Some(Position::from(0u64)),
         };
-        let (_new_records_tx, new_records_rx) = watch::channel(());
+        let (new_records_tx, new_records_rx) = watch::channel(());
         let state = Arc::new(RwLock::new(IngesterState {
             mrecordlog,
             shards: HashMap::new(),
@@ -802,26 +708,86 @@ mod tests {
             replication_streams: HashMap::new(),
             replication_tasks: HashMap::new(),
         }));
-        let (mut fetch_stream, fetch_task_handle) = FetchTask::spawn(
+        let (mut fetch_stream, _fetch_task_handle) = FetchStreamTask::spawn(
             open_fetch_stream_request,
             state.clone(),
             new_records_rx,
             1024,
         );
-        let ingest_error = timeout(Duration::from_millis(50), fetch_stream.next())
+        let queue_id = queue_id(&index_uid, &source_id, 1);
+
+        let mut state_guard = state.write().await;
+
+        state_guard
+            .mrecordlog
+            .create_queue(&queue_id)
+            .await
+            .unwrap();
+        drop(state_guard);
+
+        new_records_tx.send(()).unwrap();
+
+        timeout(Duration::from_millis(50), fetch_stream.next())
+            .await
+            .unwrap_err();
+
+        let mut state_guard = state.write().await;
+
+        state_guard
+            .mrecordlog
+            .append_record(&queue_id, None, MRecord::new_doc("test-doc-foo").encode())
+            .await
+            .unwrap();
+        drop(state_guard);
+
+        new_records_tx.send(()).unwrap();
+
+        timeout(Duration::from_millis(50), fetch_stream.next())
+            .await
+            .unwrap_err();
+
+        let mut state_guard = state.write().await;
+
+        state_guard
+            .mrecordlog
+            .append_record(&queue_id, None, MRecord::new_doc("test-doc-bar").encode())
+            .await
+            .unwrap();
+        drop(state_guard);
+
+        new_records_tx.send(()).unwrap();
+
+        let fetch_response = timeout(Duration::from_millis(50), fetch_stream.next())
             .await
             .unwrap()
             .unwrap()
-            .unwrap_err();
-        assert!(matches!(ingest_error, IngestV2Error::Internal(_)));
+            .unwrap();

-        let (num_records, last_position) = fetch_task_handle.await.unwrap();
-        assert_eq!(num_records, 0);
-        assert_eq!(last_position, Position::Beginning);
+        assert_eq!(fetch_response.index_uid, "test-index:0");
+        assert_eq!(fetch_response.source_id, "test-source");
+        assert_eq!(fetch_response.shard_id, 1);
+        assert_eq!(fetch_response.from_position_exclusive(), 0u64,);
+        assert_eq!(fetch_response.to_position_inclusive(), 1u64);
+        assert_eq!(
+            fetch_response
+                .mrecord_batch
+                .as_ref()
+                .unwrap()
+                .mrecord_lengths,
+            [14]
+        );
+        assert_eq!(
+            fetch_response
+                .mrecord_batch
+                .as_ref()
+                .unwrap()
+                .mrecord_buffer,
+            "\0\0test-doc-bar"
+        );
     }

     #[tokio::test]
-    async fn test_fetch_task_up_to_position() {
+    async fn test_fetch_task_error() {
         let tempdir = tempfile::tempdir().unwrap();
         let mrecordlog = MultiRecordLog::open(tempdir.path()).await.unwrap();
         let client_id = "test-client".to_string();
@@ -833,8 +799,8 @@ mod tests {
             source_id: source_id.clone(),
             shard_id: 1,
             from_position_exclusive: None,
-            to_position_inclusive: Some(Position::from(0u64)),
         };
+        let (_new_records_tx, new_records_rx) = watch::channel(());
         let state = Arc::new(RwLock::new(IngesterState {
             mrecordlog,
             shards: HashMap::new(),
@@ -842,49 +808,20 @@ mod tests {
             replication_streams: HashMap::new(),
             replication_tasks: HashMap::new(),
         }));
-        let (new_records_tx, new_records_rx) = watch::channel(());
-        let (mut fetch_stream, fetch_task_handle) = FetchTask::spawn(
+        let (mut fetch_stream, fetch_task_handle) = FetchStreamTask::spawn(
             open_fetch_stream_request,
             state.clone(),
             new_records_rx,
             1024,
         );
-        let queue_id = queue_id(&index_uid, &source_id, 1);
-
-        let mut state_guard = state.write().await;
-
-        state_guard
-            .mrecordlog
-            .create_queue(&queue_id)
-            .await
-            .unwrap();
-        state_guard
-            .mrecordlog
-            .append_record(&queue_id, None, Bytes::from_static(b"test-doc-foo"))
-            .await
-            .unwrap();
-        drop(state_guard);
-
-        new_records_tx.send(()).unwrap();
-
-        let fetch_response = timeout(Duration::from_millis(50), fetch_stream.next())
+        let ingest_error = timeout(Duration::from_millis(50), fetch_stream.next())
             .await
             .unwrap()
             .unwrap()
-            .unwrap();
-        assert_eq!(
-            fetch_response
-                .mrecord_batch
-                .as_ref()
-                .unwrap()
-                .mrecord_lengths
-                .len(),
-            1
-        );
+            .unwrap_err();
+        assert!(matches!(ingest_error, IngestV2Error::Internal(_)));

-        let (num_records, last_position) = fetch_task_handle.await.unwrap();
-        assert_eq!(num_records, 1);
-        assert_eq!(last_position, 0u64);
+        fetch_task_handle.await.unwrap();
     }

     #[tokio::test]
@@ -900,7 +837,6 @@ mod tests {
             source_id: source_id.clone(),
             shard_id: 1,
             from_position_exclusive: None,
-            to_position_inclusive: Some(Position::from(2u64)),
         };
         let state = Arc::new(RwLock::new(IngesterState {
             mrecordlog,
@@ -911,7 +847,7 @@ mod tests {
         }));
         let (new_records_tx, new_records_rx) = watch::channel(());
         let (mut fetch_stream, _fetch_task_handle) =
-            FetchTask::spawn(open_fetch_stream_request, state.clone(), new_records_rx, 30);
+            FetchStreamTask::spawn(open_fetch_stream_request, state.clone(), new_records_rx, 30);
         let queue_id = queue_id(&index_uid, &source_id, 1);

         let mut state_guard = state.write().await;
@@ -1016,7 +952,6 @@
         let source_id: SourceId = "test-source".into();
         let shard_id: ShardId = 1;
         let from_position_exclusive = Position::from(0u64);
-        let to_position_inclusive = Position::Eof;
         let ingester_ids: Vec<NodeId> = vec!["test-ingester-0".into(), "test-ingester-1".into()];

         let (fetch_response_tx, mut fetch_stream) = ServiceStream::new_bounded(5);
@@ -1034,7 +969,6 @@
             assert_eq!(request.source_id, "test-source");
             assert_eq!(request.shard_id, 1);
             assert_eq!(request.from_position_exclusive(), 0u64);
-            assert_eq!(request.to_position_inclusive(), Position::Eof);

             Ok(service_stream_0)
         });
@@ -1049,7 +983,6 @@
             assert_eq!(request.source_id, "test-source");
             assert_eq!(request.shard_id, 1);
             assert_eq!(request.from_position_exclusive(), 1u64);
-            assert_eq!(request.to_position_inclusive(), Position::Eof);

             Ok(service_stream_1)
         });
@@ -1081,13 +1014,12 @@ mod tests {
         };
         service_stream_tx_1.send(Ok(fetch_response)).unwrap();

-        fault_tolerant_fetch_task(
+        fault_tolerant_fetch_stream_task(
             client_id,
             index_uid,
             source_id,
             shard_id,
             from_position_exclusive,
-            to_position_inclusive,
             ingester_ids,
             ingester_pool,
             fetch_response_tx,
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
index 0b372e8ebbc..543e89c2505 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -46,7 +46,7 @@ use quickwit_proto::types::{NodeId, Position, QueueId};
 use tokio::sync::RwLock;
 use tracing::{error, info, warn};

-use super::fetch::FetchTask;
+use super::fetch::FetchStreamTask;
 use super::models::{IngesterShard, PrimaryShard};
 use super::mrecordlog_utils::{append_eof_record_if_necessary, check_enough_capacity};
 use super::rate_limiter::{RateLimiter, RateLimiterSettings};
@@ -555,11 +555,11 @@ impl IngesterService for Ingester {
             .get(&queue_id)
             .ok_or_else(|| IngestV2Error::Internal("shard not found".to_string()))?
             .new_records_rx();
-        let (service_stream, _fetch_task_handle) = FetchTask::spawn(
+        let (service_stream, _fetch_task_handle) = FetchStreamTask::spawn(
             open_fetch_stream_request,
             self.state.clone(),
             new_records_rx,
-            FetchTask::DEFAULT_BATCH_NUM_BYTES,
+            FetchStreamTask::DEFAULT_BATCH_NUM_BYTES,
         );
         Ok(service_stream)
     }
@@ -664,7 +664,6 @@ mod tests {
     use tonic::transport::{Endpoint, Server};

     use super::*;
-    use crate::ingest_v2::fetch::FetchRange;
     use crate::ingest_v2::mrecord::is_eof_mrecord;
     use crate::ingest_v2::test_utils::{IngesterShardTestExt, MultiRecordLogTestExt};
@@ -1501,7 +1500,6 @@ mod tests {
             source_id: "test-source".to_string(),
             shard_id: 1,
             from_position_exclusive: None,
-            to_position_inclusive: None,
         };
         let mut fetch_stream = ingester
             .open_fetch_stream(open_fetch_stream_request)
             .await
@@ -1700,9 +1698,7 @@ mod tests {
             source_id: "test-source:0".to_string(),
             shard_id: 1,
             from_position_exclusive: None,
-            to_position_inclusive: None,
         };
-
         let mut fetch_stream = ingester
             .open_fetch_stream(open_fetch_stream_request)
             .await
@@ -1747,10 +1743,7 @@ mod tests {
         for queue_id in closed_shards.queue_ids() {
             let last_position = state_guard
                 .mrecordlog
-                .range(
-                    &queue_id,
-                    FetchRange::new(Position::Beginning, Position::Beginning),
-                )
+                .range(&queue_id, ..)
                 .unwrap()
                 .last()
                 .unwrap();
diff --git a/quickwit/quickwit-proto/protos/quickwit/ingester.proto b/quickwit/quickwit-proto/protos/quickwit/ingester.proto
index 109888c60f8..8b3882611ae 100644
--- a/quickwit/quickwit-proto/protos/quickwit/ingester.proto
+++ b/quickwit/quickwit-proto/protos/quickwit/ingester.proto
@@ -182,7 +182,6 @@ message OpenFetchStreamRequest {
   string source_id = 3;
   uint64 shard_id = 4;
   quickwit.ingest.Position from_position_exclusive = 5;
-  quickwit.ingest.Position to_position_inclusive = 6;
 }

 message FetchResponseV2 {
diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs
index 53857e8ac79..f9c9330b018 100644
--- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs
+++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs
@@ -240,8 +240,6 @@ pub struct OpenFetchStreamRequest {
     pub shard_id: u64,
     #[prost(message, optional, tag = "5")]
     pub from_position_exclusive: ::core::option::Option,
-    #[prost(message, optional, tag = "6")]
-    pub to_position_inclusive: ::core::option::Option,
 }
 #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
 #[allow(clippy::derive_partial_eq_without_eq)]
diff --git a/quickwit/quickwit-proto/src/ingest/ingester.rs b/quickwit/quickwit-proto/src/ingest/ingester.rs
index 24d3aa1410a..2168e35c4f5 100644
--- a/quickwit/quickwit-proto/src/ingest/ingester.rs
+++ b/quickwit/quickwit-proto/src/ingest/ingester.rs
@@ -45,10 +45,6 @@ impl OpenFetchStreamRequest {
     pub fn from_position_exclusive(&self) -> Position {
         self.from_position_exclusive.clone().unwrap_or_default()
     }
-
-    pub fn to_position_inclusive(&self) -> Position {
-        self.to_position_inclusive.clone().unwrap_or_default()
-    }
 }

 impl PersistSubrequest {
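
Note on the position arithmetic introduced in `FetchStreamTask::spawn`: with `to_position_inclusive` gone, the task only tracks `from_position_inclusive`, the offset of the next record to read, derived from the exclusive position carried by `OpenFetchStreamRequest`, and maps it back to an exclusive position when a batch is emitted. The following is a minimal standalone sketch of that mapping; the `Position` enum below is a simplified stand-in for `quickwit_proto::types::Position`, not the real type.

// Simplified stand-in for `quickwit_proto::types::Position`, for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Position {
    Beginning,
    Offset(u64),
    Eof,
}

impl Position {
    fn as_u64(self) -> Option<u64> {
        match self {
            Position::Offset(offset) => Some(offset),
            _ => None,
        }
    }
}

/// Maps the exclusive "resume after" position of an `OpenFetchStreamRequest` to the
/// inclusive offset of the next record to read: `Beginning` maps to 0, `Offset(n)` to `n + 1`.
fn next_offset_inclusive(from_position_exclusive: Position) -> u64 {
    from_position_exclusive
        .as_u64()
        .map(|offset| offset + 1)
        .unwrap_or_default()
}

/// Reverses the mapping when a batch is emitted: records starting at offset 0 are advertised
/// as coming after `Beginning`, otherwise after `Offset(offset - 1)`.
fn batch_from_position_exclusive(from_position_inclusive: u64) -> Position {
    if from_position_inclusive == 0 {
        Position::Beginning
    } else {
        Position::Offset(from_position_inclusive - 1)
    }
}

fn main() {
    assert_eq!(next_offset_inclusive(Position::Beginning), 0);
    assert_eq!(next_offset_inclusive(Position::Offset(41)), 42);
    // `Eof` also maps to 0 here; in the PR the source never opens a stream from `Eof`.
    assert_eq!(next_offset_inclusive(Position::Eof), 0);
    assert_eq!(batch_from_position_exclusive(0), Position::Beginning);
    assert_eq!(batch_from_position_exclusive(5), Position::Offset(4));
}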
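The explicit end bound is replaced by an EOF sentinel appended to a closed shard's queue (see `append_eof_record_if_necessary` among the ingester imports): the task stops once the last record it drains is that sentinel. Below is a sketch of the termination decision only, assuming the sentinel is the two-byte record `\0\x02` that appears at the end of the happy-path test fixture; the real check is `ingest_v2::mrecord::is_eof_mrecord` and is not reproduced here.

/// EOF sentinel check, assuming the two-byte header `\0\x02` used by the test fixtures above;
/// the actual implementation lives in `ingest_v2::mrecord::is_eof_mrecord`.
fn is_eof_mrecord(mrecord: &[u8]) -> bool {
    mrecord == &b"\0\x02"[..]
}

/// Given the lengths of the records just drained and the flat buffer holding them, decides
/// whether the stream has reached the end of the shard and what `to_position_inclusive` to
/// advertise for the batch. `next_position_inclusive` is the position of the record that
/// would come after the batch; `None` stands in for `Position::Eof`.
fn batch_end(
    mrecord_lengths: &[u32],
    mrecord_buffer: &[u8],
    next_position_inclusive: u64,
) -> (bool, Option<u64>) {
    let last_mrecord_len = *mrecord_lengths
        .last()
        .expect("`mrecord_lengths` should not be empty") as usize;
    let last_mrecord = &mrecord_buffer[mrecord_buffer.len() - last_mrecord_len..];

    if is_eof_mrecord(last_mrecord) {
        // End of shard: the loop sets `has_reached_eof` and advertises `Eof`.
        (true, None)
    } else {
        // The last record of the batch sits at `next_position_inclusive - 1`.
        (false, Some(next_position_inclusive - 1))
    }
}

fn main() {
    // Two doc records followed by the EOF sentinel, mirroring the happy-path fixture.
    let buffer = b"\0\0test-doc-baz\0\0test-doc-qux\0\x02";
    let lengths = [14u32, 14, 2];
    assert_eq!(batch_end(&lengths, buffer, 5), (true, None));

    // A batch that does not end the shard.
    let buffer = b"\0\0test-doc-foo";
    let lengths = [14u32];
    assert_eq!(batch_end(&lengths, buffer, 1), (false, Some(0)));
}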