Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion cloud/src/recycler/hdfs_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,19 @@ std::string HdfsAccessor::to_uri(const std::string& relative_path) {
return uri_ + '/' + relative_path;
}

// Extract the parent directory (including the trailing '/') from a path
// prefix.
// e.g.
// data/492211/02000000008a012957476a3e174dfdaa71ee5f80a3abafa3_ -> data/492211/
std::string extract_parent_path(const std::string& path) {
    // Locate the final separator; everything up to and including it is the
    // parent directory portion of the prefix.
    const size_t pos = path.rfind('/');
    if (pos == std::string::npos) {
        // No separator at all: the prefix has no parent we can list.
        LOG_WARNING("no '/' found in path").tag("path", path);
        return "";
    }
    return path.substr(0, pos + 1);
}

int HdfsAccessor::init() {
// TODO(plat1ko): Cache hdfsFS
fs_ = HDFSBuilder::create_fs(info_.build_conf());
Expand All @@ -356,8 +369,35 @@ int HdfsAccessor::init() {
int HdfsAccessor::delete_prefix(const std::string& path_prefix, int64_t expiration_time) {
auto uri = to_uri(path_prefix);
LOG(INFO) << "delete prefix, uri=" << uri;
// If path prefix exists, assume it is a dir or a file.
if (exists(path_prefix) == 0) {
// If it exists, then it is a dir or a file.
// delete_directory func can delete a dir or a file.
if (delete_directory(path_prefix) == 0) {
LOG(INFO) << "delete prefix succ"
<< ", is dir or file = true"
<< ", uri=" << uri;
return 0;
}
// delete failed, return err
LOG_WARNING("delete prefix failed, this is a dir or a file")
.tag("path prefix", path_prefix);
return -1;
}
// If path prefix is not a dir or a file,
// for example: data/492211/02000000008a012957476a3e174dfdaa71ee5f80a3abafa3_.
// Then we need to extract the parent id path from the given prefix,
// traverse all files in the parent id path, and delete the files that match the prefix.
std::unique_ptr<ListIterator> list_iter;
int ret = list_all(&list_iter);
auto parent_path = extract_parent_path(path_prefix);
if (parent_path.empty()) {
LOG_WARNING("extract parent path failed").tag("path prefix", path_prefix);
return -1;
}
LOG_INFO("path prefix is not a dir, extract parent path success")
.tag("path prefix", path_prefix)
.tag("parent path", parent_path);
int ret = list_directory(parent_path, &list_iter);
if (ret != 0) {
LOG(WARNING) << "delete prefix, failed to list" << uri;
return ret;
Expand All @@ -372,6 +412,10 @@ int HdfsAccessor::delete_prefix(const std::string& path_prefix, int64_t expirati
}
++num_deleted;
}
if (num_deleted == 0) {
LOG_WARNING("recycler delete prefix num = 0, maybe there are some problems?")
.tag("path prefix", path_prefix);
}
LOG(INFO) << "delete prefix " << (ret != 0 ? "failed" : "succ") << " ret=" << ret
<< " uri=" << uri << " num_listed=" << num_listed << " num_deleted=" << num_deleted;
return ret;
Expand All @@ -382,6 +426,7 @@ int HdfsAccessor::delete_directory_impl(const std::string& dir_path) {
// `hdfsDelete`'s return value or errno to avoid exist rpc?
int ret = exists(dir_path);
if (ret == 1) {
// dir does not exist
return 0;
} else if (ret < 0) {
return ret;
Expand Down
200 changes: 147 additions & 53 deletions cloud/test/hdfs_accessor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <gtest/gtest.h>

#include <iostream>
#include <vector>

#include "common/config.h"
#include "common/logging.h"
Expand Down Expand Up @@ -59,20 +60,20 @@ TEST(HdfsAccessorTest, normal) {

HdfsAccessor accessor(info);
int ret = accessor.init();
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);

std::string file1 = "data/10000/1_0.dat";

ret = accessor.delete_directory("");
ASSERT_NE(ret, 0);
EXPECT_NE(ret, 0);
ret = accessor.delete_all();
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);

ret = accessor.put_file(file1, "");
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);

ret = accessor.exists(file1);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);

auto* sp = SyncPoint::get_instance();
sp->enable_processing();
Expand All @@ -90,46 +91,46 @@ TEST(HdfsAccessorTest, normal) {

std::unique_ptr<ListIterator> iter;
ret = accessor.list_directory("data", &iter);
ASSERT_EQ(ret, 0);
ASSERT_TRUE(iter);
ASSERT_TRUE(iter->is_valid());
ASSERT_TRUE(iter->has_next());
ASSERT_EQ(iter->next()->path, file1);
ASSERT_FALSE(iter->has_next());
EXPECT_EQ(ret, 0);
EXPECT_TRUE(iter);
EXPECT_TRUE(iter->is_valid());
EXPECT_TRUE(iter->has_next());
EXPECT_EQ(iter->next()->path, file1);
EXPECT_FALSE(iter->has_next());
iter.reset();
ASSERT_EQ(alloc_entries, 0);
EXPECT_EQ(alloc_entries, 0);

ret = accessor.list_directory("data/", &iter);
ASSERT_EQ(ret, 0);
ASSERT_TRUE(iter->is_valid());
ASSERT_TRUE(iter->has_next());
ASSERT_EQ(iter->next()->path, file1);
ASSERT_FALSE(iter->has_next());
ASSERT_FALSE(iter->next());
EXPECT_EQ(ret, 0);
EXPECT_TRUE(iter->is_valid());
EXPECT_TRUE(iter->has_next());
EXPECT_EQ(iter->next()->path, file1);
EXPECT_FALSE(iter->has_next());
EXPECT_FALSE(iter->next());
iter.reset();
ASSERT_EQ(alloc_entries, 0);
EXPECT_EQ(alloc_entries, 0);

ret = accessor.list_directory("data/100", &iter);
ASSERT_EQ(ret, 0);
ASSERT_FALSE(iter->has_next());
ASSERT_FALSE(iter->next());
EXPECT_EQ(ret, 0);
EXPECT_FALSE(iter->has_next());
EXPECT_FALSE(iter->next());
iter.reset();
ASSERT_EQ(alloc_entries, 0);
EXPECT_EQ(alloc_entries, 0);

ret = accessor.delete_file(file1);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);
ret = accessor.exists(file1);
ASSERT_EQ(ret, 1);
EXPECT_EQ(ret, 1);
ret = accessor.list_directory("", &iter);
ASSERT_NE(ret, 0);
EXPECT_NE(ret, 0);
ret = accessor.list_all(&iter);
ASSERT_EQ(ret, 0);
ASSERT_FALSE(iter->has_next());
ASSERT_FALSE(iter->next());
EXPECT_EQ(ret, 0);
EXPECT_FALSE(iter->has_next());
EXPECT_FALSE(iter->next());
iter.reset();
ASSERT_EQ(alloc_entries, 0);
EXPECT_EQ(alloc_entries, 0);
ret = accessor.delete_file(file1);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);

std::vector<std::string> files;
for (int dir = 10000; dir < 10005; ++dir) {
Expand All @@ -140,18 +141,18 @@ TEST(HdfsAccessorTest, normal) {

for (auto&& file : files) {
ret = accessor.put_file(file, "");
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);
}

std::unordered_set<std::string> list_files;
ret = accessor.list_all(&iter);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);
for (auto file = iter->next(); file.has_value(); file = iter->next()) {
list_files.insert(std::move(file->path));
}
iter.reset();
ASSERT_EQ(alloc_entries, 0);
ASSERT_EQ(list_files.size(), files.size());
EXPECT_EQ(alloc_entries, 0);
EXPECT_EQ(list_files.size(), files.size());
for (auto&& file : files) {
EXPECT_TRUE(list_files.contains(file));
}
Expand All @@ -163,69 +164,162 @@ TEST(HdfsAccessorTest, normal) {
files.pop_back();
}
ret = accessor.delete_files(to_delete_files);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);

