diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 3d2492996076c1..c9e63c54de6b47 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1564,6 +1564,8 @@ DEFINE_mBool(enable_auto_clone_on_mow_publish_missing_version, "false"); // The capacity of segment partial column cache, used to cache column readers for each segment. DEFINE_mInt32(max_segment_partial_column_cache_size, "500"); +DEFINE_mBool(enable_wal_tde, "false"); + // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index 39d087cc515d45..6f214361524ecd 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1631,6 +1631,8 @@ DECLARE_mBool(enable_auto_clone_on_mow_publish_missing_version); // The capacity of segment partial column cache, used to cache column readers for each segment. DECLARE_mInt32(max_segment_partial_column_cache_size); +DECLARE_mBool(enable_wal_tde); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/exec/schema_scanner/schema_encryption_keys_scanner.cpp b/be/src/exec/schema_scanner/schema_encryption_keys_scanner.cpp index c6e3e9f6f0134a..72c449f9199c68 100644 --- a/be/src/exec/schema_scanner/schema_encryption_keys_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_encryption_keys_scanner.cpp @@ -113,6 +113,7 @@ Status SchemaEncryptionKeysScanner::_fill_block_impl(vectorized::Block* block) { std::vector str_refs(row_num); std::vector int_vals(row_num); + std::vector int64_vals(row_num); std::vector bool_vals(row_num); std::vector datas(row_num); std::vector column_values(row_num); @@ -184,11 +185,15 @@ Status SchemaEncryptionKeysScanner::_fill_block_impl(vectorized::Block* block) { ? encryption_key.parent_version() : 0; break; + } + datas[row_idx] = &int_vals[row_idx]; + } else if (col_desc.type == TYPE_BIGINT) { + switch (col_idx) { case 8: - int_vals[row_idx] = encryption_key.has_crc32() ? 
encryption_key.crc32() : 0;
+                    int64_vals[row_idx] = encryption_key.has_crc32() ? encryption_key.crc32() : 0;
                     break;
                 }
