Skip to content

Commit

Permalink
refactor: modify exist wal (#288)
Browse files Browse the repository at this point in the history
* add WalLocation.

* add scan to WalManager.

* unify the variable name to `wal_location`.

* make use of the region_id in wal_location.

* add `shard_id` in `TableData` and modidy related path to pass it to `TableData` while open/create/alter table, especially things about manifest.

* refactor wal's `ScanRequest`.

* refactor by cr.
  • Loading branch information
Rachelint authored Oct 13, 2022
1 parent 13bb026 commit a5de2d0
Show file tree
Hide file tree
Showing 30 changed files with 648 additions and 299 deletions.
18 changes: 9 additions & 9 deletions analytic_engine/src/instance/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
write_worker::{AlterOptionsCommand, AlterSchemaCommand, WorkerLocal},
Instance,
},
meta::meta_update::{AlterOptionsMeta, AlterSchemaMeta, MetaUpdate},
meta::meta_update::{AlterOptionsMeta, AlterSchemaMeta, MetaUpdate, MetaUpdateRequest},
payload::WritePayload,
space::SpaceAndTable,
table::data::TableDataRef,
Expand Down Expand Up @@ -122,19 +122,19 @@ impl Instance {
let payload = WritePayload::AlterSchema(&alter_schema_pb);

// Encode payloads
let region_id = table_data.wal_region_id();
let region_id = table_data.location();
let log_batch_encoder =
self.space_store
.wal_manager
.encoder(region_id)
.context(GetLogBatchEncoder {
table: &table_data.name,
region_id: table_data.wal_region_id(),
table_location: table_data.location(),
})?;

let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads {
table: &table_data.name,
region_id: table_data.wal_region_id(),
table_location: table_data.location(),
})?;

