Skip to content

Commit

Permalink
Merge branch 'main' into short-sql-default-setting
Browse files Browse the repository at this point in the history
  • Loading branch information
arkzuse authored Sep 23, 2024
2 parents 311b773 + 414c99c commit 74c161f
Show file tree
Hide file tree
Showing 12 changed files with 231 additions and 177 deletions.
48 changes: 24 additions & 24 deletions src/binaries/query/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,30 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {

info!("Databend Query start with config: {:?}", conf);

// Cluster register.
{
ClusterDiscovery::instance()
.register_to_metastore(conf)
.await
.with_context(make_error)?;
info!(
"Databend query has been registered:{:?} to metasrv:{:?}.",
conf.query.cluster_id, conf.meta.endpoints
);
}

// RPC API service.
{
let address = conf.query.flight_api_address.clone();
let mut srv = FlightService::create(conf.clone()).with_context(make_error)?;
let listening = srv
.start(address.parse().with_context(make_error)?)
.await
.with_context(make_error)?;
shutdown_handle.add_service("RPCService", srv);
info!("Listening for RPC API (interserver): {}", listening);
}

// MySQL handler.
{
let hostname = conf.query.mysql_handler_host.clone();
Expand Down Expand Up @@ -229,30 +253,6 @@ pub async fn start_services(conf: &InnerConfig) -> Result<(), MainError> {
info!("Listening for FlightSQL API: {}", listening);
}

// RPC API service.
{
let address = conf.query.flight_api_address.clone();
let mut srv = FlightService::create(conf.clone()).with_context(make_error)?;
let listening = srv
.start(address.parse().with_context(make_error)?)
.await
.with_context(make_error)?;
shutdown_handle.add_service("RPCService", srv);
info!("Listening for RPC API (interserver): {}", listening);
}

// Cluster register.
{
ClusterDiscovery::instance()
.register_to_metastore(conf)
.await
.with_context(make_error)?;
info!(
"Databend query has been registered:{:?} to metasrv:{:?}.",
conf.query.cluster_id, conf.meta.endpoints
);
}

// Print information to users.
println!("Databend Query");

Expand Down
1 change: 1 addition & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ build_exceptions! {
/// For example: license key is expired
LicenseKeyInvalid(1402),
EnterpriseFeatureNotEnable(1403),
LicenseKeyExpired(1404),

BackgroundJobAlreadyExists(1501),
UnknownBackgroundJob(1502),
Expand Down
21 changes: 13 additions & 8 deletions src/common/license/src/license.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::fmt;

use databend_common_base::display::display_option::DisplayOptionExt;
use databend_common_base::display::display_slice::DisplaySliceExt;
use databend_common_exception::ErrorCode;
use serde::Deserialize;
use serde::Serialize;

Expand Down Expand Up @@ -124,31 +125,35 @@ impl fmt::Display for Feature {
}

impl Feature {
pub fn verify(&self, feature: &Feature) -> bool {
pub fn verify_default(&self, message: impl Into<String>) -> Result<(), ErrorCode> {
Err(ErrorCode::LicenseKeyInvalid(message.into()))
}

pub fn verify(&self, feature: &Feature) -> Result<bool, ErrorCode> {
match (self, feature) {
(Feature::ComputeQuota(c), Feature::ComputeQuota(v)) => {
if let Some(thread_num) = c.threads_num {
if thread_num <= v.threads_num.unwrap_or(usize::MAX) {
return false;
return Ok(false);
}
}

if let Some(max_memory_usage) = c.memory_usage {
if max_memory_usage <= v.memory_usage.unwrap_or(usize::MAX) {
return false;
return Ok(false);
}
}

true
Ok(true)
}
(Feature::StorageQuota(c), Feature::StorageQuota(v)) => {
if let Some(max_storage_usage) = c.storage_usage {
if max_storage_usage <= v.storage_usage.unwrap_or(usize::MAX) {
return false;
return Ok(false);
}
}

true
Ok(true)
}
(Feature::Test, Feature::Test)
| (Feature::AggregateIndex, Feature::AggregateIndex)
Expand All @@ -161,8 +166,8 @@ impl Feature {
| (Feature::InvertedIndex, Feature::InvertedIndex)
| (Feature::VirtualColumn, Feature::VirtualColumn)
| (Feature::AttacheTable, Feature::AttacheTable)
| (Feature::StorageEncryption, Feature::StorageEncryption) => true,
(_, _) => false,
| (Feature::StorageEncryption, Feature::StorageEncryption) => Ok(true),
(_, _) => Ok(false),
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions src/common/license/src/license_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,9 @@ impl LicenseManager for OssLicenseManager {
GlobalInstance::get()
}

fn check_enterprise_enabled(&self, _license_key: String, _feature: Feature) -> Result<()> {
Err(ErrorCode::LicenseKeyInvalid(
"Need Commercial License".to_string(),
))
fn check_enterprise_enabled(&self, _license_key: String, feature: Feature) -> Result<()> {
// oss ignore license key.
feature.verify_default("Need Commercial License".to_string())
}

fn parse_license(&self, _raw: &str) -> Result<JWTClaims<LicenseInfo>> {
Expand Down
18 changes: 12 additions & 6 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2753,7 +2753,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
});
} else {
for (table_info, db_id) in table_infos.iter().take(capacity) {
vacuum_ids.push(DroppedId::Table(
vacuum_ids.push(DroppedId::new_table(
*db_id,
table_info.ident.table_id,
table_info.name.clone(),
Expand Down Expand Up @@ -2803,7 +2803,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
let mut drop_table_infos = vec![];

for (table_info, db_id) in table_infos.iter().take(the_limit) {
drop_ids.push(DroppedId::Table(
drop_ids.push(DroppedId::new_table(
*db_id,
table_info.ident.table_id,
table_info.name.clone(),
Expand All @@ -2826,8 +2826,15 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
db_name,
tables: _,
} => gc_dropped_db_by_id(self, db_id, &req.tenant, db_name).await?,
DroppedId::Table(db_id, table_id, table_name) => {
gc_dropped_table_by_id(self, &req.tenant, db_id, table_id, table_name).await?
DroppedId::Table { name, id } => {
gc_dropped_table_by_id(
self,
&req.tenant,
name.db_id,
id.table_id,
name.table_name.clone(),
)
.await?
}
}
}
Expand Down Expand Up @@ -3613,8 +3620,7 @@ async fn batch_filter_table_info(
.iter()
.map(|(_f, _db, table_id, _table_name)| TableId::new(*table_id));

let strm = kv_api.get_pb_values(table_id_idents).await?;
let seq_metas = strm.try_collect::<Vec<_>>().await?;
let seq_metas = kv_api.get_pb_values_vec(table_id_idents).await?;

for (seq_meta, (filter, db_info, table_id, table_name)) in seq_metas
.into_iter()
Expand Down
Loading

0 comments on commit 74c161f

Please sign in to comment.