Skip to content

Commit

Permalink
feat(cluster): support system-managed cluster (#17051)
Browse files Browse the repository at this point in the history
* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* feat(cluster): support custom management cluster

* refactor: improve and clean up `warehouse_mgr::upsert_self_managed()`

### Improvements:

- Added retry mechanism with a fallback in the retry loop.
- Return an error if an unexpected response is received when `TxnGetResponse` is expected.
- Refined quit-retry condition: now only triggered when the seq of `NodeInfo` changes.

### Refactoring:

- Simplified and decoupled nested branching for better readability and maintainability.
- Consolidated related logic, e.g., building `txn if_then` operations in a single place.
- Differentiated `NodeInfo` with and without warehouse-related information.

### Documentation:
- Added details explaining behavioral differences between insert and update modes.

* feat(cluster): clean code

* feat(cluster): clean code

* feat(cluster): clean code

* feat(cluster): clean code

* feat(cluster): clean code

* feat(cluster): clean code

* feat(cluster): clean code

* feat(cluster): clean code

* feat(cluster): clean code

* feat(cluster): clean code

* feat(cluster): clean code

* feat(cluster): clean code

* feat(cluster): clean code

* feat(cluster): clean code

* feat(cluster): add concurrent unit test

* feat(cluster): add concurrent unit test

* feat(cluster): clean code

* feat(cluster): clean code

---------

Co-authored-by: 张炎泼 <drdr.xp@gmail.com>
  • Loading branch information
zhang2014 and drmingdrmer authored Jan 6, 2025
1 parent 783d155 commit 6362ff4
Show file tree
Hide file tree
Showing 83 changed files with 7,228 additions and 627 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ members = [
"src/query/ee_features/storage_quota",
"src/query/ee_features/inverted_index",
"src/query/ee_features/virtual_column",
"src/query/ee_features/resources_management",
"src/query/service",
"src/query/ee",
"src/meta/api",
Expand Down Expand Up @@ -185,6 +186,7 @@ databend-enterprise-fail-safe = { path = "src/query/ee_features/fail_safe" }
databend-enterprise-inverted-index = { path = "src/query/ee_features/inverted_index" }
databend-enterprise-meta = { path = "src/meta/ee" }
databend-enterprise-query = { path = "src/query/ee" }
databend-enterprise-resources-management = { path = "src/query/ee_features/resources_management" }
databend-enterprise-storage-encryption = { path = "src/query/ee_features/storage_encryption" }
databend-enterprise-storage-quota = { path = "src/query/ee_features/storage_quota" }
databend-enterprise-stream-handler = { path = "src/query/ee_features/stream_handler" }
Expand Down
96 changes: 96 additions & 0 deletions scripts/ci/deploy/config/databend-query-node-system-managed.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Usage:
# databend-query -c databend_query_config_spec.toml

[query]
max_active_sessions = 256
shutdown_wait_timeout_ms = 5000

# For flight rpc.
flight_api_address = "0.0.0.0:flight_port"

# Databend Query http address.
# For admin RESET API.
admin_api_address = "0.0.0.0:admin_api_port"

# Databend Query metrics REST API.
metric_api_address = "0.0.0.0:metric_api_port"

# Databend Query MySQL Handler.
mysql_handler_host = "0.0.0.0"
mysql_handler_port = mysql_port

# Databend Query ClickHouse Handler.
clickhouse_http_handler_host = "0.0.0.0"
clickhouse_http_handler_port = clickhouse_port

# Databend Query HTTP Handler.
http_handler_host = "0.0.0.0"
http_handler_port = http_port

# Databend Query FlightSQL Handler.
flight_sql_handler_host = "0.0.0.0"
flight_sql_handler_port = flight_sql_port

tenant_id = "test_tenant"

table_engine_memory_enabled = true
default_storage_format = 'parquet'
default_compression = 'zstd'

[[query.users]]
name = "root"
auth_type = "no_password"

[[query.users]]
name = "default"
auth_type = "no_password"

# This is for testing.
[[query.udfs]]
name = "ping"
definition = "CREATE FUNCTION ping(STRING) RETURNS STRING LANGUAGE python HANDLER = 'ping' ADDRESS = 'http://0.0.0.0:8815'"

[query.resources_management]
type = "system_managed"
node_group

[log]

[log.file]
level = "INFO"
format = "text"
dir = "./.databend/query_logs"
prefix_filter = ""

[meta]
# It is a list of `grpc_api_advertise_host:<grpc-api-port>` of databend-meta config
endpoints = ["0.0.0.0:9191"]
username = "root"
password = "root"
client_timeout_in_second = 60
auto_sync_interval = 60

# Storage config.
[storage]
# fs | s3 | azblob | obs | oss
type = "fs"

# Set a local folder to store your data.
# Comment out this block if you're NOT using local file system as storage.
[storage.fs]
data_path = "./.databend/stateless_test_data"

# Cache config.
[cache]
# Type of storage to keep the table data cache
#
# available options: [none|disk]
# default is "none", which disables the table data cache
# use "disk" to enable the disk cache
data_cache_storage = "none"

[cache.disk]
# cache path
path = "./.databend/_cache"
# max bytes of cached data 20G
max_bytes = 21474836480
135 changes: 135 additions & 0 deletions scripts/ci/deploy/databend-query-system-managed.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#!/bin/bash
# Copyright 2022 The Databend Authors.
# SPDX-License-Identifier: Apache-2.0.

# Abort on the first failing command.
set -e

# Run from three directories above this script so that the relative paths used
# below (./target, scripts/ci/...) resolve.
SCRIPT_PATH="$(cd "$(dirname "$0")" >/dev/null 2>&1 && pwd)"
cd "$SCRIPT_PATH/../../.." || exit
# Build profile directory under ./target whose binaries are started.
BUILD_PROFILE=${BUILD_PROFILE:-debug}

# Arguments:
#   $1  number of databend-query nodes to start (required)
#   $2  node group written into each node's config (optional, defaults to "")
if [ $# -eq 1 ]; then
    num=$1
    node_group=""
elif [ $# -eq 2 ]; then
    num=$1
    node_group=$2
else
    echo "Usage: $0 <number> [node_group] - Start number of databend-query with system-managed mode"
    exit 1
fi

# Require at least one digit: an empty argument must be rejected too, otherwise
# the arithmetic on $num further down operates on an empty string.
if ! [[ "$num" =~ ^[0-9]+$ ]]; then
    echo "Error: Argument must be an integer."
    exit 1
fi

# Stop any databend-query / databend-meta processes left over from a previous run.
#
# Caveat: has to kill query first.
# `query` tries to remove its liveness record from meta before shutting down.
# If meta is stopped, `query` will receive an error that hangs graceful
# shutdown.
# `|| true` keeps `set -e` from aborting the script when killall fails
# (e.g. there is nothing to kill).
killall databend-query || true
# Give query time to shut down before meta is stopped.
sleep 3

killall databend-meta || true
sleep 3

# Anything that pgrep still finds after the grace period is force-killed with SIGKILL.
for bin in databend-query databend-meta; do
if test -n "$(pgrep $bin)"; then
echo "The $bin is not killed. force killing."
killall -9 $bin || true
fi
done

# Wait for killed process to cleanup resources
sleep 1

echo 'Start Meta service HA cluster(3 nodes)...'

mkdir -p ./.databend/

# Bring the meta nodes up one after another. Each entry is
# "<node index>:<TCP port to wait on>"; the index selects both the config file
# and the output file of that node.
for meta_node in 1:9191 2:28202 3:28302; do
    node_index=${meta_node%%:*}
    wait_port=${meta_node##*:}

    nohup ./target/${BUILD_PROFILE}/databend-meta -c scripts/ci/deploy/config/databend-meta-node-${node_index}.toml >./.databend/meta-${node_index}.out 2>&1 &
    python3 scripts/ci/wait_tcp.py --timeout 30 --port ${wait_port}

    # wait for cluster formation to complete.
    sleep 1
done

# Print a random TCP port that `lsof` does not report as in use.
#
# Candidates are computed as `RANDOM % (max_port - base_port + 1) + base_port`.
# Because bash's RANDOM never exceeds 32767, the result lies in 20000..52767.
# At most 10 candidates are tried.
#
# Output: the chosen port on stdout.
# Failure: prints a message to stderr and exits the current (sub)shell with
# status 1. When called as `$(find_available_port)` that only ends the command
# substitution, so the caller sees an empty string and a non-zero status.
find_available_port() {
    local base_port=20000
    local max_port=65535
    local attempts=10
    # Keep the loop counter and the candidate local so that a direct
    # (non-subshell) call cannot overwrite the caller's `i` or `port`.
    local i port

    for ((i = 0; i < attempts; i++)); do
        port=$((RANDOM % (max_port - base_port + 1) + base_port))
        if ! lsof -i ":$port" >/dev/null 2>&1; then
            echo "$port"
            return
        fi
    done

    echo "Unable to find an available port after $attempts attempts" >&2
    exit 1
}


# Render the system-managed query config for one node and start databend-query.
#
# Arguments:
#   $1  HTTP handler port; also the port polled to detect that the node is up
#   $2  MySQL handler port
#   $3  name substituted for "query_logs" in the config (the log directory)
#   $4  node group, written into the config as node_group="<value>" (may be empty)
#
# The flight, admin API, metric API, ClickHouse and FlightSQL ports are picked
# with find_available_port. The rendered config is written to a temporary file
# which is left in place after the node has been started.
#
# Exits the script with status 1 if the config template is missing, a port
# cannot be allocated, or the placeholder substitution fails.
start_databend_query() {
    local http_port=$1
    local mysql_port=$2
    local log_dir=$3
    local node_group=$4
    local system_managed_config="./scripts/ci/deploy/config/databend-query-node-system-managed.toml"
    local temp_file
    local flight_port admin_api_port metric_api_port clickhouse_port flight_sql_port

    if [ ! -f "$system_managed_config" ]; then
        echo "Error: system-managed config file does not exist."
        exit 1
    fi

    # Allocate the ports up front: a failed lookup embedded directly in the sed
    # arguments would go unnoticed and substitute an empty port.
    flight_port=$(find_available_port) || exit 1
    admin_api_port=$(find_available_port) || exit 1
    metric_api_port=$(find_available_port) || exit 1
    clickhouse_port=$(find_available_port) || exit 1
    flight_sql_port=$(find_available_port) || exit 1

    temp_file=$(mktemp)

    # Test sed directly instead of inspecting `$?` afterwards: under `set -e`
    # a separate status check is never reached when sed fails.
    if sed -e "s/flight_port/${flight_port}/g" \
        -e "s/admin_api_port/${admin_api_port}/g" \
        -e "s/metric_api_port/${metric_api_port}/g" \
        -e "s/mysql_port/${mysql_port}/g" \
        -e "s/clickhouse_port/${clickhouse_port}/g" \
        -e "s/http_port/${http_port}/g" \
        -e "s/flight_sql_port/${flight_sql_port}/g" \
        -e "s/query_logs/${log_dir}/g" \
        -e "s/node_group/node_group=\"${node_group}\"/g" \
        "$system_managed_config" >"$temp_file"; then
        echo "Start databend-query on port $http_port..."
        nohup target/${BUILD_PROFILE}/databend-query -c "$temp_file" --internal-enable-sandbox-tenant &

        echo "Waiting up to 30 seconds for databend-query..."
        python3 scripts/ci/wait_tcp.py --timeout 30 --port "$http_port"
    else
        echo "Error occurred during port replacement."
        rm -f "$temp_file"
        exit 1
    fi
}

# Start the requested number of query nodes.
#
# The first node takes the fixed ports (HTTP 8000, MySQL 3307) when port 8000
# is not in use; every other node gets ports from find_available_port.
# The `num > 0` guard ensures that asking for zero nodes starts none.
if [ "$num" -gt 0 ] && ! lsof -i :8000 >/dev/null 2>&1; then
    start_databend_query 8000 3307 "logs_1" "$node_group"
    num=$((num - 1))
fi

for ((i = 0; i < num; i++)); do
    http_port=$(find_available_port)
    mysql_port=$(find_available_port)
    start_databend_query "$http_port" "$mysql_port" "logs_$http_port" "$node_group"
done
2 changes: 1 addition & 1 deletion src/binaries/query/ee_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub async fn main_entrypoint() -> Result<(), MainError> {
return Ok(());
}

init_services(&conf).await.with_context(make_error)?;
init_services(&conf, true).await.with_context(make_error)?;
EnterpriseServices::init(conf.clone())
.await
.with_context(make_error)?;
Expand Down
6 changes: 4 additions & 2 deletions src/binaries/query/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub async fn run_cmd(conf: &InnerConfig) -> Result<bool, MainError> {
Ok(true)
}

pub async fn init_services(conf: &InnerConfig) -> Result<(), MainError> {
pub async fn init_services(conf: &InnerConfig, ee_mode: bool) -> Result<(), MainError> {
let make_error = || "failed to init services";

let binary_version = DATABEND_COMMIT_VERSION.clone();
Expand All @@ -93,7 +93,9 @@ pub async fn init_services(conf: &InnerConfig) -> Result<(), MainError> {
.with_context(make_error);
}
// Make sure global services have been inited.
GlobalServices::init(conf).await.with_context(make_error)
GlobalServices::init(conf, ee_mode)
.await
.with_context(make_error)
}

async fn precheck_services(conf: &InnerConfig) -> Result<(), MainError> {
Expand Down
2 changes: 1 addition & 1 deletion src/binaries/query/oss_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn main_entrypoint() -> Result<(), MainError> {
return Ok(());
}

init_services(&conf).await?;
init_services(&conf, false).await?;
// init oss license manager
OssLicenseManager::init(conf.query.tenant_id.tenant_name().to_string())
.with_context(make_error)?;
Expand Down
2 changes: 1 addition & 1 deletion src/binaries/tool/table_meta_inspector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn parse_input_data(config: &InspectorConfig) -> Result<Vec<u8>> {
builder = builder.collect(from_file(Toml, config_file));
let read_config = builder.build()?;
let inner_config: InnerConfig = read_config.clone().try_into()?;
GlobalServices::init(&inner_config).await?;
GlobalServices::init(&inner_config, false).await?;
let storage_config: StorageConfig = read_config.storage.try_into()?;
init_operator(&storage_config.params)?
}
Expand Down
1 change: 1 addition & 0 deletions src/common/base/src/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub const HEADER_QUERY_STATE: &str = "X-DATABEND-QUERY-STATE";
pub const HEADER_QUERY_PAGE_ROWS: &str = "X-DATABEND-QUERY-PAGE-ROWS";
pub const HEADER_VERSION: &str = "X-DATABEND-VERSION";
pub const HEADER_STICKY: &str = "X-DATABEND-STICKY-NODE";
pub const HEADER_WAREHOUSE: &str = "X-DATABEND-WAREHOUSE";

pub const HEADER_SIGNATURE: &str = "X-DATABEND-SIGNATURE";
pub const HEADER_AUTH_METHOD: &str = "X-DATABEND-AUTH-METHOD";
7 changes: 7 additions & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,13 @@ build_exceptions! {
ClusterUnknownNode(2401),
ClusterNodeAlreadyExists(2402),
InvalidWarehouse(2403),
NoResourcesAvailable(2404),
WarehouseAlreadyExists(2405),
UnknownWarehouse(2406),
WarehouseOperateConflict(2407),
EmptyNodesForWarehouse(2408),
WarehouseClusterAlreadyExists(2409),
WarehouseClusterNotExists(2410),

// Stage error codes.
UnknownStage(2501),
Expand Down
3 changes: 3 additions & 0 deletions src/common/license/src/license.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ pub enum Feature {
StorageQuota(StorageQuota),
#[serde(alias = "amend_table", alias = "AMEND_TABLE")]
AmendTable,
#[serde(alias = "system_management", alias = "SYSTEM_MANAGEMENT")]
SystemManagement,
#[serde(other)]
Unknown,
}
Expand Down Expand Up @@ -119,6 +121,7 @@ impl fmt::Display for Feature {
write!(f, ")")
}
Feature::AmendTable => write!(f, "amend_table"),
Feature::SystemManagement => write!(f, "system_management"),
Feature::Unknown => write!(f, "unknown"),
}
}
Expand Down
Loading

0 comments on commit 6362ff4

Please sign in to comment.