// Write log batch
Expand All @@ -159,7 +159,7 @@ impl Instance {
let update = MetaUpdate::AlterSchema(manifest_update);
self.space_store
.manifest
.store_update(update)
.store_update(MetaUpdateRequest::new(table_data.location(), update))
.await
.context(WriteManifest {
space_id: table_data.space_id,
Expand Down Expand Up @@ -290,19 +290,19 @@ impl Instance {
let payload = WritePayload::AlterOption(&alter_options_pb);

// Encode payload
let region_id = table_data.wal_region_id();
let region_id = table_data.location();
let log_batch_encoder =
self.space_store
.wal_manager
.encoder(region_id)
.context(GetLogBatchEncoder {
table: &table_data.name,
region_id: table_data.wal_region_id(),
table_location: table_data.location(),
})?;

let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads {
table: &table_data.name,
region_id: table_data.wal_region_id(),
table_location: table_data.location(),
})?;

// Write log batch
Expand All @@ -322,7 +322,7 @@ impl Instance {
let meta_update = MetaUpdate::AlterOptions(manifest_update);
self.space_store
.manifest
.store_update(meta_update)
.store_update(MetaUpdateRequest::new(table_data.location(), meta_update))
.await
.context(WriteManifest {
space_id: table_data.space_id,
Expand Down
4 changes: 2 additions & 2 deletions analytic_engine/src/instance/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
write_worker::{self, CreateTableCommand, WorkerLocal},
Instance,
},
meta::meta_update::{AddTableMeta, MetaUpdate},
meta::meta_update::{AddTableMeta, MetaUpdate, MetaUpdateRequest},
space::SpaceRef,
table::data::{TableData, TableDataRef},
table_options,
Expand Down Expand Up @@ -104,7 +104,7 @@ impl Instance {
});
self.space_store
.manifest
.store_update(update)
.store_update(MetaUpdateRequest::new(table_data.location(), update))
.await
.context(WriteManifest {
space_id: space.id,
Expand Down
4 changes: 2 additions & 2 deletions analytic_engine/src/instance/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
write_worker::{self, DropTableCommand, WorkerLocal},
Instance,
},
meta::meta_update::{DropTableMeta, MetaUpdate},
meta::meta_update::{DropTableMeta, MetaUpdate, MetaUpdateRequest},
space::SpaceRef,
};

Expand Down Expand Up @@ -111,7 +111,7 @@ impl Instance {
});
self.space_store
.manifest
.store_update(update)
.store_update(MetaUpdateRequest::new(table_data.location(), update))
.await
.context(WriteManifest {
space_id: space.id,
Expand Down
17 changes: 8 additions & 9 deletions analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@

use std::sync::Arc;

use common_types::schema::Version;
use common_types::{schema::Version, table::Location};
use common_util::define_result;
use snafu::{Backtrace, OptionExt, Snafu};
use table_engine::{
engine::{CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest},
table::TableId,
};
use wal::manager::RegionId;

use crate::{
instance::{write_worker::WriteGroup, Instance},
Expand Down Expand Up @@ -192,26 +191,26 @@ pub enum Error {
},

#[snafu(display(
"Failed to get to log batch encoder, table:{}, region_id:{}, err:{}",
"Failed to get to log batch encoder, table:{}, table_location:{:?}, err:{}",
table,
region_id,
table_location,
source
))]
GetLogBatchEncoder {
table: String,
region_id: RegionId,
table_location: Location,
source: wal::manager::Error,
},

#[snafu(display(
"Failed to encode payloads, table:{},region_id:{}, err:{}",
"Failed to encode payloads, table:{},table_location:{:?}, err:{}",
table,
region_id,
table_location,
source
))]
EncodePayloads {
table: String,
region_id: RegionId,
table_location: Location,
source: wal::manager::Error,
},
}
Expand Down Expand Up @@ -332,7 +331,7 @@ impl Instance {
) -> Result<Option<SpaceAndTable>> {
let space = self.find_or_create_space(space_id).await?;

let table_data = self.do_open_table(space.clone(), request.table_id).await?;
let table_data = self.do_open_table(space.clone(), request).await?;

Ok(table_data.map(|v| SpaceAndTable::new(space, v)))
}
Expand Down
22 changes: 13 additions & 9 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use common_types::{
record_batch::{RecordBatchWithKey, RecordBatchWithKeyBuilder},
request_id::RequestId,
row::RowViewOnBatch,
table::Location,
time::TimeRange,
SequenceNumber,
};
Expand All @@ -22,7 +23,6 @@ use log::{error, info};
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::{predicate::Predicate, table::Result as TableResult};
use tokio::sync::oneshot;
use wal::manager::RegionId;

use crate::{
compaction::{
Expand All @@ -33,7 +33,7 @@ use crate::{
Instance, SpaceStore,
},
memtable::{ColumnarIterPtr, MemTableRef, ScanContext, ScanRequest},
meta::meta_update::{AlterOptionsMeta, MetaUpdate, VersionEditMeta},
meta::meta_update::{AlterOptionsMeta, MetaUpdate, MetaUpdateRequest, VersionEditMeta},
row_iter::{
self,
dedup::DedupIterator,
Expand Down Expand Up @@ -63,9 +63,13 @@ pub enum Error {
source: Box<dyn std::error::Error + Send + Sync>,
},

#[snafu(display("Failed to purge wal, region_id:{}, sequence:{}", region_id, sequence))]
#[snafu(display(
"Failed to purge wal, table_location:{:?}, sequence:{}",
table_location,
sequence
))]
PurgeWal {
region_id: RegionId,
table_location: Location,
sequence: SequenceNumber,
source: wal::manager::Error,
},
Expand Down Expand Up @@ -296,7 +300,7 @@ impl Instance {
});
self.space_store
.manifest
.store_update(meta_update)
.store_update(MetaUpdateRequest::new(table_data.location(), meta_update))
.await
.context(StoreVersionEdit)?;

