diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index c6d1130bd2..0c371addc4 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -273,7 +273,7 @@ impl Instance { read_ctx: &ReadContext, ) -> Result<()> { let read_req = ReadRequest { - location: table_data.wal_location(), + wal_location: table_data.wal_location(), start: ReadBoundary::Min, end: ReadBoundary::Max, }; diff --git a/analytic_engine/src/meta/details.rs b/analytic_engine/src/meta/details.rs index 553c0aa9be..70b2810b7f 100644 --- a/analytic_engine/src/meta/details.rs +++ b/analytic_engine/src/meta/details.rs @@ -324,7 +324,7 @@ impl MetaUpdateLogStore for RegionWal { async fn scan(&self, start: ReadBoundary, end: ReadBoundary) -> Result { let ctx = ReadContext::default(); let read_req = ReadRequest { - location: self.wal_location, + wal_location: self.wal_location, start, end, }; diff --git a/analytic_engine/src/wal_synchronizer.rs b/analytic_engine/src/wal_synchronizer.rs index efc0fb5f72..681ed25f57 100644 --- a/analytic_engine/src/wal_synchronizer.rs +++ b/analytic_engine/src/wal_synchronizer.rs @@ -315,7 +315,7 @@ struct SynchronizeState { impl SynchronizeState { pub fn read_req(&self) -> ReadRequest { ReadRequest { - location: self.wal_location, + wal_location: self.wal_location, start: ReadBoundary::Excluded(self.last_synced_seq.load(Ordering::Relaxed)), end: ReadBoundary::Max, } diff --git a/wal/src/kv_encoder.rs b/wal/src/kv_encoder.rs index de5cb38d6c..1075d28385 100644 --- a/wal/src/kv_encoder.rs +++ b/wal/src/kv_encoder.rs @@ -523,22 +523,22 @@ impl LogEncoding { /// LogBatchEncoder which are used to encode specify payloads. #[derive(Debug)] pub struct LogBatchEncoder { - location: WalLocation, + wal_location: WalLocation, log_encoding: LogEncoding, } impl LogBatchEncoder { /// Create LogBatchEncoder with specific region_id. - pub fn create(location: WalLocation) -> Self { + pub fn create(wal_location: WalLocation) -> Self { Self { - location, + wal_location, log_encoding: LogEncoding::newest(), } } /// Consume LogBatchEncoder and encode single payload to LogWriteBatch. pub fn encode(self, payload: &impl Payload) -> manager::Result { - let mut write_batch = LogWriteBatch::new(self.location); + let mut write_batch = LogWriteBatch::new(self.wal_location); let mut buf = BytesMut::new(); self.log_encoding .encode_value(&mut buf, payload) @@ -562,7 +562,7 @@ impl LogBatchEncoder { where &'a I: Into

, { - let mut write_batch = LogWriteBatch::new(self.location); + let mut write_batch = LogWriteBatch::new(self.wal_location); let mut buf = BytesMut::new(); for raw_payload in raw_payload_batch.iter() { self.log_encoding diff --git a/wal/src/manager.rs b/wal/src/manager.rs index a8971e1660..9459042b78 100644 --- a/wal/src/manager.rs +++ b/wal/src/manager.rs @@ -224,7 +224,7 @@ impl ReadBoundary { #[derive(Debug, Clone)] pub struct ReadRequest { /// WalLocation of the wal to read - pub location: WalLocation, + pub wal_location: WalLocation, // TODO(yingwen): Or just rename to ReadBound? /// Start bound pub start: ReadBoundary, @@ -235,7 +235,7 @@ pub struct ReadRequest { #[derive(Debug, Clone)] pub struct ScanRequest { /// WalLocation of the wal to read - pub location: WalLocation, + pub wal_location: WalLocation, } pub type ScanContext = ReadContext; diff --git a/wal/src/rocks_impl/manager.rs b/wal/src/rocks_impl/manager.rs index 5b6f69a256..4c8f806851 100644 --- a/wal/src/rocks_impl/manager.rs +++ b/wal/src/rocks_impl/manager.rs @@ -630,7 +630,7 @@ impl WalManager for RocksImpl { ctx: &ReadContext, req: &ReadRequest, ) -> Result { - let blocking_iter = if let Some(region) = self.region(req.location.table_id) { + let blocking_iter = if let Some(region) = self.region(req.wal_location.table_id) { region.read(ctx, req)? } else { let iter = DBIterator::new(self.db.clone(), ReadOptions::default()); diff --git a/wal/src/table_kv_impl/namespace.rs b/wal/src/table_kv_impl/namespace.rs index cc283d5f49..26a2ec5cf0 100644 --- a/wal/src/table_kv_impl/namespace.rs +++ b/wal/src/table_kv_impl/namespace.rs @@ -584,7 +584,7 @@ impl NamespaceInner { // buckets. let buckets = self.list_buckets(); - let region_id = req.location.table_id; + let region_id = req.wal_location.table_id; if let Some(region) = self.get_or_open_region(region_id).await? { region .read_log(&self.table_kv, buckets, ctx, req) diff --git a/wal/src/tests/read_write.rs b/wal/src/tests/read_write.rs index 8ff76adb12..235b64bb86 100644 --- a/wal/src/tests/read_write.rs +++ b/wal/src/tests/read_write.rs @@ -29,12 +29,12 @@ async fn check_write_batch_with_read_request( async fn check_write_batch( env: &TestEnv, wal: WalManagerRef, - location: WalLocation, + wal_location: WalLocation, max_seq: SequenceNumber, payload_batch: &[TestPayload], ) { let read_req = ReadRequest { - location, + wal_location, start: ReadBoundary::Included(max_seq + 1 - payload_batch.len() as u64), end: ReadBoundary::Included(max_seq), }; @@ -44,26 +44,26 @@ async fn check_write_batch( async fn simple_read_write_with_wal( env: impl Deref>, wal: WalManagerRef, - location: WalLocation, + wal_location: WalLocation, ) { - let (payload_batch, write_batch) = env.build_log_batch(wal.clone(), location, 0, 10).await; + let (payload_batch, write_batch) = env.build_log_batch(wal.clone(), wal_location, 0, 10).await; let seq = wal .write(&env.write_ctx, &write_batch) .await .expect("should succeed to write"); - check_write_batch(&env, wal, location, seq, &payload_batch).await + check_write_batch(&env, wal, wal_location, seq, &payload_batch).await } -async fn simple_read_write(env: &TestEnv, location: WalLocation) { +async fn simple_read_write(env: &TestEnv, wal_location: WalLocation) { let wal = env.build_wal().await; // Empty region has 0 sequence num. - let last_seq = wal.sequence_num(location).await.unwrap(); + let last_seq = wal.sequence_num(wal_location).await.unwrap(); assert_eq!(0, last_seq); - simple_read_write_with_wal(env, wal.clone(), location).await; + simple_read_write_with_wal(env, wal.clone(), wal_location).await; - let last_seq = wal.sequence_num(location).await.unwrap(); + let last_seq = wal.sequence_num(wal_location).await.unwrap(); assert_eq!(10, last_seq); wal.close_gracefully().await.unwrap(); @@ -72,14 +72,14 @@ async fn simple_read_write(env: &TestEnv, location: WalLocatio /// Test the read with different kinds of boundaries. async fn read_with_boundary(env: &TestEnv) { let wal = env.build_wal().await; - let location = WalLocation::new(0, 0); - let (payload_batch, write_batch) = env.build_log_batch(wal.clone(), location, 0, 10).await; + let wal_location = WalLocation::new(0, 0); + let (payload_batch, write_batch) = env.build_log_batch(wal.clone(), wal_location, 0, 10).await; let end_seq = wal .write(&env.write_ctx, &write_batch) .await .expect("should succeed to write"); - let last_seq = wal.sequence_num(location).await.unwrap(); + let last_seq = wal.sequence_num(wal_location).await.unwrap(); assert_eq!(end_seq, last_seq); let start_seq = end_seq + 1 - write_batch.entries.len() as u64; @@ -87,7 +87,7 @@ async fn read_with_boundary(env: &TestEnv) { // [min, max] { let read_req = ReadRequest { - location, + wal_location, start: ReadBoundary::Min, end: ReadBoundary::Max, }; @@ -98,7 +98,7 @@ async fn read_with_boundary(env: &TestEnv) { // [0, 10] { let read_req = ReadRequest { - location, + wal_location, start: ReadBoundary::Included(start_seq), end: ReadBoundary::Included(end_seq), }; @@ -109,7 +109,7 @@ async fn read_with_boundary(env: &TestEnv) { // (0, 10] { let read_req = ReadRequest { - location, + wal_location, start: ReadBoundary::Excluded(start_seq), end: ReadBoundary::Included(end_seq), }; @@ -121,7 +121,7 @@ async fn read_with_boundary(env: &TestEnv) { // [0, 10) { let read_req = ReadRequest { - location, + wal_location, start: ReadBoundary::Included(start_seq), end: ReadBoundary::Excluded(end_seq), }; @@ -139,7 +139,7 @@ async fn read_with_boundary(env: &TestEnv) { // (0, 10) { let read_req = ReadRequest { - location, + wal_location, start: ReadBoundary::Excluded(start_seq), end: ReadBoundary::Excluded(end_seq), }; @@ -213,7 +213,7 @@ async fn reopen(env: &TestEnv) { assert_eq!(seq, last_seq); let read_req = ReadRequest { - location: WalLocation::new(table_id, table_id), + wal_location: WalLocation::new(table_id, table_id), start: ReadBoundary::Included(seq + 1 - write_batch.entries.len() as u64), end: ReadBoundary::Included(seq), }; @@ -342,7 +342,7 @@ async fn simple_write_delete(env: &TestEnv) { .await .expect("should succeed to delete"); let read_req = ReadRequest { - location: WalLocation::new(table_id, table_id), + wal_location: WalLocation::new(table_id, table_id), start: ReadBoundary::Min, end: ReadBoundary::Max, }; @@ -387,7 +387,7 @@ async fn write_delete_half(env: &TestEnv) { .await .expect("should succeed to delete"); let read_req = ReadRequest { - location: WalLocation::new(table_id, table_id), + wal_location: WalLocation::new(table_id, table_id), start: ReadBoundary::Min, end: ReadBoundary::Max, }; @@ -439,7 +439,7 @@ async fn write_delete_multiple_regions(env: &TestEnv) { .await .expect("should succeed to delete"); let read_req = ReadRequest { - location: WalLocation::new(table_id_1, table_id_1), + wal_location: WalLocation::new(table_id_1, table_id_1), start: ReadBoundary::Min, end: ReadBoundary::Max, };