Skip to content

Commit

Permalink
add WalLocation.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Oct 10, 2022
1 parent a8fb2c9 commit f40d064
Show file tree
Hide file tree
Showing 19 changed files with 394 additions and 229 deletions.
12 changes: 6 additions & 6 deletions analytic_engine/src/instance/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,19 +122,19 @@ impl Instance {
let payload = WritePayload::AlterSchema(&alter_schema_pb);

// Encode payloads
let region_id = table_data.wal_region_id();
let region_id = table_data.wal_location();
let log_batch_encoder =
self.space_store
.wal_manager
.encoder(region_id)
.context(GetLogBatchEncoder {
table: &table_data.name,
region_id: table_data.wal_region_id(),
wal_location: table_data.wal_location(),
})?;

let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads {
table: &table_data.name,
region_id: table_data.wal_region_id(),
wal_location: table_data.wal_location(),
})?;

// Write log batch
Expand Down Expand Up @@ -290,19 +290,19 @@ impl Instance {
let payload = WritePayload::AlterOption(&alter_options_pb);

// Encode payload
let region_id = table_data.wal_region_id();
let region_id = table_data.wal_location();
let log_batch_encoder =
self.space_store
.wal_manager
.encoder(region_id)
.context(GetLogBatchEncoder {
table: &table_data.name,
region_id: table_data.wal_region_id(),
wal_location: table_data.wal_location(),
})?;

let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads {
table: &table_data.name,
region_id: table_data.wal_region_id(),
wal_location: table_data.wal_location(),
})?;

// Write log batch
Expand Down
14 changes: 7 additions & 7 deletions analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use table_engine::{
engine::{CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest},
table::TableId,
};
use wal::manager::RegionId;
use wal::manager::WalLocation;

use crate::{
instance::{write_worker::WriteGroup, Instance},
Expand Down Expand Up @@ -192,26 +192,26 @@ pub enum Error {
},

#[snafu(display(
"Failed to get to log batch encoder, table:{}, region_id:{}, err:{}",
"Failed to get to log batch encoder, table:{}, wal_location:{:?}, err:{}",
table,
region_id,
wal_location,
source
))]
GetLogBatchEncoder {
table: String,
region_id: RegionId,
wal_location: WalLocation,
source: wal::manager::Error,
},

#[snafu(display(
"Failed to encode payloads, table:{},region_id:{}, err:{}",
"Failed to encode payloads, table:{},wal_location:{:?}, err:{}",
table,
region_id,
wal_location,
source
))]
EncodePayloads {
table: String,
region_id: RegionId,
wal_location: WalLocation,
source: wal::manager::Error,
},
}
Expand Down
14 changes: 9 additions & 5 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use log::{error, info};
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::{predicate::Predicate, table::Result as TableResult};
use tokio::sync::oneshot;
use wal::manager::RegionId;
use wal::manager::WalLocation;

use crate::{
compaction::{
Expand Down Expand Up @@ -63,9 +63,13 @@ pub enum Error {
source: Box<dyn std::error::Error + Send + Sync>,
},

#[snafu(display("Failed to purge wal, region_id:{}, sequence:{}", region_id, sequence))]
#[snafu(display(
"Failed to purge wal, wal_location:{:?}, sequence:{}",
wal_location,
sequence
))]
PurgeWal {
region_id: RegionId,
wal_location: WalLocation,
sequence: SequenceNumber,
source: wal::manager::Error,
},
Expand Down Expand Up @@ -556,10 +560,10 @@ impl Instance {
// Mark sequence <= flushed_sequence to be deleted.
self.space_store
.wal_manager
.mark_delete_entries_up_to(table_data.wal_region_id(), flushed_sequence)
.mark_delete_entries_up_to(table_data.wal_location(), flushed_sequence)
.await
.context(PurgeWal {
region_id: table_data.wal_region_id(),
wal_location: table_data.wal_location(),
sequence: flushed_sequence,
})?;

Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl Instance {
read_ctx: &ReadContext,
) -> Result<()> {
let read_req = ReadRequest {
region_id: table_data.wal_region_id(),
location: table_data.wal_location(),
start: ReadBoundary::Min,
end: ReadBoundary::Max,
};
Expand Down
20 changes: 10 additions & 10 deletions analytic_engine/src/instance/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use smallvec::SmallVec;
use snafu::{ensure, Backtrace, ResultExt, Snafu};
use table_engine::table::WriteRequest;
use tokio::sync::oneshot;
use wal::manager::{RegionId, SequenceNumber, WriteContext};
use wal::manager::{SequenceNumber, WalLocation, WriteContext};

use crate::{
instance::{
Expand All @@ -37,26 +37,26 @@ use crate::{
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
"Failed to get to log batch encoder, table:{}, region_id:{}, err:{}",
"Failed to get to log batch encoder, table:{}, wal_location:{:?}, err:{}",
table,
region_id,
wal_location,
source
))]
GetLogBatchEncoder {
table: String,
region_id: RegionId,
wal_location: WalLocation,
source: wal::manager::Error,
},

#[snafu(display(
"Failed to encode payloads, table:{}, region_id:{}, err:{}",
"Failed to encode payloads, table:{}, wal_location:{:?}, err:{}",
table,
region_id,
wal_location,
source
))]
EncodePayloads {
table: String,
region_id: RegionId,
wal_location: WalLocation,
source: wal::manager::Error,
},

Expand Down Expand Up @@ -398,18 +398,18 @@ impl Instance {

// Encode payload
let payload = WritePayload::Write(&write_req_pb);
let region_id = table_data.wal_region_id();
let region_id = table_data.wal_location();
let log_batch_encoder =
self.space_store
.wal_manager
.encoder(region_id)
.context(GetLogBatchEncoder {
table: &table_data.name,
region_id: table_data.wal_region_id(),
wal_location: table_data.wal_location(),
})?;
let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads {
table: &table_data.name,
region_id: table_data.wal_region_id(),
wal_location: table_data.wal_location(),
})?;

// Write to wal manager
Expand Down
Loading

0 comments on commit f40d064

Please sign in to comment.