diff --git a/cloud/src/recycler/hdfs_accessor.cpp b/cloud/src/recycler/hdfs_accessor.cpp index 97a4670d2bfc6b..1999bcfa16543a 100644 --- a/cloud/src/recycler/hdfs_accessor.cpp +++ b/cloud/src/recycler/hdfs_accessor.cpp @@ -342,6 +342,19 @@ std::string HdfsAccessor::to_uri(const std::string& relative_path) { return uri_ + '/' + relative_path; } +// extract parent path from prefix +// e.g. +// data/492211/02000000008a012957476a3e174dfdaa71ee5f80a3abafa3_ -> data/492211/ +std::string extract_parent_path(const std::string& path) { + // Find the last '/' + size_t last_slash = path.find_last_of('/'); + if (last_slash == std::string::npos) { + LOG_WARNING("no '/' found in path").tag("path", path); + return ""; + } + return path.substr(0, last_slash + 1); +} + int HdfsAccessor::init() { // TODO(plat1ko): Cache hdfsFS fs_ = HDFSBuilder::create_fs(info_.build_conf()); @@ -356,8 +369,35 @@ int HdfsAccessor::init() { int HdfsAccessor::delete_prefix(const std::string& path_prefix, int64_t expiration_time) { auto uri = to_uri(path_prefix); LOG(INFO) << "delete prefix, uri=" << uri; + // If path prefix exists, assume it is a dir or a file. + if (exists(path_prefix) == 0) { + // If it exists, then it is a dir or a file. + // delete_directory func can delete a dir or a file. + if (delete_directory(path_prefix) == 0) { + LOG(INFO) << "delete prefix succ" + << ", is dir or file = true" + << ", uri=" << uri; + return 0; + } + // delete failed, return err + LOG_WARNING("delete prefix failed, this is a dir or a file") + .tag("path prefix", path_prefix); + return -1; + } + // If path prefix is not a dir or a file, + // for example: data/492211/02000000008a012957476a3e174dfdaa71ee5f80a3abafa3_. + // Then we need to extract the parent id path from the given prefix, + // traverse all files in the parent id path, and delete the files that match the prefix. std::unique_ptr list_iter; - int ret = list_all(&list_iter); + auto parent_path = extract_parent_path(path_prefix); + if (parent_path.empty()) { + LOG_WARNING("extract parent path failed").tag("path prefix", path_prefix); + return -1; + } + LOG_INFO("path prefix is not a dir, extract parent path success") + .tag("path prefix", path_prefix) + .tag("parent path", parent_path); + int ret = list_directory(parent_path, &list_iter); if (ret != 0) { LOG(WARNING) << "delete prefix, failed to list" << uri; return ret; @@ -372,6 +412,10 @@ int HdfsAccessor::delete_prefix(const std::string& path_prefix, int64_t expirati } ++num_deleted; } + if (num_deleted == 0) { + LOG_WARNING("recycler delete prefix num = 0, maybe there are some problems?") + .tag("path prefix", path_prefix); + } LOG(INFO) << "delete prefix " << (ret != 0 ? "failed" : "succ") << " ret=" << ret << " uri=" << uri << " num_listed=" << num_listed << " num_deleted=" << num_deleted; return ret; @@ -382,6 +426,7 @@ int HdfsAccessor::delete_directory_impl(const std::string& dir_path) { // `hdfsDelete`'s return value or errno to avoid exist rpc? int ret = exists(dir_path); if (ret == 1) { + // dir does not exist return 0; } else if (ret < 0) { return ret; diff --git a/cloud/test/hdfs_accessor_test.cpp b/cloud/test/hdfs_accessor_test.cpp index cabd07f8c9a051..11c0af3853b2cd 100644 --- a/cloud/test/hdfs_accessor_test.cpp +++ b/cloud/test/hdfs_accessor_test.cpp @@ -22,6 +22,7 @@ #include #include +#include #include "common/config.h" #include "common/logging.h" @@ -59,20 +60,20 @@ TEST(HdfsAccessorTest, normal) { HdfsAccessor accessor(info); int ret = accessor.init(); - ASSERT_EQ(ret, 0); + EXPECT_EQ(ret, 0); std::string file1 = "data/10000/1_0.dat"; ret = accessor.delete_directory(""); - ASSERT_NE(ret, 0); + EXPECT_NE(ret, 0); ret = accessor.delete_all(); - ASSERT_EQ(ret, 0); + EXPECT_EQ(ret, 0); ret = accessor.put_file(file1, ""); - ASSERT_EQ(ret, 0); + EXPECT_EQ(ret, 0); ret = accessor.exists(file1); - ASSERT_EQ(ret, 0); + EXPECT_EQ(ret, 0); auto* sp = SyncPoint::get_instance(); sp->enable_processing(); @@ -90,46 +91,46 @@ TEST(HdfsAccessorTest, normal) { std::unique_ptr iter; ret = accessor.list_directory("data", &iter); - ASSERT_EQ(ret, 0); - ASSERT_TRUE(iter); - ASSERT_TRUE(iter->is_valid()); - ASSERT_TRUE(iter->has_next()); - ASSERT_EQ(iter->next()->path, file1); - ASSERT_FALSE(iter->has_next()); + EXPECT_EQ(ret, 0); + EXPECT_TRUE(iter); + EXPECT_TRUE(iter->is_valid()); + EXPECT_TRUE(iter->has_next()); + EXPECT_EQ(iter->next()->path, file1); + EXPECT_FALSE(iter->has_next()); iter.reset(); - ASSERT_EQ(alloc_entries, 0); + EXPECT_EQ(alloc_entries, 0); ret = accessor.list_directory("data/", &iter); - ASSERT_EQ(ret, 0); - ASSERT_TRUE(iter->is_valid()); - ASSERT_TRUE(iter->has_next()); - ASSERT_EQ(iter->next()->path, file1); - ASSERT_FALSE(iter->has_next()); - ASSERT_FALSE(iter->next()); + EXPECT_EQ(ret, 0); + EXPECT_TRUE(iter->is_valid()); + EXPECT_TRUE(iter->has_next()); + EXPECT_EQ(iter->next()->path, file1); + EXPECT_FALSE(iter->has_next()); + EXPECT_FALSE(iter->next()); iter.reset(); - ASSERT_EQ(alloc_entries, 0); + EXPECT_EQ(alloc_entries, 0); ret = accessor.list_directory("data/100", &iter); - ASSERT_EQ(ret, 0); - ASSERT_FALSE(iter->has_next()); - ASSERT_FALSE(iter->next()); + EXPECT_EQ(ret, 0); + EXPECT_FALSE(iter->has_next()); + EXPECT_FALSE(iter->next()); iter.reset(); - ASSERT_EQ(alloc_entries, 0); + EXPECT_EQ(alloc_entries, 0); ret = accessor.delete_file(file1); - ASSERT_EQ(ret, 0); + EXPECT_EQ(ret, 0); ret = accessor.exists(file1); - ASSERT_EQ(ret, 1); + EXPECT_EQ(ret, 1); ret = accessor.list_directory("", &iter); - ASSERT_NE(ret, 0); + EXPECT_NE(ret, 0); ret = accessor.list_all(&iter); - ASSERT_EQ(ret, 0); - ASSERT_FALSE(iter->has_next()); - ASSERT_FALSE(iter->next()); + EXPECT_EQ(ret, 0); + EXPECT_FALSE(iter->has_next()); + EXPECT_FALSE(iter->next()); iter.reset(); - ASSERT_EQ(alloc_entries, 0); + EXPECT_EQ(alloc_entries, 0); ret = accessor.delete_file(file1); - ASSERT_EQ(ret, 0); + EXPECT_EQ(ret, 0); std::vector files; for (int dir = 10000; dir < 10005; ++dir) { @@ -140,18 +141,18 @@ TEST(HdfsAccessorTest, normal) { for (auto&& file : files) { ret = accessor.put_file(file, ""); - ASSERT_EQ(ret, 0); + EXPECT_EQ(ret, 0); } std::unordered_set list_files; ret = accessor.list_all(&iter); - ASSERT_EQ(ret, 0); + EXPECT_EQ(ret, 0); for (auto file = iter->next(); file.has_value(); file = iter->next()) { list_files.insert(std::move(file->path)); } iter.reset(); - ASSERT_EQ(alloc_entries, 0); - ASSERT_EQ(list_files.size(), files.size()); + EXPECT_EQ(alloc_entries, 0); + EXPECT_EQ(list_files.size(), files.size()); for (auto&& file : files) { EXPECT_TRUE(list_files.contains(file)); } @@ -163,69 +164,162 @@ TEST(HdfsAccessorTest, normal) { files.pop_back(); } ret = accessor.delete_files(to_delete_files); - ASSERT_EQ(ret, 0); + EXPECT_EQ(ret, 0); ret = accessor.list_all(&iter); - ASSERT_EQ(ret, 0); + EXPECT_EQ(ret, 0); list_files.clear(); for (auto file = iter->next(); file.has_value(); file = iter->next()) { list_files.insert(std::move(file->path)); } iter.reset(); - ASSERT_EQ(alloc_entries, 0); - ASSERT_EQ(list_files.size(), files.size()); + EXPECT_EQ(alloc_entries, 0); + EXPECT_EQ(list_files.size(), files.size()); for (auto&& file : files) { EXPECT_TRUE(list_files.contains(file)); } std::string to_delete_dir = "data/10001"; ret = accessor.delete_directory(to_delete_dir); - ASSERT_EQ(ret, 0); + EXPECT_EQ(ret, 0); ret = accessor.list_directory(to_delete_dir, &iter); - ASSERT_EQ(ret, 0); - ASSERT_FALSE(iter->has_next()); + EXPECT_EQ(ret, 0); + EXPECT_FALSE(iter->has_next()); files.erase(std::remove_if(files.begin(), files.end(), [&](auto&& file) { return file.starts_with(to_delete_dir); }), files.end()); ret = accessor.list_all(&iter); - ASSERT_EQ(ret, 0); + EXPECT_EQ(ret, 0); list_files.clear(); for (auto file = iter->next(); file.has_value(); file = iter->next()) { list_files.insert(std::move(file->path)); } iter.reset(); - ASSERT_EQ(alloc_entries, 0); - ASSERT_EQ(list_files.size(), files.size()); + EXPECT_EQ(alloc_entries, 0); + EXPECT_EQ(list_files.size(), files.size()); for (auto&& file : files) { EXPECT_TRUE(list_files.contains(file)); } std::string to_delete_prefix = "data/10003/"; ret = accessor.delete_directory(to_delete_prefix); - ASSERT_EQ(ret, 0); + EXPECT_EQ(ret, 0); files.erase(std::remove_if(files.begin(), files.end(), [&](auto&& file) { return file.starts_with(to_delete_prefix); }), files.end()); ret = accessor.list_all(&iter); - ASSERT_EQ(ret, 0); + EXPECT_EQ(ret, 0); list_files.clear(); for (auto file = iter->next(); file.has_value(); file = iter->next()) { list_files.insert(std::move(file->path)); } iter.reset(); - ASSERT_EQ(alloc_entries, 0); - ASSERT_EQ(list_files.size(), files.size()); + EXPECT_EQ(alloc_entries, 0); + EXPECT_EQ(list_files.size(), files.size()); for (auto&& file : files) { EXPECT_TRUE(list_files.contains(file)); } ret = accessor.delete_all(); - ASSERT_EQ(ret, 0); + EXPECT_EQ(ret, 0); ret = accessor.list_all(&iter); - ASSERT_EQ(ret, 0); - ASSERT_FALSE(iter->has_next()); - ASSERT_FALSE(iter->next()); + EXPECT_EQ(ret, 0); + EXPECT_FALSE(iter->has_next()); + EXPECT_FALSE(iter->next()); +} + +TEST(HdfsAccessorTest, delete_prefix) { + HdfsVaultInfo info; + info.set_prefix(config::test_hdfs_prefix + "/HdfsAccessorTest/" + butil::GenerateGUID()); + auto* conf = info.mutable_build_conf(); + conf->set_fs_name(config::test_hdfs_fs_name); + + HdfsAccessor accessor(info); + int ret = accessor.init(); + EXPECT_EQ(ret, 0); + + auto put_and_verify = [&accessor](const std::string& file) { + int ret = accessor.put_file(file, ""); + EXPECT_EQ(ret, 0); + ret = accessor.exists(file); + EXPECT_EQ(ret, 0); + }; + + ret = accessor.delete_directory(""); + EXPECT_NE(ret, 0); + ret = accessor.delete_all(); + EXPECT_EQ(ret, 0); + + put_and_verify("data/10000/1_0.dat"); + put_and_verify("data/10000/2_0.dat"); + put_and_verify("data/10000/20000/1_0.dat"); + put_and_verify("data/10000/20000/30000/1_0.dat"); + put_and_verify("data/20000/1_0.dat"); + put_and_verify("data111/10000/1_0.dat"); + + ret = accessor.delete_prefix("data/10000/1_"); + EXPECT_EQ(ret, 0); + ret = accessor.delete_prefix("data/10000/2_"); + EXPECT_EQ(ret, 0); + + std::unordered_set list_files; + std::unique_ptr iter; + ret = accessor.list_all(&iter); + EXPECT_EQ(ret, 0); + list_files.clear(); + for (auto file = iter->next(); file.has_value(); file = iter->next()) { + list_files.insert(std::move(file->path)); + } + EXPECT_EQ(list_files.size(), 4); + EXPECT_TRUE(list_files.contains("data/10000/20000/1_0.dat")); + EXPECT_TRUE(list_files.contains("data/10000/20000/30000/1_0.dat")); + EXPECT_TRUE(list_files.contains("data/20000/1_0.dat")); + EXPECT_TRUE(list_files.contains("data111/10000/1_0.dat")); + + ret = accessor.delete_prefix("data/10000/1_"); + EXPECT_EQ(ret, 0); + ret = accessor.delete_prefix("data/10000/2_"); + EXPECT_EQ(ret, 0); + ret = accessor.delete_prefix("data/20000/1_"); + EXPECT_EQ(ret, 0); + + iter.reset(); + ret = accessor.list_all(&iter); + EXPECT_EQ(ret, 0); + list_files.clear(); + for (auto file = iter->next(); file.has_value(); file = iter->next()) { + list_files.insert(std::move(file->path)); + } + EXPECT_EQ(list_files.size(), 3); + EXPECT_TRUE(list_files.contains("data/10000/20000/30000/1_0.dat")); + EXPECT_TRUE(list_files.contains("data/10000/20000/1_0.dat")); + EXPECT_TRUE(list_files.contains("data111/10000/1_0.dat")); + + ret = accessor.delete_prefix("data/10000/20000"); + EXPECT_EQ(ret, 0); + + iter.reset(); + ret = accessor.list_all(&iter); + EXPECT_EQ(ret, 0); + list_files.clear(); + for (auto file = iter->next(); file.has_value(); file = iter->next()) { + list_files.insert(std::move(file->path)); + } + EXPECT_EQ(list_files.size(), 1); + EXPECT_TRUE(list_files.contains("data111/10000/1_0.dat")); + + ret = accessor.delete_prefix("data111/10000/1_0.dat"); + EXPECT_EQ(ret, 0); + + iter.reset(); + ret = accessor.list_all(&iter); + EXPECT_EQ(ret, 0); + list_files.clear(); + for (auto file = iter->next(); file.has_value(); file = iter->next()) { + list_files.insert(std::move(file->path)); + } + EXPECT_EQ(list_files.size(), 0); } } // namespace doris::cloud