-                datas[row_idx] = &int_vals[row_idx];
+                datas[row_idx] = &int64_vals[row_idx];
             } else if (col_desc.type == TYPE_DATETIMEV2) {
                 switch (col_idx) {
                 case 9:
diff --git a/be/src/http/action/check_encryption_action.cpp b/be/src/http/action/check_encryption_action.cpp
new file mode 100644
index 00000000000000..b122464a53925a
--- /dev/null
+++ b/be/src/http/action/check_encryption_action.cpp
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/check_encryption_action.h"
+
+// NOTE(review): the <...> header names below were lost from this patch text and are
+// reconstructed from the names this file uses -- confirm against the real change.
+#include <fmt/format.h>
+
+#include <cstdint>
+#include <exception>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "cloud/cloud_tablet.h"
+#include "cloud/config.h"
+#include "http/http_channel.h"
+#include "http/http_headers.h"
+#include "http/http_status.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_system.h"
+#include "io/fs/path.h"
+#include "olap/rowset/rowset_fwd.h"
+#include "olap/tablet_fwd.h"
+#include "runtime/exec_env.h"
+
+namespace doris {
+
+const std::string TABLET_ID = "tablet_id";
+
+CheckEncryptionAction::CheckEncryptionAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
+                                             TPrivilegeType::type type)
+        : HttpHandlerWithAuth(exec_env, hier, type) {}
+
+// Reports whether the files of `tablet` end with the expected magic bytes.
+//
+// Returns false at once when the tablet meta says PLAINTEXT. Otherwise, for each rowset with
+// at least one segment (the [0-1] rowset is skipped in cloud mode), the tail of segment 0 --
+// plus its V2 inverted index file when the schema has one -- is read through physical_fs()
+// and compared with the magic. Returns true only if every inspected file matches, or the
+// first error hit while resolving paths or reading.
+Result<bool> is_tablet_encrypted(const BaseTabletSPtr& tablet) {
+    // Bytes expected at the very end of an encrypted file.
+    static constexpr char MAGIC_CODE[] = {'A', 'B', 'C', 'D', 'E', 'A', 'B', 'C'};
+    static constexpr size_t MAGIC_CODE_SIZE = sizeof(uint64_t);
+    static_assert(sizeof(MAGIC_CODE) == MAGIC_CODE_SIZE);
+
+    auto tablet_meta = tablet->tablet_meta();
+    if (tablet_meta->encryption_algorithm() == EncryptionAlgorithmPB::PLAINTEXT) {
+        return false;
+    }
+    Status st;
+    bool is_encrypted = true;
+    tablet->traverse_rowsets([&st, &tablet, &is_encrypted](const RowsetSharedPtr& rs) {
+        if (!st) {
+            // A previous rowset already failed; keep that first error.
+            return;
+        }
+
+        auto rs_meta = rs->rowset_meta();
+        if (config::is_cloud_mode() && rs_meta->start_version() == 0 &&
+            rs_meta->end_version() == 1) {
+            return;
+        }
+        auto fs = rs_meta->physical_fs();
+        if (fs == nullptr) {
+            st = Status::InternalError("failed to get fs for rowset: tablet={}, rs={}",
+                                       tablet->tablet_id(), rs->rowset_id().to_string());
+            return;
+        }
+
+        if (rs->num_segments() == 0) {
+            return;
+        }
+        auto maybe_seg_path = rs->segment_path(0);
+        if (!maybe_seg_path) {
+            st = std::move(maybe_seg_path.error());
+            return;
+        }
+
+        std::vector<std::string> file_paths;
+        const auto& first_seg_path = maybe_seg_path.value();
+        file_paths.emplace_back(first_seg_path);
+        if (tablet->tablet_schema()->has_inverted_index() &&
+            tablet->tablet_schema()->get_inverted_index_storage_format() == V2) {
+            std::string inverted_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2(
+                    InvertedIndexDescriptor::get_index_file_path_prefix(first_seg_path));
+            file_paths.emplace_back(inverted_index_file_path);
+        }
+
+        // Iterate by const reference so each path is not copied.
+        for (const auto& path : file_paths) {
+            io::FileReaderSPtr reader;
+            st = fs->open_file(path, &reader);
+            if (!st) {
+                return;
+            }
+            if (reader->size() < MAGIC_CODE_SIZE) {
+                // Too short to end with the magic; also keeps the read offset from wrapping.
+                is_encrypted = false;
+                LOG(INFO) << "found not encrypted segment, path=" << path;
+                continue;
+            }
+            // Real storage for the read; reserve() on an empty vector would not provide it.
+            char magic_code_buf[MAGIC_CODE_SIZE] = {};
+            Slice magic_code(magic_code_buf, MAGIC_CODE_SIZE);
+            size_t bytes_read = 0;
+            st = reader->read_at(reader->size() - MAGIC_CODE_SIZE, magic_code, &bytes_read);
+            if (!st) {
+                return;
+            }
+            if (bytes_read != MAGIC_CODE_SIZE) {
+                st = Status::InternalError("short read of file tail: path={}, bytes_read={}",
+                                           path, bytes_read);
+                return;
+            }
+            if (!Slice::mem_equal(MAGIC_CODE, magic_code.data, magic_code.size)) {
+                is_encrypted = false;
+                // Log the file that actually failed the check.
+                LOG(INFO) << "found not encrypted segment, path=" << path;
+            }
+        }
+    });
+
+    if (st) {
+        return is_encrypted;
+    }
+    return st;
+}
+
+// Refreshes the cloud tablet by calling its sync_meta() and then sync_rowsets().
+Status sync_meta(const CloudTabletSPtr& tablet) {
+    RETURN_IF_ERROR(tablet->sync_meta());
+    RETURN_IF_ERROR(tablet->sync_rowsets());
+    return Status::OK();
+}
+
+// Handles the request: requires the `tablet_id` parameter, looks the tablet up, syncs it
+// first in cloud mode, then replies with the outcome of is_tablet_encrypted().
+void CheckEncryptionAction::handle(HttpRequest* req) {
+    req->add_output_header(HttpHeaders::CONTENT_TYPE, HttpHeaders::JSON_TYPE.data());
+    auto tablet_id_str = req->param(TABLET_ID);
+
+    if (tablet_id_str.empty()) {
+        HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST,
+                                "tablet id should be set in request params");
+        return;
+    }
+    int64_t tablet_id = -1;
+    try {
+        tablet_id = std::stoll(tablet_id_str);
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "convert tablet id to i64 failed:" << e.what();
+        auto msg = fmt::format("invalid argument: tablet_id={}", tablet_id_str);
+
+        HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, msg);
+        return;
+    }
+
+    auto maybe_tablet = ExecEnv::get_tablet(tablet_id);
+    if (!maybe_tablet) {
+        HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, maybe_tablet.error().to_string());
+        return;
+    }
+    auto tablet = maybe_tablet.value();
+
+    if (config::is_cloud_mode()) {
+        auto cloud_tablet = std::dynamic_pointer_cast<CloudTablet>(tablet);
+        if (cloud_tablet == nullptr) {
+            // A DCHECK is compiled out of release builds, so the cast is checked at runtime.
+            auto msg = fmt::format("tablet {} is not a cloud tablet", tablet_id);
+            HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, msg);
+            return;
+        }
+        auto st = sync_meta(cloud_tablet);
+        if (!st) {
+            HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, st.to_json());
+            return;
+        }
+    }
+
+    auto maybe_is_encrypted = is_tablet_encrypted(tablet);
+    if (maybe_is_encrypted.has_value()) {
+        HttpChannel::send_reply(
+                req, HttpStatus::OK,
+                maybe_is_encrypted.value() ? "all encrypted" : "some are not encrypted");
+        return;
+    }
+    HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+                            maybe_is_encrypted.error().to_json());
+}
+
+} // namespace doris
diff --git a/be/src/http/action/check_encryption_action.h b/be/src/http/action/check_encryption_action.h
new file mode 100644
index 00000000000000..d5545379c50e72
--- /dev/null
+++ b/be/src/http/action/check_encryption_action.h
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+// NOTE(review): this <...> header name was lost from the patch text; reconstructed as the
+// header expected to declare TPrivilegeHier / TPrivilegeType -- confirm.
+#include <gen_cpp/FrontendService_types.h>
+
+#include "cloud/cloud_tablet_mgr.h"
+#include "http/http_handler_with_auth.h"
+#include "http/http_request.h"
+#include "olap/tablet_manager.h"
+#include "runtime/exec_env.h"
+
+namespace doris {
+
+// HTTP handler that reports whether a tablet's files are encrypted.
+class CheckEncryptionAction : public HttpHandlerWithAuth {
+public:
+    explicit CheckEncryptionAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
+                                   TPrivilegeType::type type);
+
+    void handle(HttpRequest* req) override;
+};
+
+} // namespace doris
diff --git a/be/src/io/fs/encrypted_fs_factory.h b/be/src/io/fs/encrypted_fs_factory.h
index 3cf24b605d11c8..526c232a9c86ae 100644
--- a/be/src/io/fs/encrypted_fs_factory.h
+++ b/be/src/io/fs/encrypted_fs_factory.h
@@ -19,8 +19,6 @@
 
 #include
 
