Skip to content

Commit

Permalink
feat: implement Background service for databend (databendlabs#11751)
Browse files Browse the repository at this point in the history
* feat: background service patch 1(schema definitions)

* feat: add kv api for background tasks

* feat: add background_task apis with unit-test

* feat: add background job schema

* save

* feat: add compaction background job

* save

* save

* save

* add task logics

* save

* save

* save

* save

* feat: refactor background apis

* feat: add apis

* save

* save

* lint

* license

* license

* license

* fix debug settings

* fix comment

* fix

* remove unneeded stats

* fix comment

* fix unit tests

* fix

* fix comment

* remove unneeded kv call

* fix

* f
  • Loading branch information
ZhiHanZ authored and andylokandy committed Nov 27, 2023
1 parent e7c6720 commit 81e7f20
Show file tree
Hide file tree
Showing 50 changed files with 4,083 additions and 13 deletions.
38 changes: 35 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ members = [
"src/query/storages/result_cache",
"src/query/users",
"src/query/ee-features/vacuum-handler",
"src/query/ee-features/background-service",
"src/query/ee-features/aggregating-index",
"src/query/ee-features/data-mask",
"src/query/ee-features/table-lock",
Expand Down
2 changes: 1 addition & 1 deletion src/binaries/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ databend-meta = { path = "../meta/service" }
databend-query = { path = "../query/service" }
enterprise-query = { path = "../query/ee" }
# enterprise-meta = { path = "../meta/ee" }
background-service = { path = "../query/ee-features/background-service" }
sharing-endpoint = { path = "../query/sharing-endpoint" }
storages-common-table-meta = { path = "../query/storages/common/table-meta" }

# Crates.io dependencies
anyerror = { workspace = true }
anyhow = { workspace = true }
Expand Down
19 changes: 17 additions & 2 deletions src/binaries/query/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::env;

use background_service::get_background_service_handler;
use common_base::mem_allocator::GlobalAllocator;
use common_base::runtime::GLOBAL_MEM_STAT;
use common_base::set_alloc_error_hook;
Expand Down Expand Up @@ -93,7 +94,7 @@ pub async fn init_services(conf: &InnerConfig) -> Result<()> {
GlobalServices::init(conf.clone()).await
}

pub async fn start_services(conf: &InnerConfig) -> Result<()> {
async fn precheck_services(conf: &InnerConfig) -> Result<()> {
if conf.query.max_memory_limit_enabled {
let size = conf.query.max_server_memory_usage as i64;
info!("Set memory limit: {}", size);
Expand Down Expand Up @@ -125,6 +126,11 @@ pub async fn start_services(conf: &InnerConfig) -> Result<()> {

#[cfg(not(target_os = "macos"))]
check_max_open_files();
Ok(())
}

pub async fn start_services(conf: &InnerConfig) -> Result<()> {
precheck_services(conf).await?;

let mut shutdown_handle = ShutdownHandle::create()?;

Expand Down Expand Up @@ -333,7 +339,16 @@ pub async fn start_services(conf: &InnerConfig) -> Result<()> {
}

info!("Ready for connections.");
shutdown_handle.wait_for_termination_request().await;
if conf.background.enable {
println!("Start background service");
get_background_service_handler()
.start(&mut shutdown_handle)
.await?;
// for one shot background service, we need to drop it manually.
drop(shutdown_handle);
} else {
shutdown_handle.wait_for_termination_request().await;
}
info!("Shutdown server.");
Ok(())
}
Expand Down
5 changes: 4 additions & 1 deletion src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,10 @@ build_exceptions! {
/// LicenseKeyInvalid is used when license key verification error occurs
///
/// For example: license key is expired
LicenseKeyInvalid(1402)
LicenseKeyInvalid(1402),

BackgroundJobAlreadyExists(1501),
UnknownBackgroundJob(1502)
}

// Meta service errors [2001, 3000].
Expand Down
80 changes: 80 additions & 0 deletions src/meta/api/src/background_api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_meta_app::background::BackgroundJobInfo;
use common_meta_app::background::BackgroundTaskInfo;
use common_meta_app::background::CreateBackgroundJobReply;
use common_meta_app::background::CreateBackgroundJobReq;
use common_meta_app::background::DeleteBackgroundJobReply;
use common_meta_app::background::DeleteBackgroundJobReq;
use common_meta_app::background::GetBackgroundJobReply;
use common_meta_app::background::GetBackgroundJobReq;
use common_meta_app::background::GetBackgroundTaskReply;
use common_meta_app::background::GetBackgroundTaskReq;
use common_meta_app::background::ListBackgroundJobsReq;
use common_meta_app::background::ListBackgroundTasksReq;
use common_meta_app::background::UpdateBackgroundJobParamsReq;
use common_meta_app::background::UpdateBackgroundJobReply;
use common_meta_app::background::UpdateBackgroundJobStatusReq;
use common_meta_app::background::UpdateBackgroundTaskReply;
use common_meta_app::background::UpdateBackgroundTaskReq;

use crate::kv_app_error::KVAppError;

#[async_trait::async_trait]
pub trait BackgroundApi: Send + Sync {
async fn create_background_job(
&self,
req: CreateBackgroundJobReq,
) -> Result<CreateBackgroundJobReply, KVAppError>;

async fn drop_background_job(
&self,
req: DeleteBackgroundJobReq,
) -> Result<DeleteBackgroundJobReply, KVAppError>;

async fn update_background_job_status(
&self,
req: UpdateBackgroundJobStatusReq,
) -> Result<UpdateBackgroundJobReply, KVAppError>;

async fn update_background_job_params(
&self,
req: UpdateBackgroundJobParamsReq,
) -> Result<UpdateBackgroundJobReply, KVAppError>;

async fn get_background_job(
&self,
req: GetBackgroundJobReq,
) -> Result<GetBackgroundJobReply, KVAppError>;
async fn list_background_jobs(
&self,
req: ListBackgroundJobsReq,
) -> Result<Vec<(u64, BackgroundJobInfo)>, KVAppError>;
// Return a list of background tasks (task_id, BackgroundInfo)
async fn list_background_tasks(
&self,
req: ListBackgroundTasksReq,
) -> Result<Vec<(u64, BackgroundTaskInfo)>, KVAppError>;

async fn update_background_task(
&self,
req: UpdateBackgroundTaskReq,
) -> Result<UpdateBackgroundTaskReply, KVAppError>;

async fn get_background_task(
&self,
req: GetBackgroundTaskReq,
) -> Result<GetBackgroundTaskReply, KVAppError>;
}
Loading

0 comments on commit 81e7f20

Please sign in to comment.