ret = accessor.list_all(&iter);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);
list_files.clear();
for (auto file = iter->next(); file.has_value(); file = iter->next()) {
list_files.insert(std::move(file->path));
}
iter.reset();
ASSERT_EQ(alloc_entries, 0);
ASSERT_EQ(list_files.size(), files.size());
EXPECT_EQ(alloc_entries, 0);
EXPECT_EQ(list_files.size(), files.size());
for (auto&& file : files) {
EXPECT_TRUE(list_files.contains(file));
}

std::string to_delete_dir = "data/10001";
ret = accessor.delete_directory(to_delete_dir);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);
ret = accessor.list_directory(to_delete_dir, &iter);
ASSERT_EQ(ret, 0);
ASSERT_FALSE(iter->has_next());
EXPECT_EQ(ret, 0);
EXPECT_FALSE(iter->has_next());

files.erase(std::remove_if(files.begin(), files.end(),
[&](auto&& file) { return file.starts_with(to_delete_dir); }),
files.end());
ret = accessor.list_all(&iter);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);
list_files.clear();
for (auto file = iter->next(); file.has_value(); file = iter->next()) {
list_files.insert(std::move(file->path));
}
iter.reset();
ASSERT_EQ(alloc_entries, 0);
ASSERT_EQ(list_files.size(), files.size());
EXPECT_EQ(alloc_entries, 0);
EXPECT_EQ(list_files.size(), files.size());
for (auto&& file : files) {
EXPECT_TRUE(list_files.contains(file));
}