-#include
-
 #include "io/fs/file_system.h"
 
 namespace doris::io {
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index 12ec8170f7ccf4..3aa8fe7e3276c8 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -21,11 +21,9 @@
 #include
 
 #include "common/status.h"
-#include "gutil/macros.h"
 #include "io/cache/block_file_cache.h"
 #include "io/cache/block_file_cache_factory.h"
 #include "io/cache/file_cache_common.h"
-#include "io/fs/file_reader.h"
 #include "io/fs/file_reader_writer_fwd.h"
 #include "io/fs/path.h"
 #include "util/slice.h"
diff --git a/be/src/olap/rowset/rowset_meta.cpp b/be/src/olap/rowset/rowset_meta.cpp
index e240bf16c453ab..ec77f06855dad4 100644
--- a/be/src/olap/rowset/rowset_meta.cpp
+++ b/be/src/olap/rowset/rowset_meta.cpp
@@ -97,20 +97,22 @@ bool RowsetMeta::json_rowset_meta(std::string* json_rowset_meta) {
     return ret;
 }
 
-io::FileSystemSPtr RowsetMeta::fs() {
-    auto fs = [this]() -> io::FileSystemSPtr {
-        if (is_local()) {
-            return io::global_local_filesystem();
-        }
+io::FileSystemSPtr RowsetMeta::physical_fs() {
+    if (is_local()) {
+        return io::global_local_filesystem();
+    }
 
-        auto storage_resource = remote_storage_resource();
-        if (storage_resource) {
-            return storage_resource.value()->fs;
-        } else {
-            LOG(WARNING) << 
storage_resource.error(); - return nullptr; - } - }(); + auto storage_resource = remote_storage_resource(); + if (storage_resource) { + return storage_resource.value()->fs; + } else { + LOG(WARNING) << storage_resource.error(); + return nullptr; + } +} + +io::FileSystemSPtr RowsetMeta::fs() { + auto fs = physical_fs(); #ifndef BE_TEST auto algorithm = _determine_encryption_once.call([this]() -> Result { diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index 1b8817b0650ff4..1ec5b30f0feb9e 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -60,6 +60,8 @@ class RowsetMeta : public MetadataAdder { // Note that if the resource id cannot be found for the corresponding remote file system, nullptr will be returned. io::FileSystemSPtr fs(); + io::FileSystemSPtr physical_fs(); + Result remote_storage_resource(); void set_remote_storage_resource(StorageResource resource); diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index a956a24323b416..548ac7935ea858 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -105,15 +105,16 @@ Status Segment::_open(io::FileSystemSPtr fs, const std::string& path, uint32_t s const io::FileReaderOptions& reader_options, std::shared_ptr* output, InvertedIndexFileInfo idx_file_info, OlapReaderStatistics* stats) { io::FileReaderSPtr file_reader; - RETURN_IF_ERROR(fs->open_file(path, &file_reader, &reader_options)); + auto st = fs->open_file(path, &file_reader, &reader_options); + TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption", &st); std::shared_ptr segment( new Segment(segment_id, rowset_id, std::move(tablet_schema), idx_file_info)); - segment->_fs = fs; - segment->_file_reader = std::move(file_reader); - auto st = segment->_open(stats); - TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption", &st); - if (st.is() && - reader_options.cache_type == 
io::FileCachePolicy::FILE_BLOCK_CACHE) { + if (st) { + segment->_fs = fs; + segment->_file_reader = std::move(file_reader); + st = segment->_open(stats); + } else if (st.is() && + reader_options.cache_type == io::FileCachePolicy::FILE_BLOCK_CACHE) { LOG(WARNING) << "bad segment file may be read from file cache, try to read remote source " "file directly, file path: " << path << " cache_key: " << file_cache_key_str(path); @@ -121,9 +122,11 @@ Status Segment::_open(io::FileSystemSPtr fs, const std::string& path, uint32_t s auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key); file_cache->remove_if_cached(file_key); - RETURN_IF_ERROR(fs->open_file(path, &file_reader, &reader_options)); - segment->_file_reader = std::move(file_reader); - st = segment->_open(stats); + st = fs->open_file(path, &file_reader, &reader_options); + if (st) { + segment->_file_reader = std::move(file_reader); + st = segment->_open(stats); + } TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption1", &st); if (st.is()) { // corrupt again LOG(WARNING) << "failed to try to read remote source file again with cache support," diff --git a/be/src/olap/wal/wal_writer.cpp b/be/src/olap/wal/wal_writer.cpp index b5151217cc2e20..385aef5245b64d 100644 --- a/be/src/olap/wal/wal_writer.cpp +++ b/be/src/olap/wal/wal_writer.cpp @@ -42,6 +42,11 @@ WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name) {} WalWriter::~WalWriter() {} Status determine_wal_fs(int64_t db_id, int64_t tb_id, io::FileSystemSPtr& fs) { + if (!config::enable_wal_tde) { + fs = io::global_local_filesystem(); + return Status::OK(); + } + #ifndef BE_TEST TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; TGetTableTDEInfoRequest req; diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 6c604dd09d006c..429da49dd351fc 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -34,6 +34,7 @@ #include 
"http/action/batch_download_action.h" #include "http/action/be_proc_thread_action.h" #include "http/action/calc_file_crc_action.h" +#include "http/action/check_encryption_action.h" #include "http/action/check_rpc_channel_action.h" #include "http/action/check_tablet_segment_action.h" #include "http/action/checksum_action.h" @@ -427,6 +428,10 @@ void HttpService::register_local_handler(StorageEngine& engine) { _env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, engine.tablet_manager())); _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction_score", compaction_score_action); + CheckEncryptionAction* check_encryption_action = + _pool.add(new CheckEncryptionAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ALL)); + _ev_http_server->register_handler(HttpMethod::GET, "/api/check_tablet_encryption", + check_encryption_action); } void HttpService::register_cloud_handler(CloudStorageEngine& engine) { @@ -477,6 +482,10 @@ void HttpService::register_cloud_handler(CloudStorageEngine& engine) { _env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, engine.tablet_mgr())); _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction_score", compaction_score_action); + CheckEncryptionAction* check_encryption_action = + _pool.add(new CheckEncryptionAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ALL)); + _ev_http_server->register_handler(HttpMethod::GET, "/api/check_tablet_encryption", + check_encryption_action); } // NOLINTEND(readability-function-size) diff --git a/be/test/io/fs/hdfs_file_system_test.cpp b/be/test/io/fs/hdfs_file_system_test.cpp index b69b7792cac33e..2dcf1696a8b755 100644 --- a/be/test/io/fs/hdfs_file_system_test.cpp +++ b/be/test/io/fs/hdfs_file_system_test.cpp @@ -19,6 +19,7 @@ #include "common/config.h" #include "cpp/sync_point.h" +#include "io/fs/file_reader.h" #include "io/fs/file_writer.h" #include "io/fs/hdfs_file_writer.h" #include "io/fs/local_file_system.h" diff --git a/build.sh b/build.sh index a096c0a9454a69..9f8614df4ee33b 
100755 --- a/build.sh +++ b/build.sh @@ -526,6 +526,9 @@ modules=("") if [[ "${BUILD_FE}" -eq 1 ]]; then modules+=("fe-common") modules+=("fe-core") + if [[ "${WITH_TDE_DIR}" != "" ]]; then + modules+=("fe-${WITH_TDE_DIR}") + fi fi if [[ "${BUILD_HIVE_UDF}" -eq 1 ]]; then modules+=("fe-common") @@ -746,6 +749,10 @@ if [[ "${BUILD_FE}" -eq 1 ]]; then install -d "${DORIS_OUTPUT}/fe/lib/jindofs" cp -r -p "${DORIS_HOME}/fe/fe-core/target/lib"/* "${DORIS_OUTPUT}/fe/lib"/ cp -r -p "${DORIS_HOME}/fe/fe-core/target/doris-fe.jar" "${DORIS_OUTPUT}/fe/lib"/ + if [[ "${WITH_TDE_DIR}" != "" ]]; then + cp -r -p "${DORIS_HOME}/fe/fe-${WITH_TDE_DIR}/target/fe-${WITH_TDE_DIR}-1.2-SNAPSHOT.jar" "${DORIS_OUTPUT}/fe/lib"/ + fi + #cp -r -p "${DORIS_HOME}/docs/build/help-resource.zip" "${DORIS_OUTPUT}/fe/lib"/ # copy jindofs jars, only support for Linux x64 or arm diff --git a/docker/runtime/doris-compose/cluster.py b/docker/runtime/doris-compose/cluster.py index cf63f403240fce..5fb2b687069f7c 100644 --- a/docker/runtime/doris-compose/cluster.py +++ b/docker/runtime/doris-compose/cluster.py @@ -350,6 +350,12 @@ def get_ip(self): int(seq / IP_PART4_SIZE), seq % IP_PART4_SIZE) + def get_tde_ak(self): + return self.cluster.tde_ak + + def get_tde_sk(self): + return self.cluster.tde_sk + def get_default_named_ports(self): # port_name : default_port # the port_name come from fe.conf, be.conf, cloud.conf, etc @@ -390,6 +396,8 @@ def docker_env(self): "STOP_GRACE": 1 if enable_coverage else 0, "IS_CLOUD": 1 if self.cluster.is_cloud else 0, "SQL_MODE_NODE_MGR": 1 if self.cluster.sql_mode_node_mgr else 0, + "TDE_AK": self.get_tde_ak(), + "TDE_SK": self.get_tde_sk(), } if self.cluster.is_cloud: @@ -810,7 +818,7 @@ def __init__(self, name, subnet, image, is_cloud, is_root_user, fe_config, be_config, ms_config, recycle_config, remote_master_fe, local_network_ip, fe_follower, be_disks, be_cluster, reg_be, extra_hosts, coverage_dir, cloud_store_config, - sql_mode_node_mgr, be_metaservice_endpoint, 
be_cluster_id): + sql_mode_node_mgr, be_metaservice_endpoint, be_cluster_id, tde_ak, tde_sk): self.name = name self.subnet = subnet self.image = image @@ -839,13 +847,15 @@ def __init__(self, name, subnet, image, is_cloud, is_root_user, fe_config, self.sql_mode_node_mgr = sql_mode_node_mgr self.be_metaservice_endpoint = be_metaservice_endpoint self.be_cluster_id = be_cluster_id + self.tde_ak = tde_ak + self.tde_sk = tde_sk @staticmethod def new(name, image, is_cloud, is_root_user, fe_config, be_config, ms_config, recycle_config, remote_master_fe, local_network_ip, fe_follower, be_disks, be_cluster, reg_be, extra_hosts, coverage_dir, cloud_store_config, sql_mode_node_mgr, - be_metaservice_endpoint, be_cluster_id): + be_metaservice_endpoint, be_cluster_id, tde_ak, tde_sk): if not os.path.exists(LOCAL_DORIS_PATH): os.makedirs(LOCAL_DORIS_PATH, exist_ok=True) os.chmod(LOCAL_DORIS_PATH, 0o777) @@ -860,7 +870,7 @@ def new(name, image, is_cloud, is_root_user, fe_config, be_config, be_disks, be_cluster, reg_be, extra_hosts, coverage_dir, cloud_store_config, sql_mode_node_mgr, be_metaservice_endpoint, - be_cluster_id) + be_cluster_id, tde_ak, tde_sk) os.makedirs(cluster.get_path(), exist_ok=True) os.makedirs(get_status_path(name), exist_ok=True) cluster._save_meta() diff --git a/docker/runtime/doris-compose/command.py b/docker/runtime/doris-compose/command.py index 35adfacf7aae92..97ec79a213bf0d 100644 --- a/docker/runtime/doris-compose/command.py +++ b/docker/runtime/doris-compose/command.py @@ -493,6 +493,18 @@ def add_parser(self, args_parsers): default="7.1.26", help="fdb image version. 
Only use in cloud cluster.") + parser.add_argument( + "--tde-ak", + type=str, + default="", + help="tde ak") + + parser.add_argument( + "--tde-sk", + type=str, + default="", + help="tde sk") + # if default==True, use this style to parser, like --detach if self._support_boolean_action(): parser.add_argument( @@ -603,7 +615,7 @@ def run(self, args): args.remote_master_fe, args.local_network_ip, args.fe_follower, args.be_disks, args.be_cluster, args.reg_be, args.extra_hosts, args.coverage_dir, cloud_store_config, args.sql_mode_node_mgr, - args.be_metaservice_endpoint, args.be_cluster_id) + args.be_metaservice_endpoint, args.be_cluster_id, args.tde_ak, args.tde_sk) LOG.info("Create new cluster {} succ, cluster path is {}".format( args.NAME, cluster.get_path())) diff --git a/docker/runtime/doris-compose/resource/init_fe.sh b/docker/runtime/doris-compose/resource/init_fe.sh index 022928cbce61dc..4e846ed182f4ec 100755 --- a/docker/runtime/doris-compose/resource/init_fe.sh +++ b/docker/runtime/doris-compose/resource/init_fe.sh @@ -90,6 +90,8 @@ fe_daemon() { } run_fe() { + export DORIS_TDE_AK=${TDE_AK} + export DORIS_TDE_SK=${TDE_SK} health_log "run start_fe.sh" bash $DORIS_HOME/bin/start_fe.sh --daemon $@ | tee -a $DORIS_HOME/log/fe.out } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 7a9f8567988c4a..35664e768e005b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -1878,16 +1878,19 @@ public static boolean analyzeEnableTypedPathsToSparse(Map proper public static TEncryptionAlgorithm analyzeTDEAlgorithm(Map properties) throws AnalysisException { String name; + //if (properties == null || !properties.containsKey(PROPERTIES_TDE_ALGORITHM)) { + // name = Config.doris_tde_algorithm; + //} else if 
(!PLAINTEXT.equals(Config.doris_tde_algorithm)) { + // throw new AnalysisException("Cannot create a table on encrypted FE," + // + " please set Config.doris_tde_algorithm to PLAINTEXT"); + //} else { + // name = properties.remove(PROPERTIES_TDE_ALGORITHM); + //} + // if (properties == null || !properties.containsKey(PROPERTIES_TDE_ALGORITHM)) { - if (Config.doris_tde_algorithm.isEmpty()) { - return TEncryptionAlgorithm.PLAINTEXT; - } name = Config.doris_tde_algorithm; - } else if (!PLAINTEXT.equals(Config.doris_tde_algorithm)) { - throw new AnalysisException("Cannot create a table on encrypted FE," - + " please set Config.doris_tde_algorithm to PLAINTEXT"); } else { - name = properties.remove(PROPERTIES_TDE_ALGORITHM); + throw new AnalysisException("Do not support tde_algorithm property currently"); } if (AES256.equalsIgnoreCase(name)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index a0d9a69b9d9baa..e5c3ba5bf77496 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -139,6 +139,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.es.EsRepository; +import org.apache.doris.encryption.EncryptionKey; import org.apache.doris.event.DropPartitionEvent; import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.mysql.privilege.PrivPredicate; @@ -3102,9 +3103,23 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx try { TEncryptionAlgorithm tdeAlgorithm = PropertyAnalyzer.analyzeTDEAlgorithm(properties); + if (tdeAlgorithm != TEncryptionAlgorithm.PLAINTEXT) { + List masterKeys = Env.getCurrentEnv().getKeyManager().getAllMasterKeys(); + if (masterKeys == null || masterKeys.isEmpty()) { + throw new DdlException("The TDE master key 
does not exist, so encrypted table cannot be created. " + + "Please check whether the root key is correctly set"); + } + + for (EncryptionKey masterKey : masterKeys) { + if (masterKey.algorithm.toThrift() == tdeAlgorithm && !masterKey.isDecrypted()) { + throw new DdlException("The master key has not been decrypted. Please check whether" + + " the root key is functioning properly or configured correctly."); + } + } + } olapTable.setEncryptionAlgorithm(tdeAlgorithm); } catch (Exception e) { - throw new DdlException(e.getMessage()); + throw new DdlException("Failed to set TDE algorithm: " + e.getMessage(), e); } olapTable.initSchemaColumnUniqueId(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/encryption/EncryptionKey.java b/fe/fe-core/src/main/java/org/apache/doris/encryption/EncryptionKey.java index b1663264cf3c4e..d07c864d0da601 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/encryption/EncryptionKey.java +++ b/fe/fe-core/src/main/java/org/apache/doris/encryption/EncryptionKey.java @@ -17,11 +17,29 @@ package org.apache.doris.encryption; +import org.apache.doris.proto.OlapFile.EncryptionAlgorithmPB; +import org.apache.doris.proto.OlapFile.EncryptionKeyPB; +import org.apache.doris.proto.OlapFile.EncryptionKeyPB.Builder; +import org.apache.doris.proto.OlapFile.EncryptionKeyTypePB; +import org.apache.doris.thrift.TEncryptionAlgorithm; +import org.apache.doris.thrift.TEncryptionKey; + import com.google.gson.annotations.SerializedName; +import com.google.protobuf.ByteString; public class EncryptionKey { public enum Algorithm { AES256, SM4; + public TEncryptionAlgorithm toThrift() { + switch (this) { + case AES256: + return TEncryptionAlgorithm.AES256; + case SM4: + return TEncryptionAlgorithm.SM4; + default: + throw new RuntimeException("invalid algorithm: " + this); + } + } } public enum KeyType { @@ -61,6 +79,50 @@ public enum KeyType { @SerializedName(value = "mtime") public long mtime; + public boolean isDecrypted() { + return plaintext != null && 
plaintext.length > 0;
+    }
+
+    public TEncryptionKey toThrift() {
+        Builder builder = EncryptionKeyPB.newBuilder();
+        builder.setId(id);
+        builder.setVersion(version);
+        builder.setParentId(parentId);
+        builder.setParentVersion(parentVersion);
+        switch (algorithm) {
+            case AES256:
+                builder.setAlgorithm(EncryptionAlgorithmPB.AES_256_CTR);
+                break;
+            case SM4:
+                builder.setAlgorithm(EncryptionAlgorithmPB.SM4_128_CTR);
+                break;
+            default:
+                // do nothing
+        }
+        switch (type) {
+            case DATA_KEY:
+                builder.setType(EncryptionKeyTypePB.DATA_KEY);
+                break;
+            case MASTER_KEY:
+                builder.setType(EncryptionKeyTypePB.MASTER_KEY);
+                break;
+            default:
+                // do nothing
+        }
+        builder.setCiphertextBase64(ciphertext);
+        if (isDecrypted()) {
+            builder.setPlaintext(ByteString.copyFrom(plaintext));
+        }
+        builder.setCrc32(crc);
+        builder.setCtime(ctime);
+        builder.setMtime(mtime);
+        EncryptionKeyPB keyPB = builder.build();
+
+        TEncryptionKey tk = new TEncryptionKey();
+        tk.setKeyPb(keyPB.toByteArray());
+        return tk;
+    }
+
     @Override
     public String toString() {
         return "EncryptionKey{"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/encryption/KeyManagerStore.java b/fe/fe-core/src/main/java/org/apache/doris/encryption/KeyManagerStore.java
index 89ca1c3a6b7325..1e3a96c82a315f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/encryption/KeyManagerStore.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/encryption/KeyManagerStore.java
@@ -22,8 +22,6 @@
 import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.gson.annotations.SerializedName;
-import lombok.Getter;
-import lombok.Setter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -32,22 +30,72 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class KeyManagerStore implements Writable {
     private static final Logger LOG = LogManager.getLogger(KeyManagerStore.class);
 
-    @Setter
-    @Getter
     @SerializedName(value = "rootKeyInfo")
     private RootKeyInfo rootKeyInfo;
 
-    @Setter
-    @Getter
     @SerializedName(value = "masterKeys")
-    private List<EncryptionKey> masterKeys = new ArrayList<>();
+    private final List<EncryptionKey> masterKeys = new ArrayList<>();
+
+    // Guards rootKeyInfo and masterKeys.
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private void readLock() {
+        lock.readLock().lock();
+    }
+
+    private void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        lock.writeLock().unlock();
+    }
 
     public void addMasterKey(EncryptionKey masterKey) {
-        masterKeys.add(masterKey);
+        writeLock();
+        try {
+            masterKeys.add(masterKey);
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public List<EncryptionKey> getMasterKeys() {
+        readLock();
+        try {
+            // Return a snapshot: handing out the list itself would let callers read it
+            // after the lock is released, while addMasterKey() may be mutating it.
+            return new ArrayList<>(masterKeys);
+        } finally {
+            readUnlock();
+        }
+    }
+
+    public void setRootKeyInfo(RootKeyInfo info) {
+        writeLock();
+        try {
+            this.rootKeyInfo = info;
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    public RootKeyInfo getRootKeyInfo() {
+        readLock();
+        try {
+            return rootKeyInfo;
+        } finally {
+            readUnlock();
+        }
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 4a79c7827d5021..6057a20d9f389f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -99,10 +99,6 @@
 import org.apache.doris.plsql.metastore.PlsqlPackage;
 import org.apache.doris.plsql.metastore.PlsqlProcedureKey;
 import org.apache.doris.plsql.metastore.PlsqlStoredProcedure;
-import org.apache.doris.proto.OlapFile.EncryptionAlgorithmPB;
-import org.apache.doris.proto.OlapFile.EncryptionKeyPB;
-import org.apache.doris.proto.OlapFile.EncryptionKeyPB.Builder;
-import org.apache.doris.proto.OlapFile.EncryptionKeyTypePB;
 import org.apache.doris.qe.ConnectContext;
 import 
org.apache.doris.qe.ConnectContext.ConnectType; import org.apache.doris.qe.ConnectProcessor; @@ -294,7 +290,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; -import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -1506,43 +1501,6 @@ public TLoadTxnCommitResult loadTxnPreCommit(TLoadTxnCommitRequest request) thro return result; } - public TEncryptionKey encryptionKeyToThrift(EncryptionKey encryptionKey) { - Builder builder = EncryptionKeyPB.newBuilder(); - builder.setId(encryptionKey.id); - builder.setVersion(encryptionKey.version); - builder.setParentId(encryptionKey.parentId); - builder.setParentVersion(encryptionKey.parentVersion); - switch (encryptionKey.algorithm) { - case AES256: - builder.setAlgorithm(EncryptionAlgorithmPB.AES_256_CTR); - break; - case SM4: - builder.setAlgorithm(EncryptionAlgorithmPB.SM4_128_CTR); - break; - default: - // do nothing - } - switch (encryptionKey.type) { - case DATA_KEY: - builder.setType(EncryptionKeyTypePB.DATA_KEY); - break; - case MASTER_KEY: - builder.setType(EncryptionKeyTypePB.MASTER_KEY); - break; - default: - // do nothing - } - builder.setCiphertextBase64(encryptionKey.ciphertext); - builder.setPlaintext(ByteString.copyFrom(encryptionKey.plaintext)); - builder.setCrc32(encryptionKey.crc); - builder.setCtime(encryptionKey.ctime); - builder.setMtime(encryptionKey.mtime); - EncryptionKeyPB keyPB = builder.build(); - - TEncryptionKey tk = new TEncryptionKey(); - tk.setKeyPb(keyPB.toByteArray()); - return tk; - } public TGetEncryptionKeysResult getEncryptionKeys(TGetEncryptionKeysRequest request) { String clientAddr = getClientAddrAsString(); @@ -1562,9 +1520,9 @@ public TGetEncryptionKeysResult getEncryptionKeys(TGetEncryptionKeysRequest requ } try { List tKeys = new ArrayList<>(); - List keys = 
Env.getCurrentEnv().getKeyManager().getAllMasterKeys(); + List keys = Env.getCurrentEnv().getKeyManager().getAllMasterKeys(); for (EncryptionKey key : keys) { - tKeys.add(encryptionKeyToThrift(key)); + tKeys.add(key.toThrift()); } result.setMasterKeys(tKeys); } catch (Exception e) { diff --git a/regression-test/framework/pom.xml b/regression-test/framework/pom.xml index 14556af1e1f330..60746483928518 100644 --- a/regression-test/framework/pom.xml +++ b/regression-test/framework/pom.xml @@ -169,6 +169,13 @@ under the License. pom import + + software.amazon.awssdk + bom + 2.29.26 + pom + import + @@ -419,6 +426,11 @@ under the License. aliyun-sdk-oss 3.18.1 - + + software.amazon.awssdk + kms + + + diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy index 9a9f9456da0c8c..bccd3b585c3c20 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy @@ -168,6 +168,14 @@ class Config { public String regressionAwsBucket public String regressionAwsPrefix + public String tdeAk + public String tdeSk + public String tdeKeyEndpoint + public String tdeKeyRegion + public String tdeKeyProvider + public String tdeAlgorithm + public String tdeKeyId + Config() {} Config( @@ -225,7 +233,14 @@ class Config { String stageIamUserId, String clusterDir, String kafkaBrokerList, - String cloudVersion) { + String cloudVersion, + String tdeAk, + String tdeSk, + String tdeKeyEndpoint, + String tdeKeyRegion, + String tdeKeyProvider, + String tdeAlgorithm, + String tdeKeyId) { this.s3Source = s3Source this.caseNamePrefix = caseNamePrefix this.validateBackupPrefix = validateBackupPrefix @@ -281,6 +296,13 @@ class Config { this.clusterDir = clusterDir this.kafkaBrokerList = kafkaBrokerList this.cloudVersion = cloudVersion + this.tdeAk = tdeAk + 
this.tdeSk = tdeSk + this.tdeKeyEndpoint = tdeKeyEndpoint + this.tdeKeyRegion = tdeKeyRegion + this.tdeKeyProvider = tdeKeyProvider + this.tdeAlgorithm = tdeAlgorithm + this.tdeKeyId = tdeKeyId } static String removeDirectoryPrefix(String str) { @@ -482,6 +504,21 @@ class Config { config.cloudVersion = cmd.getOptionValue(cloudVersionOpt, config.cloudVersion) log.info("cloudVersion is ${config.cloudVersion}".toString()) + config.tdeAk = cmd.getOptionValue(tdeAkOpt, config.tdeAk) + log.info("tdeAk is ${config.tdeAk}".toString()) + config.tdeSk = cmd.getOptionValue(tdeSkOpt, config.tdeSk) + log.info("tdeSk is ${config.tdeSk}".toString()) + config.tdeKeyEndpoint = cmd.getOptionValue(tdeKeyEndpointOpt, config.tdeKeyEndpoint) + log.info("tdeKeyEndpoint is ${config.tdeKeyEndpoint}".toString()) + config.tdeKeyRegion = cmd.getOptionValue(tdeKeyRegionOpt, config.tdeKeyRegion) + log.info("tdeKeyRegion is ${config.tdeKeyRegion}".toString()) + config.tdeKeyProvider = cmd.getOptionValue(tdeKeyProviderOpt, config.tdeKeyProvider) + log.info("tdeKeyProvider is ${config.tdeKeyProvider}".toString()) + config.tdeAlgorithm = cmd.getOptionValue(tdeAlgorithmOpt, config.tdeAlgorithm) + log.info("tdeAlgorithm is ${config.tdeAlgorithm}".toString()) + config.tdeKeyId = cmd.getOptionValue(tdeKeyIdOpt, config.tdeKeyId) + log.info("tdeKeyId is ${config.tdeKeyId}".toString()) + config.kafkaBrokerList = cmd.getOptionValue(kafkaBrokerListOpt, config.kafkaBrokerList) config.recycleServiceHttpAddress = cmd.getOptionValue(recycleServiceHttpAddressOpt, config.recycleServiceHttpAddress) @@ -610,7 +647,14 @@ class Config { configToString(obj.stageIamUserId), configToString(obj.clusterDir), configToString(obj.kafkaBrokerList), - configToString(obj.cloudVersion) + configToString(obj.cloudVersion), + configToString(obj.tdeAk), + configToString(obj.tdeSk), + configToString(obj.tdeKeyEndpoint), + configToString(obj.tdeKeyRegion), + configToString(obj.tdeKeyProvider), + configToString(obj.tdeAlgorithm), + 
configToString(obj.tdeKeyId) ) config.ccrDownstreamUrl = configToString(obj.ccrDownstreamUrl) diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/ConfigOptions.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/ConfigOptions.groovy index 1f51a91059170c..870cc2baf10bce 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/ConfigOptions.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/ConfigOptions.groovy @@ -99,6 +99,13 @@ class ConfigOptions { static Option clusterDirOpt static Option kafkaBrokerListOpt static Option cloudVersionOpt + static Option tdeAkOpt + static Option tdeSkOpt + static Option tdeKeyEndpointOpt + static Option tdeKeyRegionOpt + static Option tdeKeyProviderOpt + static Option tdeAlgorithmOpt + static Option tdeKeyIdOpt static CommandLine initCommands(String[] args) { helpOption = Option.builder("h") @@ -612,6 +619,41 @@ class ConfigOptions { .hasArg(false) .desc("selectdb cloud version") .build() + tdeAkOpt = Option.builder("tdeAk") + .required(false) + .hasArg(false) + .desc("TDE Access Key") + .build(); + tdeSkOpt = Option.builder("tdeSk") + .required(false) + .hasArg(false) + .desc("TDE Secret Key") + .build(); + tdeKeyEndpointOpt = Option.builder("tdeKeyEndpoint") + .required(false) + .hasArg(false) + .desc("TDE Key Endpoint") + .build(); + tdeKeyRegionOpt = Option.builder("tdeKeyRegion") + .required(false) + .hasArg(false) + .desc("TDE Key Region") + .build(); + tdeKeyProviderOpt = Option.builder("tdeKeyProvider") + .required(false) + .hasArg(false) + .desc("TDE Key Provider") + .build(); + tdeAlgorithmOpt = Option.builder("tdeAlgorithm") + .required(false) + .hasArg(false) + .desc("TDE Algorithm") + .build(); + tdeKeyIdOpt = Option.builder("tdeKeyId") + .required(false) + .hasArg(false) + .desc("TDE Key Id") + .build(); Options options = new Options() .addOption(helpOption) @@ -680,6 +722,13 @@ class ConfigOptions { 
.addOption(clusterDirOpt) .addOption(kafkaBrokerListOpt) .addOption(cloudVersionOpt) + .addOption(tdeAkOpt) + .addOption(tdeSkOpt) + .addOption(tdeKeyEndpointOpt) + .addOption(tdeKeyRegionOpt) + .addOption(tdeKeyProviderOpt) + .addOption(tdeAlgorithmOpt) + .addOption(tdeKeyIdOpt) CommandLine cmd = new DefaultParser().parse(options, args, true) if (cmd.hasOption(helpOption)) { diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy index d6d250266040d3..cb9c34b064a2a2 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy @@ -91,6 +91,9 @@ class ClusterOptions { // if not specific, docker will let each be contains 1 HDD disk. List beDisks = null + String tdeAk = ""; + String tdeSk = ""; + void enableDebugPoints() { feConfigs.add('enable_debug_points=true') beConfigs.add('enable_debug_points=true') @@ -367,6 +370,16 @@ class SuiteCluster { cmd += ['--be-cluster-id'] } + if (options.tdeAk != null && options.tdeAk != "") { + cmd += ['--tde-ak'] + cmd += options.tdeAk + } + + if (options.tdeSk != null && options.tdeSk != "") { + cmd += ['--tde-sk'] + cmd += options.tdeSk + } + cmd += ['--wait-timeout', String.valueOf(options.waitTimeout)] sqlModeNodeMgr = options.sqlModeNodeMgr diff --git a/run-be-ut.sh b/run-be-ut.sh index 1040dd75449771..bdc71b46b45d9c 100755 --- a/run-be-ut.sh +++ b/run-be-ut.sh @@ -138,6 +138,7 @@ echo "Get params: PARALLEL -- ${PARALLEL} CLEAN -- ${CLEAN} ENABLE_PCH -- ${ENABLE_PCH} + WITH_TDE_DIR -- ${WITH_TDE_DIR} " echo "Build Backend UT" @@ -259,6 +260,7 @@ cd "${CMAKE_BUILD_DIR}" -DENABLE_PCH="${ENABLE_PCH}" \ -DDORIS_JAVA_HOME="${JAVA_HOME}" \ -DBUILD_AZURE="${BUILD_AZURE}" \ + -DWITH_TDE_DIR="${WITH_TDE_DIR}" \ "${DORIS_HOME}/be" 
"${BUILD_SYSTEM}" -j "${PARALLEL}"