Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1564,6 +1564,8 @@ DEFINE_mBool(enable_auto_clone_on_mow_publish_missing_version, "false");
// The capacity of segment partial column cache, used to cache column readers for each segment.
DEFINE_mInt32(max_segment_partial_column_cache_size, "500");

DEFINE_mBool(enable_wal_tde, "false");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1631,6 +1631,8 @@ DECLARE_mBool(enable_auto_clone_on_mow_publish_missing_version);
// The capacity of segment partial column cache, used to cache column readers for each segment.
DECLARE_mInt32(max_segment_partial_column_cache_size);

DECLARE_mBool(enable_wal_tde);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ Status SchemaEncryptionKeysScanner::_fill_block_impl(vectorized::Block* block) {

std::vector<StringRef> str_refs(row_num);
std::vector<int32_t> int_vals(row_num);
std::vector<int64_t> int64_vals(row_num);
std::vector<int8_t> bool_vals(row_num);
std::vector<void*> datas(row_num);
std::vector<std::string> column_values(row_num);
Expand Down Expand Up @@ -184,11 +185,15 @@ Status SchemaEncryptionKeysScanner::_fill_block_impl(vectorized::Block* block) {
? encryption_key.parent_version()
: 0;
break;
}
datas[row_idx] = &int_vals[row_idx];
} else if (col_desc.type == TYPE_BIGINT) {
switch (col_idx) {
case 8:
int_vals[row_idx] = encryption_key.has_crc32() ? encryption_key.crc32() : 0;
int64_vals[row_idx] = encryption_key.has_crc32() ? encryption_key.crc32() : 0;
break;
}
datas[row_idx] = &int_vals[row_idx];
datas[row_idx] = &int64_vals[row_idx];
} else if (col_desc.type == TYPE_DATETIMEV2) {
switch (col_idx) {
case 9:
Expand Down
174 changes: 174 additions & 0 deletions be/src/http/action/check_encryption_action.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "http/action/check_encryption_action.h"

#include <gen_cpp/olap_file.pb.h>

#include <exception>
#include <memory>
#include <shared_mutex>
#include <string>
#include <string_view>

#include "cloud/cloud_tablet.h"
#include "cloud/config.h"
#include "http/http_channel.h"
#include "http/http_headers.h"
#include "http/http_status.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_system.h"
#include "io/fs/path.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/tablet_fwd.h"
#include "runtime/exec_env.h"

namespace doris {

const std::string TABLET_ID = "tablet_id";

CheckEncryptionAction::CheckEncryptionAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type type)
: HttpHandlerWithAuth(exec_env, hier, type) {}

Result<bool> is_tablet_encrypted(const BaseTabletSPtr& tablet) {
auto tablet_meta = tablet->tablet_meta();
if (tablet_meta->encryption_algorithm() == EncryptionAlgorithmPB::PLAINTEXT) {
return false;
}
Status st;
bool is_encrypted = true;
tablet->traverse_rowsets([&st, &tablet, &is_encrypted](const RowsetSharedPtr& rs) {
if (!st) {
return;
}

auto rs_meta = rs->rowset_meta();
if (config::is_cloud_mode() && rs_meta->start_version() == 0 &&
rs_meta->end_version() == 1) {
return;
}
auto fs = rs_meta->physical_fs();
if (fs == nullptr) {
st = Status::InternalError("failed to get fs for rowset: tablet={}, rs={}",
tablet->tablet_id(), rs->rowset_id().to_string());
return;
}

if (rs->num_segments() == 0) {
return;
}
auto maybe_seg_path = rs->segment_path(0);
if (!maybe_seg_path) {
st = std::move(maybe_seg_path.error());
return;
}

std::vector<std::string_view> file_paths;
const auto& first_seg_path = maybe_seg_path.value();
file_paths.emplace_back(first_seg_path);
if (tablet->tablet_schema()->has_inverted_index() &&
tablet->tablet_schema()->get_inverted_index_storage_format() == V2) {
std::string inverted_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2(
InvertedIndexDescriptor::get_index_file_path_prefix(first_seg_path));
file_paths.emplace_back(inverted_index_file_path);
}

for (const auto path : file_paths) {
io::FileReaderSPtr reader;
st = fs->open_file(path, &reader);
if (!st) {
return;
}
std::vector<uint8_t> magic_code_buf;
magic_code_buf.reserve(sizeof(uint64_t));
Slice magic_code(magic_code_buf.data(), sizeof(uint64_t));
size_t bytes_read;
st = reader->read_at(reader->size() - sizeof(uint64_t), magic_code, &bytes_read);
if (!st) {
return;
}

std::vector<uint8_t> answer = {'A', 'B', 'C', 'D', 'E', 'A', 'B', 'C'};
is_encrypted &= Slice::mem_equal(answer.data(), magic_code.data, magic_code.size);
if (!is_encrypted) {
LOG(INFO) << "found not encrypted segment, path=" << first_seg_path;
}
}
});

if (st) {
return is_encrypted;
}
return st;
}

Status sync_meta(const CloudTabletSPtr& tablet) {
RETURN_IF_ERROR(tablet->sync_meta());
RETURN_IF_ERROR(tablet->sync_rowsets());
return Status::OK();
}

void CheckEncryptionAction::handle(HttpRequest* req) {
req->add_output_header(HttpHeaders::CONTENT_TYPE, HttpHeaders::JSON_TYPE.data());
auto tablet_id_str = req->param(TABLET_ID);

if (tablet_id_str.empty()) {
HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST,
"tablet id should be set in request params");
return;
}
int64_t tablet_id = -1;
try {
tablet_id = std::stoll(tablet_id_str);
} catch (const std::exception& e) {
LOG(WARNING) << "convert tablet id to i64 failed:" << e.what();
auto msg = fmt::format("invalid argument: tablet_id={}", tablet_id_str);

HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, msg);
return;
}

auto maybe_tablet = ExecEnv::get_tablet(tablet_id);
if (!maybe_tablet) {
HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, maybe_tablet.error().to_string());
return;
}
auto tablet = maybe_tablet.value();

