Skip to content

Commit

Permalink
refactor: add associated type kvapi::Value::KeyType
Browse files Browse the repository at this point in the history
This way the key tyep and the value type becomes one-to-one mapping.
The KeyType is used to build a key from a value.

For example, `Id<DatabaseId>` is a value, to build a key to load
`DatabaseMeta`, it needs a `Tenant` to build the key `TIdent<T>`.
Such a `Tenant` can be retrieve from the key of `Id<DatabaseId>`, i.e.,
`<Id<DatabaseId> as kvapi::Value>::KeyType`

refactor: simplify get_data_mask
  • Loading branch information
drmingdrmer committed Aug 23, 2024
1 parent 5ebdfbe commit c27eb38
Show file tree
Hide file tree
Showing 51 changed files with 307 additions and 125 deletions.
61 changes: 26 additions & 35 deletions src/meta/api/src/data_mask_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use databend_common_meta_app::app_error::DatamaskAlreadyExists;
use databend_common_meta_app::app_error::UnknownDatamask;
use databend_common_meta_app::data_mask::CreateDatamaskReply;
use databend_common_meta_app::data_mask::CreateDatamaskReq;
use databend_common_meta_app::data_mask::DataMaskId;
use databend_common_meta_app::data_mask::DataMaskIdIdent;
use databend_common_meta_app::data_mask::DataMaskNameIdent;
use databend_common_meta_app::data_mask::DatamaskMeta;
Expand All @@ -35,7 +36,7 @@ use databend_common_meta_app::schema::TableMeta;
use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqValue;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::TxnRequest;
use fastrace::func_name;
use log::debug;
Expand Down Expand Up @@ -186,7 +187,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> DatamaskApi for KV {

let name_key = &req.name;

let (_id_seq, _id, _data_mask_seq, policy) = get_data_mask_or_err(
let (_seq_id, policy) = get_data_mask_or_err(
self,
name_key,
format!("drop_data_mask: {}", name_key.display()),
Expand All @@ -202,34 +203,25 @@ async fn get_data_mask_or_err(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
name_key: &DataMaskNameIdent,
msg: impl Display,
) -> Result<(u64, u64, u64, DatamaskMeta), KVAppError> {
let (id_seq, id) = get_u64_value(kv_api, name_key).await?;
assert_data_mask_exist(id_seq, name_key, &msg)?;

let id_ident = DataMaskIdIdent::new(name_key.tenant(), id);

let seq_v = kv_api.get_pb(&id_ident).await?;
assert_data_mask_exist(seq_v.seq(), name_key, msg)?;
) -> Result<(SeqV<DataMaskId>, SeqV<DatamaskMeta>), KVAppError> {
let seq_id = kv_api.get_pb(name_key).await?.ok_or_else(|| {
AppError::from(UnknownDatamask::new(
name_key.name(),
format!("{}: {}", msg, name_key.data_mask_name()),
))
})?;

let id_ident = DataMaskIdIdent::new_generic(name_key.tenant(), seq_id.data.into_inner());

let seq_v = kv_api.get_pb(&id_ident).await?.ok_or_else(|| {
AppError::from(UnknownDatamask::new(
name_key.name(),
format!("{}: {}", msg, name_key.data_mask_name()),
))
})?;

// Safe unwrap(): data_mask_seq > 0 implies data_mask is not None.
Ok((id_seq, id, seq_v.seq(), seq_v.unwrap().data))
}

pub fn assert_data_mask_exist(
seq: u64,
name_ident: &DataMaskNameIdent,
msg: impl Display,
) -> Result<(), AppError> {
if seq == 0 {
debug!(seq = seq, name_ident :? =(name_ident); "data mask does not exist");

Err(AppError::UnknownDatamask(UnknownDatamask::new(
name_ident.name(),
format!("{}: {}", msg, name_ident.data_mask_name()),
)))
} else {
Ok(())
}
Ok((seq_id.map(|id| id.into_inner()), seq_v))
}

async fn clear_table_column_mask_policy(
Expand Down Expand Up @@ -289,8 +281,8 @@ async fn construct_drop_mask_policy_operations(
)
.await;

let (id_seq, id, data_mask_seq, _) = match result {
Ok((id_seq, id, data_mask_seq, meta)) => (id_seq, id, data_mask_seq, meta),
let (seq_id, seq_meta) = match result {
Ok((seq_id, seq_meta)) => (seq_id, seq_meta),
Err(err) => {
if let KVAppError::AppError(AppError::UnknownDatamask(_)) = err {
if drop_if_exists {
Expand All @@ -302,21 +294,20 @@ async fn construct_drop_mask_policy_operations(
}
};

let id_ident = DataMaskIdIdent::new(name_key.tenant(), id);
let id_ident = DataMaskIdIdent::new_generic(name_key.tenant(), seq_id.data);

txn.condition
.push(txn_cond_eq_seq(&id_ident, data_mask_seq));
txn.condition.push(txn_cond_eq_seq(&id_ident, seq_meta.seq));
txn.if_then.push(txn_op_del(&id_ident));

if if_delete {
txn.condition.push(txn_cond_eq_seq(name_key, id_seq));
txn.condition.push(txn_cond_eq_seq(name_key, seq_id.seq));
txn.if_then.push(txn_op_del(name_key));
clear_table_column_mask_policy(kv_api, name_key, txn).await?;
}

debug!(
name :? =(name_key),
id :? =id,
seq_id :? =seq_id,
ctx = ctx;
"construct_drop_mask_policy_operations"
);
Expand Down
6 changes: 3 additions & 3 deletions src/meta/api/src/kv_pb_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ mod tests {
type Error = MetaError;

async fn upsert_kv(&self, _req: UpsertKVReq) -> Result<UpsertKVReply, Self::Error> {
todo!()
unimplemented!()
}

async fn get_kv_stream(
Expand All @@ -384,11 +384,11 @@ mod tests {
}

async fn list_kv(&self, _prefix: &str) -> Result<KVStream<Self::Error>, Self::Error> {
todo!()
unimplemented!()
}

async fn transaction(&self, _txn: TxnRequest) -> Result<TxnReply, Self::Error> {
todo!()
unimplemented!()
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/meta/app/src/background/background_job_id_ident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ mod kvapi_impl {

use databend_common_meta_kvapi::kvapi;

use crate::background::BackgroundJobIdIdent;
use crate::background::BackgroundJobInfo;
use crate::tenant_key::resource::TenantResource;

Expand All @@ -48,7 +49,8 @@ mod kvapi_impl {
}

impl kvapi::Value for BackgroundJobInfo {
fn dependency_keys(&self) -> impl IntoIterator<Item = String> {
type KeyType = BackgroundJobIdIdent;
fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator<Item = String> {
[]
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/meta/app/src/background/job_ident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ mod kvapi_impl {
use databend_common_meta_kvapi::kvapi::Key;

use crate::background::BackgroundJobIdIdent;
use crate::background::BackgroundJobIdent;
use crate::tenant_key::resource::TenantResource;

pub struct Resource;
Expand All @@ -44,7 +45,9 @@ mod kvapi_impl {
}

impl kvapi::Value for BackgroundJobIdIdent {
fn dependency_keys(&self) -> impl IntoIterator<Item = String> {
type KeyType = BackgroundJobIdent;

fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator<Item = String> {
[self.to_string_key()]
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/meta/app/src/background/task_ident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod kvapi_impl {

use databend_common_meta_kvapi::kvapi;

use crate::background::BackgroundTaskIdent;
use crate::background::BackgroundTaskInfo;
use crate::tenant_key::resource::TenantResource;

Expand All @@ -35,7 +36,8 @@ mod kvapi_impl {
}

impl kvapi::Value for BackgroundTaskInfo {
fn dependency_keys(&self) -> impl IntoIterator<Item = String> {
type KeyType = BackgroundTaskIdent;
fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator<Item = String> {
[]
}
}
Expand Down
44 changes: 44 additions & 0 deletions src/meta/app/src/data_mask/data_mask_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_meta_kvapi::kvapi::KeyBuilder;
use databend_common_meta_kvapi::kvapi::KeyCodec;
use databend_common_meta_kvapi::kvapi::KeyError;
use databend_common_meta_kvapi::kvapi::KeyParser;
use derive_more::Deref;
use derive_more::DerefMut;
use derive_more::From;

#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Deref, DerefMut, From)]
pub struct DataMaskId {
id: u64,
}

impl DataMaskId {
pub fn new(id: u64) -> Self {
DataMaskId { id }
}
}

impl KeyCodec for DataMaskId {
fn encode_key(&self, b: KeyBuilder) -> KeyBuilder {
b.push_u64(self.id)
}

fn decode_key(parser: &mut KeyParser) -> Result<Self, KeyError>
where Self: Sized {
let v = parser.next_u64()?;
Ok(Self::new(v))
}
}
22 changes: 16 additions & 6 deletions src/meta/app/src/data_mask/data_mask_id_ident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,29 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::data_mask::DataMaskId;
use crate::tenant_key::ident::TIdent;
use crate::tenant_key::raw::TIdentRaw;

pub type DataMaskIdIdent = TIdent<Resource, u64>;
pub type DataMaskIdIdentRaw = TIdentRaw<Resource, u64>;
pub type DataMaskIdIdent = TIdent<Resource, DataMaskId>;
pub type DataMaskIdIdentRaw = TIdentRaw<Resource, DataMaskId>;

pub use kvapi_impl::Resource;

use crate::tenant::ToTenant;

impl DataMaskIdIdent {
pub fn data_mask_id(&self) -> u64 {
pub fn new(tenant: impl ToTenant, data_mask_id: u64) -> Self {
Self::new_generic(tenant, DataMaskId::new(data_mask_id))
}

pub fn data_mask_id(&self) -> DataMaskId {
*self.name()
}
}

impl DataMaskIdIdentRaw {
pub fn data_mask_id(&self) -> u64 {
pub fn data_mask_id(&self) -> DataMaskId {
*self.name()
}
}
Expand All @@ -36,6 +43,7 @@ mod kvapi_impl {

use databend_common_meta_kvapi::kvapi;

use crate::data_mask::DataMaskIdIdent;
use crate::data_mask::DatamaskMeta;
use crate::tenant_key::resource::TenantResource;

Expand All @@ -48,7 +56,8 @@ mod kvapi_impl {
}

impl kvapi::Value for DatamaskMeta {
fn dependency_keys(&self) -> impl IntoIterator<Item = String> {
type KeyType = DataMaskIdIdent;
fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator<Item = String> {
[]
}
}
Expand All @@ -63,12 +72,13 @@ mod tests {
use databend_common_meta_kvapi::kvapi::Key;

use super::DataMaskIdIdent;
use crate::data_mask::DataMaskId;
use crate::tenant::Tenant;

#[test]
fn test_data_mask_id_ident() {
let tenant = Tenant::new_literal("dummy");
let ident = DataMaskIdIdent::new(tenant, 3);
let ident = DataMaskIdIdent::new_generic(tenant, DataMaskId::new(3));

let key = ident.to_string_key();
assert_eq!(key, "__fd_datamask_by_id/3");
Expand Down
14 changes: 10 additions & 4 deletions src/meta/app/src/data_mask/data_mask_name_ident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,26 @@ mod kvapi_impl {
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::Key;

use crate::data_mask::DataMaskId;
use crate::data_mask::DataMaskIdIdent;
use crate::data_mask::DataMaskNameIdent;
use crate::primitive::Id;
use crate::tenant_key::resource::TenantResource;
use crate::KeyWithTenant;

pub struct Resource;
impl TenantResource for Resource {
const PREFIX: &'static str = "__fd_datamask";
const TYPE: &'static str = "DataMaskNameIdent";
const HAS_TENANT: bool = true;
type ValueType = DataMaskIdIdent;
type ValueType = Id<DataMaskId>;
}

impl kvapi::Value for DataMaskIdIdent {
fn dependency_keys(&self) -> impl IntoIterator<Item = String> {
[self.to_string_key()]
impl kvapi::Value for Id<DataMaskId> {
type KeyType = DataMaskNameIdent;

fn dependency_keys(&self, key: &Self::KeyType) -> impl IntoIterator<Item = String> {
[DataMaskIdIdent::new_generic(key.tenant(), self.into_inner()).to_string_key()]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod kvapi_impl {

use databend_common_meta_kvapi::kvapi;

use crate::data_mask::MaskPolicyTableIdListIdent;
use crate::data_mask::MaskpolicyTableIdList;
use crate::tenant_key::resource::TenantResource;

Expand All @@ -36,7 +37,8 @@ mod kvapi_impl {
}

impl kvapi::Value for MaskpolicyTableIdList {
fn dependency_keys(&self) -> impl IntoIterator<Item = String> {
type KeyType = MaskPolicyTableIdListIdent;
fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator<Item = String> {
[]
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/meta/app/src/data_mask/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod data_mask_id;
pub mod data_mask_id_ident;
pub mod data_mask_name_ident;
pub mod mask_policy_table_id_list_ident;
Expand All @@ -20,8 +21,10 @@ use std::collections::BTreeSet;

use chrono::DateTime;
use chrono::Utc;
pub use data_mask_id::DataMaskId;
pub use data_mask_id_ident::DataMaskIdIdent;
pub use data_mask_name_ident::DataMaskNameIdent;
use databend_common_meta_types::SeqV;
pub use mask_policy_table_id_list_ident::MaskPolicyTableIdListIdent;

use crate::schema::CreateOption;
Expand Down Expand Up @@ -82,7 +85,7 @@ pub struct GetDatamaskReq {

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct GetDatamaskReply {
pub policy: DatamaskMeta,
pub policy: SeqV<DatamaskMeta>,
}

/// A list of table ids
Expand Down
Loading

0 comments on commit c27eb38

Please sign in to comment.