Expand Down Expand Up @@ -536,7 +540,7 @@ impl Instance {
let meta_update = MetaUpdate::VersionEdit(edit_meta);
self.space_store
.manifest
.store_update(meta_update)
.store_update(MetaUpdateRequest::new(table_data.location(), meta_update))
.await
.context(StoreVersionEdit)?;

Expand All @@ -553,10 +557,10 @@ impl Instance {
// Mark sequence <= flushed_sequence to be deleted.
self.space_store
.wal_manager
.mark_delete_entries_up_to(table_data.wal_region_id(), flushed_sequence)
.mark_delete_entries_up_to(table_data.location(), flushed_sequence)
.await
.context(PurgeWal {
region_id: table_data.wal_region_id(),
table_location: table_data.location(),
sequence: flushed_sequence,
})?;

Expand Down Expand Up @@ -815,7 +819,7 @@ impl SpaceStore {

let meta_update = MetaUpdate::VersionEdit(edit_meta.clone());
self.manifest
.store_update(meta_update)
.store_update(MetaUpdateRequest::new(table_data.location(), meta_update))
.await
.context(StoreVersionEdit)?;

Expand Down
35 changes: 21 additions & 14 deletions analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use std::{
sync::{Arc, RwLock},
};

use common_types::schema::IndexInWriterSchema;
use common_types::{schema::IndexInWriterSchema, table::Location};
use log::{debug, error, info, trace, warn};
use object_store::ObjectStoreRef;
use snafu::ResultExt;
use table_engine::table::TableId;
use table_engine::engine::OpenTableRequest;
use tokio::sync::oneshot;
use wal::{
log_batch::LogEntry,
Expand Down Expand Up @@ -130,12 +130,12 @@ impl Instance {
pub async fn do_open_table(
self: &Arc<Self>,
space: SpaceRef,
table_id: TableId,
request: &OpenTableRequest,
) -> Result<Option<TableDataRef>> {
if let Some(table_data) = space.find_table_by_id(table_id) {
if let Some(table_data) = space.find_table_by_id(request.table_id) {
return Ok(Some(table_data));
}
let table_data = match self.recover_table_meta_data(table_id).await? {
let table_data = match self.recover_table_meta_data(request).await? {
Some(v) => v,
None => return Ok(None),
};
Expand Down Expand Up @@ -196,33 +196,39 @@ impl Instance {
/// Return None if no meta data is found for the table.
async fn recover_table_meta_data(
self: &Arc<Self>,
table_id: TableId,
request: &OpenTableRequest,
) -> Result<Option<TableDataRef>> {
info!("Instance recover table:{} meta begin", table_id);
info!("Instance recover table:{} meta begin", request.table_id);

// Load manifest, also create a new snapshot at startup.
let manifest_data = self
.space_store
.manifest
.load_data(table_id, true)
.load_data(
Location::new(request.shard_id, request.table_id.as_u64()),
true,
)
.await
.context(ReadMetaUpdate { table_id })?;
.context(ReadMetaUpdate {
table_id: request.table_id,
})?;

let table_data = if let Some(manifest_data) = manifest_data {
Some(self.apply_table_manifest_data(manifest_data).await?)
Some(self.recover_table_data(manifest_data, request).await?)
} else {
None
};

info!("Instance recover table:{} meta end", table_id);
info!("Instance recover table:{} meta end", request.table_id);

Ok(table_data)
}

/// Apply manifest data to instance
async fn apply_table_manifest_data(
/// Recover `TableData` by applying manifest data to instance
async fn recover_table_data(
self: &Arc<Self>,
manifest_data: TableManifestData,
request: &OpenTableRequest,
) -> Result<TableDataRef> {
let TableManifestData {
table_meta,
Expand All @@ -243,6 +249,7 @@ impl Instance {
write_handle,
&self.file_purger,
space.mem_usage_collector.clone(),
request.shard_id,
)
.context(RecoverTableData {
space_id: table_meta.space_id,
Expand Down Expand Up @@ -273,7 +280,7 @@ impl Instance {
read_ctx: &ReadContext,
) -> Result<()> {
let read_req = ReadRequest {
region_id: table_data.wal_region_id(),
location: table_data.location(),
start: ReadBoundary::Min,
end: ReadBoundary::Max,
};
Expand Down
Loading

0 comments on commit a5de2d0

Please sign in to comment.