std::string to_delete_prefix = "data/10003/";
ret = accessor.delete_directory(to_delete_prefix);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);
files.erase(std::remove_if(files.begin(), files.end(),
[&](auto&& file) { return file.starts_with(to_delete_prefix); }),
files.end());
ret = accessor.list_all(&iter);
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);
list_files.clear();
for (auto file = iter->next(); file.has_value(); file = iter->next()) {
list_files.insert(std::move(file->path));
}
iter.reset();
ASSERT_EQ(alloc_entries, 0);
ASSERT_EQ(list_files.size(), files.size());
EXPECT_EQ(alloc_entries, 0);
EXPECT_EQ(list_files.size(), files.size());
for (auto&& file : files) {
EXPECT_TRUE(list_files.contains(file));
}

ret = accessor.delete_all();
ASSERT_EQ(ret, 0);
EXPECT_EQ(ret, 0);
ret = accessor.list_all(&iter);
ASSERT_EQ(ret, 0);
ASSERT_FALSE(iter->has_next());
ASSERT_FALSE(iter->next());
EXPECT_EQ(ret, 0);
EXPECT_FALSE(iter->has_next());
EXPECT_FALSE(iter->next());
}

// Exercises HdfsAccessor::delete_prefix against a live HDFS fixture
// (config::test_hdfs_fs_name / config::test_hdfs_prefix): deleting by a
// non-directory prefix must remove only the sibling files that match the
// prefix, while a prefix that names an existing directory (or an exact file
// path) must remove that whole directory/file.
TEST(HdfsAccessorTest, delete_prefix) {
    // Unique per-run root so concurrent/repeated test runs do not collide.
    HdfsVaultInfo info;
    info.set_prefix(config::test_hdfs_prefix + "/HdfsAccessorTest/" + butil::GenerateGUID());
    auto* conf = info.mutable_build_conf();
    conf->set_fs_name(config::test_hdfs_fs_name);

    HdfsAccessor accessor(info);
    int ret = accessor.init();
    EXPECT_EQ(ret, 0);

    // Helper: upload an empty file and confirm it is visible (exists == 0).
    auto put_and_verify = [&accessor](const std::string& file) {
        int ret = accessor.put_file(file, "");
        EXPECT_EQ(ret, 0);
        ret = accessor.exists(file);
        EXPECT_EQ(ret, 0);
    };

    // Deleting the root via an empty relative path must be rejected;
    // start from a clean slate with delete_all.
    ret = accessor.delete_directory("");
    EXPECT_NE(ret, 0);
    ret = accessor.delete_all();
    EXPECT_EQ(ret, 0);

    // Fixture layout: matching/non-matching siblings, nested subdirs, and a
    // top-level dir ("data111") whose name shares only a textual prefix.
    put_and_verify("data/10000/1_0.dat");
    put_and_verify("data/10000/2_0.dat");
    put_and_verify("data/10000/20000/1_0.dat");
    put_and_verify("data/10000/20000/30000/1_0.dat");
    put_and_verify("data/20000/1_0.dat");
    put_and_verify("data111/10000/1_0.dat");

    // Prefixes that are not themselves dirs/files: only the two flat files
    // under data/10000/ should be removed, not the 20000/ subtree.
    ret = accessor.delete_prefix("data/10000/1_");
    EXPECT_EQ(ret, 0);
    ret = accessor.delete_prefix("data/10000/2_");
    EXPECT_EQ(ret, 0);

    std::unordered_set<std::string> list_files;
    std::unique_ptr<ListIterator> iter;
    ret = accessor.list_all(&iter);
    EXPECT_EQ(ret, 0);
    list_files.clear();
    for (auto file = iter->next(); file.has_value(); file = iter->next()) {
        list_files.insert(std::move(file->path));
    }
    // Four survivors: everything except data/10000/{1_0,2_0}.dat.
    EXPECT_EQ(list_files.size(), 4);
    EXPECT_TRUE(list_files.contains("data/10000/20000/1_0.dat"));
    EXPECT_TRUE(list_files.contains("data/10000/20000/30000/1_0.dat"));
    EXPECT_TRUE(list_files.contains("data/20000/1_0.dat"));
    EXPECT_TRUE(list_files.contains("data111/10000/1_0.dat"));

    // Re-deleting already-removed prefixes must stay idempotent (ret == 0),
    // and deleting data/20000/1_ removes that directory's only file.
    ret = accessor.delete_prefix("data/10000/1_");
    EXPECT_EQ(ret, 0);
    ret = accessor.delete_prefix("data/10000/2_");
    EXPECT_EQ(ret, 0);
    ret = accessor.delete_prefix("data/20000/1_");
    EXPECT_EQ(ret, 0);

    iter.reset();
    ret = accessor.list_all(&iter);
    EXPECT_EQ(ret, 0);
    list_files.clear();
    for (auto file = iter->next(); file.has_value(); file = iter->next()) {
        list_files.insert(std::move(file->path));
    }
    EXPECT_EQ(list_files.size(), 3);
    EXPECT_TRUE(list_files.contains("data/10000/20000/30000/1_0.dat"));
    EXPECT_TRUE(list_files.contains("data/10000/20000/1_0.dat"));
    EXPECT_TRUE(list_files.contains("data111/10000/1_0.dat"));

    // Prefix that IS an existing directory: the whole subtree goes, but the
    // prefix-similar top-level dir data111/ must be untouched.
    ret = accessor.delete_prefix("data/10000/20000");
    EXPECT_EQ(ret, 0);

    iter.reset();
    ret = accessor.list_all(&iter);
    EXPECT_EQ(ret, 0);
    list_files.clear();
    for (auto file = iter->next(); file.has_value(); file = iter->next()) {
        list_files.insert(std::move(file->path));
    }
    EXPECT_EQ(list_files.size(), 1);
    EXPECT_TRUE(list_files.contains("data111/10000/1_0.dat"));

    // Prefix that is an exact file path: deletes that single file.
    ret = accessor.delete_prefix("data111/10000/1_0.dat");
    EXPECT_EQ(ret, 0);

    iter.reset();
    ret = accessor.list_all(&iter);
    EXPECT_EQ(ret, 0);
    list_files.clear();
    for (auto file = iter->next(); file.has_value(); file = iter->next()) {
        list_files.insert(std::move(file->path));
    }
    // Bucket fully emptied.
    EXPECT_EQ(list_files.size(), 0);
}

} // namespace doris::cloud
Loading