Skip to content

Commit

Permalink
fix: bug in distributed wal(obkv, kafka) (#422)
Browse files Browse the repository at this point in the history
* change `shard_id` in `TableData` to `shard_info`

* modify wal to adapt adding shard version.

* add test for makeing wal perceive shard version.

* add more logs in key path to help debug.

* change the mapping from `shard version -> region version` to `cluster version -> region version`.

* address CR.
  • Loading branch information
Rachelint authored Dec 1, 2022
1 parent 9250cef commit 4d39868
Show file tree
Hide file tree
Showing 37 changed files with 944 additions and 449 deletions.
21 changes: 12 additions & 9 deletions analytic_engine/src/instance/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,19 +122,19 @@ impl Instance {
let payload = WritePayload::AlterSchema(&alter_schema_pb);

// Encode payloads
let region_id = table_data.location();
let wal_location = table_data.wal_location();
let log_batch_encoder =
self.space_store
.wal_manager
.encoder(region_id)
.encoder(wal_location)
.context(GetLogBatchEncoder {
table: &table_data.name,
table_location: table_data.location(),
wal_location: table_data.wal_location(),
})?;

let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads {
table: &table_data.name,
table_location: table_data.location(),
wal_location: table_data.wal_location(),
})?;

// Write log batch
Expand All @@ -159,7 +159,7 @@ impl Instance {
let update = MetaUpdate::AlterSchema(manifest_update);
self.space_store
.manifest
.store_update(MetaUpdateRequest::new(table_data.location(), update))
.store_update(MetaUpdateRequest::new(table_data.wal_location(), update))
.await
.context(WriteManifest {
space_id: table_data.space_id,
Expand Down Expand Up @@ -290,19 +290,19 @@ impl Instance {
let payload = WritePayload::AlterOption(&alter_options_pb);

// Encode payload
let region_id = table_data.location();
let region_id = table_data.wal_location();
let log_batch_encoder =
self.space_store
.wal_manager
.encoder(region_id)
.context(GetLogBatchEncoder {
table: &table_data.name,
table_location: table_data.location(),
wal_location: table_data.wal_location(),
})?;

let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads {
table: &table_data.name,
table_location: table_data.location(),
wal_location: table_data.wal_location(),
})?;

// Write log batch
Expand All @@ -322,7 +322,10 @@ impl Instance {
let meta_update = MetaUpdate::AlterOptions(manifest_update);
self.space_store
.manifest
.store_update(MetaUpdateRequest::new(table_data.location(), meta_update))
.store_update(MetaUpdateRequest::new(
table_data.wal_location(),
meta_update,
))
.await
.context(WriteManifest {
space_id: table_data.space_id,
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/instance/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl Instance {
});
self.space_store
.manifest
.store_update(MetaUpdateRequest::new(table_data.location(), update))
.store_update(MetaUpdateRequest::new(table_data.wal_location(), update))
.await
.context(WriteManifest {
space_id: space.id,
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/src/instance/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl Instance {
});
self.space_store
.manifest
.store_update(MetaUpdateRequest::new(table_data.location(), update))
.store_update(MetaUpdateRequest::new(table_data.wal_location(), update))
.await
.context(WriteManifest {
space_id: space.id,
Expand Down
15 changes: 8 additions & 7 deletions analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@

use std::sync::Arc;

use common_types::{schema::Version, table::Location};
use common_types::schema::Version;
use common_util::define_result;
use snafu::{Backtrace, OptionExt, Snafu};
use table_engine::{
engine::{CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest},
table::TableId,
};
use wal::manager::WalLocation;

use crate::{
instance::{write_worker::WriteGroup, Instance},
Expand Down Expand Up @@ -191,26 +192,26 @@ pub enum Error {
},

#[snafu(display(
"Failed to get to log batch encoder, table:{}, table_location:{:?}, err:{}",
"Failed to get to log batch encoder, table:{}, wal_location:{:?}, err:{}",
table,
table_location,
wal_location,
source
))]
GetLogBatchEncoder {
table: String,
table_location: Location,
wal_location: WalLocation,
source: wal::manager::Error,
},

#[snafu(display(
"Failed to encode payloads, table:{},table_location:{:?}, err:{}",
"Failed to encode payloads, table:{}, wal_location:{:?}, err:{}",
table,
table_location,
wal_location,
source
))]
EncodePayloads {
table: String,
table_location: Location,
wal_location: WalLocation,
source: wal::manager::Error,
},
}
Expand Down
27 changes: 18 additions & 9 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use common_types::{
record_batch::{RecordBatchWithKey, RecordBatchWithKeyBuilder},
request_id::RequestId,
row::RowViewOnBatch,
table::Location,
time::TimeRange,
SequenceNumber,
};
Expand All @@ -23,6 +22,7 @@ use log::{error, info};
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::{predicate::Predicate, table::Result as TableResult};
use tokio::sync::oneshot;
use wal::manager::WalLocation;

use crate::{
compaction::{
Expand Down Expand Up @@ -64,12 +64,12 @@ pub enum Error {
},

#[snafu(display(
"Failed to purge wal, table_location:{:?}, sequence:{}",
table_location,
"Failed to purge wal, wal_location:{:?}, sequence:{}",
wal_location,
sequence
))]
PurgeWal {
table_location: Location,
wal_location: WalLocation,
sequence: SequenceNumber,
source: wal::manager::Error,
},
Expand Down Expand Up @@ -300,7 +300,10 @@ impl Instance {
});
self.space_store
.manifest
.store_update(MetaUpdateRequest::new(table_data.location(), meta_update))
.store_update(MetaUpdateRequest::new(
table_data.wal_location(),
meta_update,
))
.await
.context(StoreVersionEdit)?;

Expand Down Expand Up @@ -540,7 +543,10 @@ impl Instance {
let meta_update = MetaUpdate::VersionEdit(edit_meta);
self.space_store
.manifest
.store_update(MetaUpdateRequest::new(table_data.location(), meta_update))
.store_update(MetaUpdateRequest::new(
table_data.wal_location(),
meta_update,
))
.await
.context(StoreVersionEdit)?;

Expand All @@ -557,10 +563,10 @@ impl Instance {
// Mark sequence <= flushed_sequence to be deleted.
self.space_store
.wal_manager
.mark_delete_entries_up_to(table_data.location(), flushed_sequence)
.mark_delete_entries_up_to(table_data.wal_location(), flushed_sequence)
.await
.context(PurgeWal {
table_location: table_data.location(),
wal_location: table_data.wal_location(),
sequence: flushed_sequence,
})?;

Expand Down Expand Up @@ -821,7 +827,10 @@ impl SpaceStore {

let meta_update = MetaUpdate::VersionEdit(edit_meta.clone());
self.manifest
.store_update(MetaUpdateRequest::new(table_data.location(), meta_update))
.store_update(MetaUpdateRequest::new(
table_data.wal_location(),
meta_update,
))
.await
.context(StoreVersionEdit)?;

Expand Down
17 changes: 11 additions & 6 deletions analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ use std::{
sync::{Arc, RwLock},
};

use common_types::{schema::IndexInWriterSchema, table::Location};
use common_types::schema::IndexInWriterSchema;
use log::{debug, error, info, trace, warn};
use object_store::ObjectStoreRef;
use snafu::ResultExt;
use table_engine::engine::OpenTableRequest;
use tokio::sync::oneshot;
use wal::{
log_batch::LogEntry,
manager::{ReadBoundary, ReadContext, ReadRequest, WalManagerRef},
manager::{ReadBoundary, ReadContext, ReadRequest, RegionId, WalLocation, WalManagerRef},
};

use crate::{
Expand Down Expand Up @@ -203,7 +203,11 @@ impl Instance {
.space_store
.manifest
.load_data(
Location::new(request.shard_id, request.table_id.as_u64()),
WalLocation::new(
request.shard_id as RegionId,
request.cluster_version,
request.table_id.as_u64(),
),
true,
)
.await
Expand Down Expand Up @@ -248,6 +252,7 @@ impl Instance {
&self.file_purger,
space.mem_usage_collector.clone(),
request.shard_id,
request.cluster_version,
)
.context(RecoverTableData {
space_id: table_meta.space_id,
Expand Down Expand Up @@ -278,12 +283,12 @@ impl Instance {
read_ctx: &ReadContext,
) -> Result<()> {
debug!(
"Instance recover table from wal, replay batch size:{}, table id:{}, shard id:{}",
replay_batch_size, table_data.id, table_data.shard_id
"Instance recover table from wal, replay batch size:{}, table id:{}, shard info:{:?}",
replay_batch_size, table_data.id, table_data.shard_info
);

let read_req = ReadRequest {
location: table_data.location(),
location: table_data.wal_location(),
start: ReadBoundary::Excluded(table_data.current_version().flushed_sequence()),
end: ReadBoundary::Max,
};
Expand Down
23 changes: 11 additions & 12 deletions analytic_engine/src/instance/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use common_types::{
bytes::ByteVec,
row::RowGroup,
schema::{IndexInWriterSchema, Schema},
table::Location,
};
use common_util::{codec::row, define_result};
use log::{debug, error, info, trace, warn};
Expand All @@ -17,7 +16,7 @@ use smallvec::SmallVec;
use snafu::{ensure, Backtrace, ResultExt, Snafu};
use table_engine::table::WriteRequest;
use tokio::sync::oneshot;
use wal::manager::{SequenceNumber, WriteContext};
use wal::manager::{SequenceNumber, WalLocation, WriteContext};

use crate::{
instance::{
Expand All @@ -38,26 +37,26 @@ use crate::{
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
"Failed to get to log batch encoder, table:{}, table_location:{:?}, err:{}",
"Failed to get to log batch encoder, table:{}, wal_location:{:?}, err:{}",
table,
table_location,
wal_location,
source
))]
GetLogBatchEncoder {
table: String,
table_location: Location,
wal_location: WalLocation,
source: wal::manager::Error,
},

#[snafu(display(
"Failed to encode payloads, table:{}, table_location:{:?}, err:{}",
"Failed to encode payloads, table:{}, wal_location:{:?}, err:{}",
table,
table_location,
wal_location,
source
))]
EncodePayloads {
table: String,
table_location: Location,
wal_location: WalLocation,
source: wal::manager::Error,
},

Expand Down Expand Up @@ -402,18 +401,18 @@ impl Instance {

// Encode payload
let payload = WritePayload::Write(&write_req_pb);
let region_id = table_data.location();
let wal_location = table_data.wal_location();
let log_batch_encoder =
self.space_store
.wal_manager
.encoder(region_id)
.encoder(wal_location)
.context(GetLogBatchEncoder {
table: &table_data.name,
table_location: table_data.location(),
wal_location,
})?;
let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads {
table: &table_data.name,
table_location: table_data.location(),
wal_location,
})?;

// Write to wal manager
Expand Down
Loading

0 comments on commit 4d39868

Please sign in to comment.