if (config::is_cloud_mode()) {
auto cloud_tablet = std::dynamic_pointer_cast<CloudTablet>(tablet);
DCHECK_NE(cloud_tablet, nullptr);
auto st = sync_meta(cloud_tablet);
if (!st) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, st.to_json());
return;
}
}

auto maybe_is_encrypted = is_tablet_encrypted(tablet);
if (maybe_is_encrypted.has_value()) {
HttpChannel::send_reply(
req, HttpStatus::OK,
maybe_is_encrypted.value() ? "all encrypted" : "some are not encrypted");
return;
}
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
maybe_is_encrypted.error().to_json());
}

} // namespace doris
35 changes: 35 additions & 0 deletions be/src/http/action/check_encryption_action.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gen_cpp/FrontendService_types.h>

#include "cloud/cloud_tablet_mgr.h"
#include "http/http_handler_with_auth.h"
#include "http/http_request.h"
#include "olap/tablet_manager.h"
#include "runtime/exec_env.h"
namespace doris {

class CheckEncryptionAction : public HttpHandlerWithAuth {
public:
explicit CheckEncryptionAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type type);

void handle(HttpRequest* req) override;
};

} // namespace doris
2 changes: 0 additions & 2 deletions be/src/io/fs/encrypted_fs_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

#include <gen_cpp/olap_file.pb.h>

#include <memory>

