Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: modify exist wal #288

Merged
merged 7 commits into from
Oct 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions analytic_engine/src/instance/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
write_worker::{AlterOptionsCommand, AlterSchemaCommand, WorkerLocal},
Instance,
},
meta::meta_update::{AlterOptionsMeta, AlterSchemaMeta, MetaUpdate},
meta::meta_update::{AlterOptionsMeta, AlterSchemaMeta, MetaUpdate, MetaUpdateRequest},
payload::WritePayload,
space::SpaceAndTable,
table::data::TableDataRef,
Expand Down Expand Up @@ -122,19 +122,19 @@ impl Instance {
let payload = WritePayload::AlterSchema(&alter_schema_pb);

// Encode payloads
let region_id = table_data.wal_region_id();
let region_id = table_data.location();
let log_batch_encoder =
self.space_store
.wal_manager
.encoder(region_id)
.context(GetLogBatchEncoder {
table: &table_data.name,
region_id: table_data.wal_region_id(),
table_location: table_data.location(),
})?;

let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads {
table: &table_data.name,
region_id: table_data.wal_region_id(),
table_location: table_data.location(),
})?;

// Write log batch
Expand All @@ -159,7 +159,7 @@ impl Instance {
let update = MetaUpdate::AlterSchema(manifest_update);
self.space_store
.manifest
.store_update(update)
.store_update(MetaUpdateRequest::new(table_data.location(), update))
.await
.context(WriteManifest {
space_id: table_data.space_id,
Expand Down Expand Up @@ -290,19 +290,19 @@ impl Instance {
let payload = WritePayload::AlterOption(&alter_options_pb);

// Encode payload
let region_id = table_data.wal_region_id();
let region_id = table_data.location();
let log_batch_encoder =
self.space_store
.wal_manager
.encoder(region_id)
.context(GetLogBatchEncoder {
table: &table_data.name,
region_id: table_data.wal_region_id(),
table_location: table_data.location(),
})?;

let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads {
table: &table_data.name,
region_id: table_data.wal_region_id(),
table_location: table_data.location(),
})?;

// Write log batch
Expand All @@ -322,7 +322,7 @@ impl Instance {
let meta_update = MetaUpdate::AlterOptions(manifest_update);
self.space_store
.manifest
.store_update(meta_update)
.store_update(MetaUpdateRequest::new(table_data.location(), meta_update))
.await
.context(WriteManifest {
space_id: table_data.space_id,
Expand Down
4 changes: 2 additions & 2 deletions analytic_engine/src/instance/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
write_worker::{self, CreateTableCommand, WorkerLocal},
Instance,
},
meta::meta_update::{AddTableMeta, MetaUpdate},
meta::meta_update::{AddTableMeta, MetaUpdate, MetaUpdateRequest},
space::SpaceRef,
table::data::{TableData, TableDataRef},
table_options,
Expand Down Expand Up @@ -104,7 +104,7 @@ impl Instance {
});
self.space_store
.manifest
.store_update(update)
.store_update(MetaUpdateRequest::new(table_data.location(), update))
.await
.context(WriteManifest {
space_id: space.id,
Expand Down
4 changes: 2 additions & 2 deletions analytic_engine/src/instance/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
write_worker::{self, DropTableCommand, WorkerLocal},
Instance,
},
meta::meta_update::{DropTableMeta, MetaUpdate},
meta::meta_update::{DropTableMeta, MetaUpdate, MetaUpdateRequest},
space::SpaceRef,
};

Expand Down Expand Up @@ -111,7 +111,7 @@ impl Instance {
});
self.space_store
.manifest
.store_update(update)
.store_update(MetaUpdateRequest::new(table_data.location(), update))
.await
.context(WriteManifest {
space_id: space.id,
Expand Down
17 changes: 8 additions & 9 deletions analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@

use std::sync::Arc;

use common_types::schema::Version;
use common_types::{schema::Version, table::Location};
use common_util::define_result;
use snafu::{Backtrace, OptionExt, Snafu};
use table_engine::{
engine::{CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest},
table::TableId,
};
use wal::manager::RegionId;

use crate::{
instance::{write_worker::WriteGroup, Instance},
Expand Down Expand Up @@ -192,26 +191,26 @@ pub enum Error {
},

#[snafu(display(
"Failed to get to log batch encoder, table:{}, region_id:{}, err:{}",
"Failed to get to log batch encoder, table:{}, table_location:{:?}, err:{}",
table,
region_id,
table_location,
source
))]
GetLogBatchEncoder {
table: String,
region_id: RegionId,
table_location: Location,
source: wal::manager::Error,
},

#[snafu(display(
"Failed to encode payloads, table:{},region_id:{}, err:{}",
"Failed to encode payloads, table:{},table_location:{:?}, err:{}",
table,
region_id,
table_location,
source
))]
EncodePayloads {
table: String,
region_id: RegionId,
table_location: Location,
source: wal::manager::Error,
},
}
Expand Down Expand Up @@ -332,7 +331,7 @@ impl Instance {
) -> Result<Option<SpaceAndTable>> {
let space = self.find_or_create_space(space_id).await?;

let table_data = self.do_open_table(space.clone(), request.table_id).await?;
let table_data = self.do_open_table(space.clone(), request).await?;

Ok(table_data.map(|v| SpaceAndTable::new(space, v)))
}
Expand Down
22 changes: 13 additions & 9 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use common_types::{
record_batch::{RecordBatchWithKey, RecordBatchWithKeyBuilder},
request_id::RequestId,
row::RowViewOnBatch,
table::Location,
time::TimeRange,
SequenceNumber,
};
Expand All @@ -22,7 +23,6 @@ use log::{error, info};
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::{predicate::Predicate, table::Result as TableResult};
use tokio::sync::oneshot;
use wal::manager::RegionId;

