Better
kssenii committed Oct 18, 2023
1 parent 3d598b1 commit fc1ed16
Showing 2 changed files with 78 additions and 39 deletions.
113 changes: 76 additions & 37 deletions src/Storages/S3Queue/S3QueueFilesMetadata.cpp
@@ -310,38 +310,35 @@ S3QueueFilesMetadata::Result S3QueueFilesMetadata::trySetFileAsProcessing(const
return {};
}

/// Let's go and check metadata in zookeeper and try to create a /processing ephemeral node.
/// If successful, return result with processing node holder.
SetFileProcessingResult result;
ProcessingNodeHolderPtr processing_node_holder;

/// Let's go and check metadata in zookeeper and try to create a /processing ephemeral node.
/// If successful, return result with processing node holder.
switch (mode)
{
case S3QueueMode::ORDERED:
{
std::tie(result, processing_node_holder) = trySetFileAsProcessingForOrderedMode(path);
processing_node_holder = trySetFileAsProcessingForOrderedMode(path, *file_status);
break;
}
case S3QueueMode::UNORDERED:
{
std::tie(result, processing_node_holder) = trySetFileAsProcessingForUnorderedMode(path);
processing_node_holder = trySetFileAsProcessingForUnorderedMode(path, *file_status);
break;
}
}

updateCachedFileStatus(result, *file_status);

file_status->profile_counters.increment(ProfileEvents::S3QueueSetFileProcessingMicroseconds, timer.get());
timer.cancel();

if (result == SetFileProcessingResult::Success)
if (processing_node_holder)
return { processing_node_holder, file_status, false };

return {};
}
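
/// For reference, a minimal caller-side sketch of how the returned Result might be consumed.
/// The field names (processing_holder, file_status) and the processFile call are assumptions
/// for illustration, matching the aggregate { processing_node_holder, file_status, false } above.
if (auto result = files_metadata->trySetFileAsProcessing(path); result.processing_holder)
{
    /// The ephemeral /processing node lives as long as the holder does,
    /// so other servers skip this file while it is being read.
    processFile(path, *result.file_status);
}
/// An empty result means the file is already processed, already failed,
/// or currently being processed by another server.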

std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
S3QueueFilesMetadata::ProcessingNodeHolderPtr> S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path)
S3QueueFilesMetadata::ProcessingNodeHolderPtr
S3QueueFilesMetadata::trySetFileAsProcessingForUnorderedMode(const std::string & path, FileStatus & file_status)
{
/// In one zookeeper transaction do the following:
/// 1. check that corresponding persistent nodes do not exist in processed/ and failed/;
@@ -366,35 +363,40 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
Coordination::Responses responses;
auto code = zk_client->tryMulti(requests, responses);

SetFileProcessingResult result;
std::unique_ptr<ProcessingNodeHolder> processing_node_holder = nullptr;
if (code == Coordination::Error::ZOK)
{
auto holder = std::make_unique<ProcessingNodeHolder>(node_metadata.processing_id, path, zookeeper_processing_path / node_name, zk_client);
return std::pair{SetFileProcessingResult::Success, std::move(holder)};
processing_node_holder = std::make_unique<ProcessingNodeHolder>(
node_metadata.processing_id, path, zookeeper_processing_path / node_name, zk_client);
result = SetFileProcessingResult::Success;
}

if (responses[0]->error != Coordination::Error::ZOK)
else if (responses[0]->error != Coordination::Error::ZOK)
{
LOG_TEST(log, "File {} is already processed", path);
return std::pair{SetFileProcessingResult::AlreadyProcessed, nullptr};
result = SetFileProcessingResult::AlreadyProcessed;
}
else if (responses[2]->error != Coordination::Error::ZOK)
{
LOG_TEST(log, "File {} is failed and no longer retriable", path);
return std::pair{SetFileProcessingResult::AlreadyFailed, nullptr};
result = SetFileProcessingResult::AlreadyFailed;
}
else if (responses[4]->error != Coordination::Error::ZOK)
{
LOG_TEST(log, "File {} is already processing", path);
return std::pair{SetFileProcessingResult::ProcessingByOtherNode, nullptr};
result = SetFileProcessingResult::ProcessingByOtherNode;
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state of zookeeper transaction: {}", magic_enum::enum_name(code));
}

updateCachedFileStatus(result, file_status);
return processing_node_holder;
}
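
/// The request list of this transaction is collapsed in the hunk above. A plausible
/// reconstruction, assuming the same create-then-remove existence-check idiom used elsewhere
/// in this file, which is what makes responses[0], responses[2] and responses[4] the
/// indices inspected above:
Coordination::Requests requests;
/// responses[0]: fails if the file already has a persistent node under processed/.
requests.push_back(zkutil::makeCreateRequest(zookeeper_processed_path / node_name, "", zkutil::CreateMode::Persistent));
requests.push_back(zkutil::makeRemoveRequest(zookeeper_processed_path / node_name, -1));
/// responses[2]: fails if the file already has a persistent node under failed/ (no retries left).
requests.push_back(zkutil::makeCreateRequest(zookeeper_failed_path / node_name, "", zkutil::CreateMode::Persistent));
requests.push_back(zkutil::makeRemoveRequest(zookeeper_failed_path / node_name, -1));
/// responses[4]: fails if another server already holds the ephemeral processing node.
requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata.toString(), zkutil::CreateMode::Ephemeral));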

std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
S3QueueFilesMetadata::ProcessingNodeHolderPtr> S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path)
S3QueueFilesMetadata::ProcessingNodeHolderPtr
S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::string & path, FileStatus & file_status)
{
/// Same as for Unordered mode.
/// The only difference is the check if the file is already processed.
@@ -407,6 +409,8 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
auto node_metadata = createNodeMetadata(path);
node_metadata.processing_id = getRandomASCIIString(10);

SetFileProcessingResult result;
std::unique_ptr<ProcessingNodeHolder> processing_node_holder = nullptr;
while (true)
{
/// Get a /processed node content - max_processed path.
@@ -425,7 +429,10 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,

auto max_processed_file_path = processed_node_metadata.file_path;
if (!max_processed_file_path.empty() && path <= max_processed_file_path)
return std::pair{SetFileProcessingResult::AlreadyProcessed, nullptr};
{
result = SetFileProcessingResult::AlreadyProcessed;
break;
}

Coordination::Requests requests;
requests.push_back(zkutil::makeCreateRequest(zookeeper_failed_path / node_name, "", zkutil::CreateMode::Persistent));
@@ -438,25 +445,32 @@ std::pair<S3QueueFilesMetadata::SetFileProcessingResult,
auto code = zk_client->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
{
auto holder = std::make_unique<ProcessingNodeHolder>(node_metadata.processing_id, path, zookeeper_processing_path / node_name, zk_client);
return std::pair{SetFileProcessingResult::Success, std::move(holder)};
processing_node_holder = std::make_unique<ProcessingNodeHolder>(
node_metadata.processing_id, path, zookeeper_processing_path / node_name, zk_client);
result = SetFileProcessingResult::Success;
break;
}

if (responses[0]->error != Coordination::Error::ZOK)
{
LOG_TEST(log, "Skipping file `{}`: failed", path);
return std::pair{SetFileProcessingResult::AlreadyFailed, nullptr};
result = SetFileProcessingResult::AlreadyFailed;
break;
}
else if (responses[2]->error != Coordination::Error::ZOK)
{
LOG_TEST(log, "Skipping file `{}`: already processing", path);
return std::pair{SetFileProcessingResult::ProcessingByOtherNode, nullptr};
result = SetFileProcessingResult::ProcessingByOtherNode;
break;
}
else
{
LOG_TEST(log, "Version of max processed file changed. Retring the check for file `{}`", path);
}
}

updateCachedFileStatus(result, file_status);
return processing_node_holder;
}
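
/// The transaction built inside the loop is also partially collapsed. Presumably, by analogy
/// with the version check on zookeeper_batch_min_path in the batch variant further below, it
/// ends with an optimistic check on the /processed node, which is what produces the
/// "Version of max processed file changed" retry branch. A hedged sketch; the stat variable
/// name is an assumption:
requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata.toString(), zkutil::CreateMode::Ephemeral));
/// If another server advanced /processed between our read and this multi-request,
/// the check fails with ZBADVERSION and the loop re-reads max_processed_file_path.
requests.push_back(zkutil::makeCheckRequest(zookeeper_processed_path, processed_node_stat.version));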

S3QueueFilesMetadata::Result S3QueueFilesMetadata::trySetFileAsProcessing(const std::deque<std::string> & paths)
@@ -494,6 +508,7 @@ S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::deque<std:
std::string max_processed_file;
std::optional<int> window_version;
std::deque<std::string> waitlist;
bool window_finished_or_empty = false;
{
Coordination::Requests requests;
requests.push_back(zkutil::makeGetRequest(zookeeper_batch_min_path));
@@ -510,26 +525,36 @@ S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::deque<std:
window_version = dynamic_cast<const Coordination::GetResponse *>(responses[0].get())->stat.version;
max_processed_file = dynamic_cast<const Coordination::GetResponse *>(responses[3].get())->data;

if (!window_min.empty())
window_finished_or_empty = window_max.empty() || window_max <= max_processed_file;
if (!window_finished_or_empty)
{
LOG_TEST(log, "Got processing window: [{}, {}]", window_min, window_max);
LOG_TEST(log, "Having acrive processing window: [{}, {}]", window_min, window_max);

if (paths.back() < window_min)
{
LOG_TEST(log, "Window from file {} to file {} is already processed", paths.front(), paths.back());
LOG_TEST(log,
"Window from file {} to file {} is already processed",
paths.front(), paths.back());

return { {}, nullptr, true };
}
if (paths.front() > window_max && window_max > max_processed_file)

if (paths.front() > window_max)
{
LOG_TEST(log, "Window from file {} to file {} cannot be processed at the moment", paths.front(), paths.back());
LOG_TEST(log,
"Window from file {} to file {} cannot be processed at the moment",
paths.front(), paths.back());

return { {}, nullptr, false };
}

auto max_processed_path_version = dynamic_cast<const Coordination::GetResponse *>(responses[3].get())->stat.version;
auto waitlist_str = dynamic_cast<const Coordination::GetResponse *>(responses[2].get())->data;
waitlist = fromFilesList(waitlist_str);

LOG_TEST(log, "Window version: {}, max processed file: {}, waitlist: {}", *window_version, max_processed_file, waitlist_str);
LOG_TEST(log,
"Window version: {}, max processed file: {}, waitlist: {}",
*window_version, max_processed_file, waitlist_str);

if (waitlist.empty())
{
@@ -544,24 +569,33 @@ S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::deque<std:
{
if (paths.back() <= max_processed_file)
{
LOG_TEST(log, "Window from file {} to file {} is fully processed", paths.front(), paths.back());
LOG_TEST(log,
"Window from file {} to file {} is fully processed",
paths.front(), paths.back());

return { {}, nullptr, true };
}

else
LOG_TEST(log, "Paths list partially intersects processed window, will split it");
}
else if (code == Coordination::Error::ZBADVERSION)
{
LOG_TEST(log, "Window version changed");
return { {}, nullptr, true };
LOG_TEST(log, "Window version changed, will retry");
continue;
}
else
throw zkutil::KeeperException::fromPath(code, zookeeper_batch_min_path);
}

}
}
else
{
chassert(responses[0]->error != Coordination::Error::ZOK);
window_finished_or_empty = true;
}

if (window_min.empty())
if (window_finished_or_empty)
{
if (!max_processed_file.empty())
{
@@ -618,17 +652,22 @@ S3QueueFilesMetadata::trySetFileAsProcessingForOrderedMode(const std::deque<std:

Coordination::Requests requests;
requests.push_back(zkutil::makeCheckRequest(zookeeper_batch_min_path, *window_version));
requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name, node_metadata.toString(), zkutil::CreateMode::Ephemeral));
requests.push_back(zkutil::makeCreateRequest(zookeeper_processing_path / node_name,
node_metadata.toString(),
zkutil::CreateMode::Ephemeral));
requests.push_back(zkutil::makeCreateRequest(zookeeper_failed_path / node_name, "", zkutil::CreateMode::Persistent));
requests.push_back(zkutil::makeRemoveRequest(zookeeper_failed_path / node_name, -1));
requests.push_back(zkutil::makeCreateRequest(zookeeper_processed_path / node_name, "", zkutil::CreateMode::Persistent));
requests.push_back(zkutil::makeRemoveRequest(zookeeper_processed_path / node_name, -1));

Coordination::Responses responses;
code = zk_client->tryMulti(requests, responses);

if (code == Coordination::Error::ZOK)
{
auto holder = std::make_unique<ProcessingNodeHolder>(node_metadata.processing_id, path, zookeeper_processing_path / node_name, zk_client);
auto holder = std::make_unique<ProcessingNodeHolder>(
node_metadata.processing_id, path, zookeeper_processing_path / node_name, zk_client);

LOG_TEST(log, "Will process file (2) : {}", path);
return {SetFileProcessingResult::Success, std::move(holder), false};
}
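/// A condensed sketch of the window decision implemented by the batch-mode function above.
/// The helper names (alreadyProcessed, cannotProcessYet) are illustrative only and the
/// collapsed branches are summarized in comments:
bool window_active = !window_max.empty() && window_max > max_processed_file;
if (window_active)
{
    if (paths.back() < window_min)
        return alreadyProcessed();   /// the whole batch lies before the active window
    if (paths.front() > window_max)
        return cannotProcessYet();   /// the whole batch lies after the active window
    /// Otherwise consult the waitlist: drop the batch if paths.back() <= max_processed_file,
    /// or split it when it only partially intersects the processed window.
}
else
{
    /// No active window: open a new one for the next unprocessed paths, guarded by
    /// makeCheckRequest(zookeeper_batch_min_path, *window_version) as shown above.
}
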
4 changes: 2 additions & 2 deletions src/Storages/S3Queue/S3QueueFilesMetadata.h
@@ -126,8 +126,8 @@ class S3QueueFilesMetadata
AlreadyProcessed,
AlreadyFailed,
};
std::pair<SetFileProcessingResult, ProcessingNodeHolderPtr> trySetFileAsProcessingForOrderedMode(const std::string & path);
std::pair<SetFileProcessingResult, ProcessingNodeHolderPtr> trySetFileAsProcessingForUnorderedMode(const std::string & path);
ProcessingNodeHolderPtr trySetFileAsProcessingForOrderedMode(const std::string & path, FileStatus & file_status);
ProcessingNodeHolderPtr trySetFileAsProcessingForUnorderedMode(const std::string & path, FileStatus & file_status);

std::tuple<SetFileProcessingResult, ProcessingNodeHolderPtr, bool> trySetFileAsProcessingForOrderedMode(const std::deque<std::string> & paths);