#include "io/fs/file_system.h"
namespace doris::io {

Expand Down
2 changes: 0 additions & 2 deletions be/src/io/fs/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@
#include <memory>

#include "common/status.h"
#include "gutil/macros.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/file_cache_common.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "io/fs/path.h"
#include "util/slice.h"
Expand Down
28 changes: 15 additions & 13 deletions be/src/olap/rowset/rowset_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,22 @@ bool RowsetMeta::json_rowset_meta(std::string* json_rowset_meta) {
return ret;
}

io::FileSystemSPtr RowsetMeta::fs() {
auto fs = [this]() -> io::FileSystemSPtr {
if (is_local()) {
return io::global_local_filesystem();
}
io::FileSystemSPtr RowsetMeta::physical_fs() {
if (is_local()) {
return io::global_local_filesystem();
}

auto storage_resource = remote_storage_resource();
if (storage_resource) {
return storage_resource.value()->fs;
} else {
LOG(WARNING) << storage_resource.error();
return nullptr;
}
}();
auto storage_resource = remote_storage_resource();
if (storage_resource) {
return storage_resource.value()->fs;
} else {
LOG(WARNING) << storage_resource.error();
return nullptr;
}
}

io::FileSystemSPtr RowsetMeta::fs() {
auto fs = physical_fs();

#ifndef BE_TEST
auto algorithm = _determine_encryption_once.call([this]() -> Result<EncryptionAlgorithmPB> {
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class RowsetMeta : public MetadataAdder<RowsetMeta> {
// Note that if the resource id cannot be found for the corresponding remote file system, nullptr will be returned.
io::FileSystemSPtr fs();

io::FileSystemSPtr physical_fs();

Result<const StorageResource*> remote_storage_resource();

void set_remote_storage_resource(StorageResource resource);
Expand Down
23 changes: 13 additions & 10 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,25 +105,28 @@ Status Segment::_open(io::FileSystemSPtr fs, const std::string& path, uint32_t s
const io::FileReaderOptions& reader_options, std::shared_ptr<Segment>* output,
InvertedIndexFileInfo idx_file_info, OlapReaderStatistics* stats) {
io::FileReaderSPtr file_reader;
RETURN_IF_ERROR(fs->open_file(path, &file_reader, &reader_options));
auto st = fs->open_file(path, &file_reader, &reader_options);
TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption", &st);
std::shared_ptr<Segment> segment(
new Segment(segment_id, rowset_id, std::move(tablet_schema), idx_file_info));
segment->_fs = fs;
segment->_file_reader = std::move(file_reader);
auto st = segment->_open(stats);
TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption", &st);
if (st.is<ErrorCode::CORRUPTION>() &&
reader_options.cache_type == io::FileCachePolicy::FILE_BLOCK_CACHE) {
if (st) {
segment->_fs = fs;
segment->_file_reader = std::move(file_reader);
st = segment->_open(stats);
} else if (st.is<ErrorCode::CORRUPTION>() &&
reader_options.cache_type == io::FileCachePolicy::FILE_BLOCK_CACHE) {
LOG(WARNING) << "bad segment file may be read from file cache, try to read remote source "
"file directly, file path: "
<< path << " cache_key: " << file_cache_key_str(path);
auto file_key = file_cache_key_from_path(path);
auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
file_cache->remove_if_cached(file_key);

RETURN_IF_ERROR(fs->open_file(path, &file_reader, &reader_options));
segment->_file_reader = std::move(file_reader);
st = segment->_open(stats);
st = fs->open_file(path, &file_reader, &reader_options);
if (st) {
segment->_file_reader = std::move(file_reader);
st = segment->_open(stats);
}
TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption1", &st);
if (st.is<ErrorCode::CORRUPTION>()) { // corrupt again
LOG(WARNING) << "failed to try to read remote source file again with cache support,"
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/wal/wal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name) {}
WalWriter::~WalWriter() {}

Status determine_wal_fs(int64_t db_id, int64_t tb_id, io::FileSystemSPtr& fs) {
if (!config::enable_wal_tde) {
fs = io::global_local_filesystem();
return Status::OK();
}

#ifndef BE_TEST
TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
TGetTableTDEInfoRequest req;
Expand Down
Loading
Loading