use crate::{
compaction::{
Expand All @@ -33,7 +33,7 @@ use crate::{
Instance, SpaceStore,
},
memtable::{ColumnarIterPtr, MemTableRef, ScanContext, ScanRequest},
meta::meta_update::{AlterOptionsMeta, MetaUpdate, VersionEditMeta},
meta::meta_update::{AlterOptionsMeta, MetaUpdate, MetaUpdateRequest, VersionEditMeta},
row_iter::{
self,
dedup::DedupIterator,
Expand Down Expand Up @@ -63,9 +63,13 @@ pub enum Error {
source: Box<dyn std::error::Error + Send + Sync>,
},

#[snafu(display("Failed to purge wal, region_id:{}, sequence:{}", region_id, sequence))]
#[snafu(display(
"Failed to purge wal, table_location:{:?}, sequence:{}",
table_location,
sequence
))]
PurgeWal {
region_id: RegionId,
table_location: Location,
sequence: SequenceNumber,
source: wal::manager::Error,
},
Expand Down Expand Up @@ -296,7 +300,7 @@ impl Instance {
});
self.space_store
.manifest
.store_update(meta_update)
.store_update(MetaUpdateRequest::new(table_data.location(), meta_update))
.await
.context(StoreVersionEdit)?;

Expand Down Expand Up @@ -536,7 +540,7 @@ impl Instance {
let meta_update = MetaUpdate::VersionEdit(edit_meta);
self.space_store
.manifest
.store_update(meta_update)
.store_update(MetaUpdateRequest::new(table_data.location(), meta_update))
.await
.context(StoreVersionEdit)?;

Expand All @@ -553,10 +557,10 @@ impl Instance {
// Mark sequence <= flushed_sequence to be deleted.
self.space_store
.wal_manager
.mark_delete_entries_up_to(table_data.wal_region_id(), flushed_sequence)
.mark_delete_entries_up_to(table_data.location(), flushed_sequence)
.await
.context(PurgeWal {
region_id: table_data.wal_region_id(),
table_location: table_data.location(),
sequence: flushed_sequence,
})?;

Expand Down Expand Up @@ -815,7 +819,7 @@ impl SpaceStore {

let meta_update = MetaUpdate::VersionEdit(edit_meta.clone());
self.manifest
.store_update(meta_update)
.store_update(MetaUpdateRequest::new(table_data.location(), meta_update))
.await
.context(StoreVersionEdit)?;

Expand Down
35 changes: 21 additions & 14 deletions analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use std::{
sync::{Arc, RwLock},
};

use common_types::schema::IndexInWriterSchema;
use common_types::{schema::IndexInWriterSchema, table::Location};
use log::{debug, error, info, trace, warn};
use object_store::ObjectStoreRef;
use snafu::ResultExt;
use table_engine::table::TableId;
use table_engine::engine::OpenTableRequest;
use tokio::sync::oneshot;
use wal::{
log_batch::LogEntry,
Expand Down Expand Up @@ -130,12 +130,12 @@ impl Instance {
pub async fn do_open_table(
self: &Arc<Self>,
space: SpaceRef,
table_id: TableId,
request: &OpenTableRequest,
) -> Result<Option<TableDataRef>> {
if let Some(table_data) = space.find_table_by_id(table_id) {
if let Some(table_data) = space.find_table_by_id(request.table_id) {
return Ok(Some(table_data));
}
let table_data = match self.recover_table_meta_data(table_id).await? {
let table_data = match self.recover_table_meta_data(request).await? {
Some(v) => v,
None => return Ok(None),
};
Expand Down Expand Up @@ -196,33 +196,39 @@ impl Instance {
/// Return None if no meta data is found for the table.
async fn recover_table_meta_data(
self: &Arc<Self>,
table_id: TableId,
request: &OpenTableRequest,
) -> Result<Option<TableDataRef>> {
info!("Instance recover table:{} meta begin", table_id);
info!("Instance recover table:{} meta begin", request.table_id);

// Load manifest, also create a new snapshot at startup.
let manifest_data = self
.space_store
.manifest
.load_data(table_id, true)
.load_data(
Location::new(request.shard_id, request.table_id.as_u64()),
true,
)
.await
.context(ReadMetaUpdate { table_id })?;
.context(ReadMetaUpdate {
table_id: request.table_id,
})?;

let table_data = if let Some(manifest_data) = manifest_data {
Some(self.apply_table_manifest_data(manifest_data).await?)
Some(self.recover_table_data(manifest_data, request).await?)
} else {
None
};

info!("Instance recover table:{} meta end", table_id);
info!("Instance recover table:{} meta end", request.table_id);

Ok(table_data)
}

/// Apply manifest data to instance
async fn apply_table_manifest_data(
/// Recover `TableData` by applying manifest data to instance
async fn recover_table_data(
ShiKaiWi marked this conversation as resolved.
Show resolved Hide resolved
self: &Arc<Self>,
manifest_data: TableManifestData,
request: &OpenTableRequest,
) -> Result<TableDataRef> {
let TableManifestData {
table_meta,
Expand All @@ -243,6 +249,7 @@ impl Instance {
write_handle,
&self.file_purger,
space.mem_usage_collector.clone(),
request.shard_id,
)
.context(RecoverTableData {
space_id: table_meta.space_id,
Expand Down Expand Up @@ -273,7 +280,7 @@ impl Instance {
read_ctx: &ReadContext,
) -> Result<()> {
let read_req = ReadRequest {
region_id: table_data.wal_region_id(),
location: table_data.location(),
start: ReadBoundary::Min,
end: ReadBoundary::Max,
};
Expand Down
Loading