Skip to content

Commit

Permalink
Merge 3412b98 into a7d3f72
Browse files Browse the repository at this point in the history
  • Loading branch information
chunshao90 authored Feb 14, 2023
2 parents a7d3f72 + 3412b98 commit da95497
Show file tree
Hide file tree
Showing 51 changed files with 244 additions and 938 deletions.
41 changes: 14 additions & 27 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ members = [
"integration_tests",
"interpreters",
"meta_client",
"proto",
"query_engine",
"remote_engine_client",
"router",
Expand Down Expand Up @@ -92,7 +91,6 @@ paste = "1.0"
profile = { path = "components/profile" }
prometheus = "0.12"
prometheus-static-metric = "0.5"
proto = { path = "proto" }
prost = "0.11"
query_engine = { path = "query_engine" }
rand = "0.7"
Expand Down Expand Up @@ -120,7 +118,7 @@ zstd = { version = "0.12", default-features = false }

[workspace.dependencies.ceresdbproto]
git = "https://github.com/CeresDB/ceresdbproto.git"
rev = "dd6921dbb59bbe9a2fcc27960e76a8c870b44415"
rev = "81a6d9ead104b2910f5c4484135054d51095090b"

[dependencies]
analytic_engine = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion analytic_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ parquet = { workspace = true }
parquet_ext = { workspace = true }
prometheus = { workspace = true }
prost = { workspace = true }
proto = { workspace = true }
ceresdbproto = { workspace = true }
remote_engine_client = { workspace = true }
router = { workspace = true }
serde = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions analytic_engine/src/instance/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@

use std::sync::Arc;

use ceresdbproto::{schema as schema_pb, table_requests};
use common_types::{
bytes::ByteVec,
row::RowGroup,
schema::{IndexInWriterSchema, Schema},
};
use common_util::{codec::row, define_result};
use log::{debug, error, info, trace, warn};
use proto::{common as common_pb, table_requests};
use smallvec::SmallVec;
use snafu::{ensure, Backtrace, ResultExt, Snafu};
use table_engine::table::WriteRequest;
Expand Down Expand Up @@ -381,7 +381,7 @@ impl Instance {
version: 0,
// Use the table schema instead of the schema in request to avoid schema
// mismatch during replaying
schema: Some(common_pb::TableSchema::from(&table_data.schema())),
schema: Some(schema_pb::TableSchema::from(&table_data.schema())),
rows: encoded_rows,
};

Expand Down
16 changes: 8 additions & 8 deletions analytic_engine/src/manifest/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ use std::{
};

use async_trait::async_trait;
use ceresdbproto::manifest as manifest_pb;
use common_util::{config::ReadableDuration, define_result};
use log::{debug, info, warn};
use object_store::{ObjectStoreRef, Path};
use parquet::data_type::AsBytes;
use prost::Message;
use proto::meta_update;
use serde_derive::Deserialize;
use snafu::{Backtrace, ResultExt, Snafu};
use table_engine::table::TableId;
Expand Down Expand Up @@ -398,7 +398,7 @@ impl MetaUpdateSnapshotStore for ObjectStoreBasedSnapshotStore {
/// Store the latest snapshot to the underlying store by overwriting the old
/// snapshot.
async fn store(&self, snapshot: &Snapshot) -> Result<()> {
let snapshot_pb = meta_update::Snapshot::from(snapshot.clone());
let snapshot_pb = manifest_pb::Snapshot::from(snapshot.clone());
let payload = snapshot_pb.encode_to_vec();
// The atomic write is ensured by the [`ObjectStore`] implementation.
self.store
Expand Down Expand Up @@ -426,7 +426,7 @@ impl MetaUpdateSnapshotStore for ObjectStoreBasedSnapshotStore {
.bytes()
.await
.context(FetchSnapshot)?;
let snapshot = meta_update::Snapshot::decode(payload.as_bytes()).context(DecodeSnapshot)?;
let snapshot = manifest_pb::Snapshot::decode(payload.as_bytes()).context(DecodeSnapshot)?;
Ok(Some(Snapshot::try_from(snapshot)?))
}
}
Expand Down Expand Up @@ -519,10 +519,10 @@ struct Snapshot {
data: Option<TableManifestData>,
}

impl TryFrom<meta_update::Snapshot> for Snapshot {
impl TryFrom<manifest_pb::Snapshot> for Snapshot {
type Error = Error;

fn try_from(src: meta_update::Snapshot) -> Result<Self> {
fn try_from(src: manifest_pb::Snapshot) -> Result<Self> {
let meta = src
.meta
.map(AddTableMeta::try_from)
Expand Down Expand Up @@ -553,12 +553,12 @@ impl TryFrom<meta_update::Snapshot> for Snapshot {
}
}

impl From<Snapshot> for meta_update::Snapshot {
impl From<Snapshot> for manifest_pb::Snapshot {
fn from(src: Snapshot) -> Self {
if let Some((meta, version_edit)) = src.data.map(|v| {
let space_id = v.table_meta.space_id;
let table_id = v.table_meta.table_id;
let table_meta = meta_update::AddTableMeta::from(v.table_meta);
let table_meta = manifest_pb::AddTableMeta::from(v.table_meta);
let version_edit = v.version_meta.map(|version_meta| VersionEditMeta {
space_id,
table_id,
Expand All @@ -568,7 +568,7 @@ impl From<Snapshot> for meta_update::Snapshot {
});
(
table_meta,
version_edit.map(meta_update::VersionEditMeta::from),
version_edit.map(manifest_pb::VersionEditMeta::from),
)
}) {
Self {
Expand Down
Loading

0 comments on commit da95497

Please sign in to comment.