Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ class CorrelationEngine {
struct PendingCluster {
ClusterData data;
std::chrono::steady_clock::time_point steady_first_at;
std::map<std::string, std::string> fault_severities; ///< fault_code -> severity
};

/// Pending clusters being formed (rule_id -> cluster data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ ProcessClearResult CorrelationEngine::process_clear(const std::string & fault_co

auto & codes = pending_cluster.fault_codes;
codes.erase(std::remove(codes.begin(), codes.end(), fault_code), codes.end());
pending_it->second.fault_severities.erase(fault_code);

if (codes.empty()) {
pending_it = pending_clusters_.erase(pending_it);
Expand All @@ -160,16 +161,39 @@ ProcessClearResult CorrelationEngine::process_clear(const std::string & fault_co
for (const auto & rule : config_.rules) {
if (rule.id == pending_it->first) {
switch (rule.representative) {
case Representative::FIRST:
case Representative::HIGHEST_SEVERITY:
// TODO(#213): HIGHEST_SEVERITY reassignment is approximate —
// PendingCluster lacks per-fault severity, so we fall back to
// first remaining fault. Store severities to fix.
pending_cluster.representative_code = codes.front();
case Representative::FIRST: {
auto & sevs = pending_it->second.fault_severities;
const std::string & first_code = codes.front();
pending_cluster.representative_code = first_code;
auto sev_it = sevs.find(first_code);
pending_cluster.representative_severity = (sev_it != sevs.end()) ? sev_it->second : "";
break;
case Representative::MOST_RECENT:
pending_cluster.representative_code = codes.back();
}
case Representative::HIGHEST_SEVERITY: {
auto & sevs = pending_it->second.fault_severities;
std::string best_code = codes.front();
int best_rank = -1;
for (const auto & code : codes) {
auto sev_it = sevs.find(code);
int rank = (sev_it != sevs.end()) ? severity_rank(sev_it->second) : 0;
if (rank > best_rank) {
best_rank = rank;
best_code = code;
}
}
pending_cluster.representative_code = best_code;
auto best_sev_it = sevs.find(best_code);
pending_cluster.representative_severity = (best_sev_it != sevs.end()) ? best_sev_it->second : "";
break;
}
case Representative::MOST_RECENT: {
auto & sevs = pending_it->second.fault_severities;
const std::string & most_recent_code = codes.back();
pending_cluster.representative_code = most_recent_code;
auto sev_it = sevs.find(most_recent_code);
pending_cluster.representative_severity = (sev_it != sevs.end()) ? sev_it->second : "";
break;
}
}
break;
}
Expand All @@ -182,12 +206,21 @@ ProcessClearResult CorrelationEngine::process_clear(const std::string & fault_co
// Remove fault from active cluster
auto active_it = active_clusters_.find(cluster_id);
if (active_it != active_clusters_.end()) {
auto & codes = active_it->second.fault_codes;
auto & active_cluster = active_it->second;
auto & codes = active_cluster.fault_codes;
codes.erase(std::remove(codes.begin(), codes.end(), fault_code), codes.end());

// If cluster is now empty, remove it
if (codes.empty()) {
active_clusters_.erase(active_it);
} else if (active_cluster.representative_code == fault_code) {
// Sync representative from pending cluster (already updated above)
for (const auto & [rule_id, pending] : pending_clusters_) {
if (pending.data.cluster_id == cluster_id) {
active_cluster.representative_code = pending.data.representative_code;
active_cluster.representative_severity = pending.data.representative_severity;
break;
}
}
}
}

Expand Down Expand Up @@ -267,7 +300,15 @@ void CorrelationEngine::cleanup_expired() {
}

for (const auto & rule_id : expired_pending) {
pending_clusters_.erase(rule_id);
auto it = pending_clusters_.find(rule_id);
if (it != pending_clusters_.end()) {
if (active_clusters_.find(it->second.data.cluster_id) == active_clusters_.end()) {
for (const auto & fault_code : it->second.data.fault_codes) {
fault_to_cluster_.erase(fault_code);
}
}
pending_clusters_.erase(it);
}
}
}

Expand Down Expand Up @@ -395,6 +436,7 @@ std::optional<ProcessFaultResult> CorrelationEngine::try_auto_cluster(const std:
pending.data.representative_code = fault_code;
pending.data.representative_severity = severity;
pending.data.fault_codes.push_back(fault_code);
pending.fault_severities[fault_code] = severity;
pending.data.first_at = now_system;
pending.data.last_at = now_system;

Expand Down Expand Up @@ -425,6 +467,7 @@ std::optional<ProcessFaultResult> CorrelationEngine::try_auto_cluster(const std:
}

cluster.fault_codes.push_back(fault_code);
pending.fault_severities[fault_code] = severity;
cluster.last_at = now_system;
fault_to_cluster_[fault_code] = cluster.cluster_id;

Expand Down
137 changes: 137 additions & 0 deletions src/ros2_medkit_fault_manager/test/test_correlation_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,36 @@ TEST_F(CorrelationEngineTest, CleanupExpiredRemovesPendingClusters) {
EXPECT_EQ(0u, engine.get_cluster_count()); // Still not enough
}

TEST_F(CorrelationEngineTest, CleanupExpiredDoesNotBreakActiveCluster) {
auto config = create_auto_cluster_config();
CorrelationEngine engine(config);

// Create an active cluster with old timestamps (past the 500ms window)
auto past = std::chrono::steady_clock::now() - std::chrono::milliseconds(1000);
engine.process_fault("MOTOR_COMM_FL", "ERROR", past);
engine.process_fault("SENSOR_TIMEOUT", "ERROR", past + 10ms);
engine.process_fault("DRIVE_COMM_ERROR", "WARNING", past + 20ms);

// Cluster should be active (3 faults >= min_count=3)
EXPECT_EQ(1u, engine.get_cluster_count());

// Cleanup expires the pending cluster but must NOT wipe fault_to_cluster_
// entries that the active cluster still needs
engine.cleanup_expired();

// Active cluster should still exist
EXPECT_EQ(1u, engine.get_cluster_count());

// process_clear must still find the active cluster via fault_to_cluster_
engine.process_clear("MOTOR_COMM_FL");

// Cluster should still exist with 2 remaining fault codes
EXPECT_EQ(1u, engine.get_cluster_count());
auto clusters = engine.get_clusters();
ASSERT_EQ(1u, clusters.size());
EXPECT_EQ(2u, clusters[0].fault_codes.size());
}

// ============================================================================
// Mixed mode tests
// ============================================================================
Expand Down Expand Up @@ -603,6 +633,7 @@ TEST_F(CorrelationEngineTest, ClearRepresentativeReassignsPendingCluster) {
ASSERT_EQ(1u, clusters.size());
// SECOND_ERROR should be the new representative (first remaining after clear)
EXPECT_EQ("SECOND_ERROR", clusters[0].representative_code);
EXPECT_EQ("ERROR", clusters[0].representative_severity);
}

TEST_F(CorrelationEngineTest, ClearAllFaultsRemovesPendingCluster) {
Expand All @@ -625,6 +656,112 @@ TEST_F(CorrelationEngineTest, ClearAllFaultsRemovesPendingCluster) {
EXPECT_EQ(0u, engine.get_cluster_count()); // Still 2 faults, below min_count=3
}

// ============================================================================
// MOST_RECENT reassignment on clear
// ============================================================================

TEST_F(CorrelationEngineTest, ClearMostRecentRepresentativeReassignsActiveCluster) {
const std::string yaml = R"(
correlation:
enabled: true
patterns:
errors:
codes: ["*_ERROR"]
rules:
- id: test_cluster
mode: auto_cluster
match:
- pattern: errors
min_count: 3
window_ms: 500
show_as_single: true
representative: most_recent
)";
auto config = parse_config_string(yaml);
CorrelationEngine engine(config);

auto t0 = std::chrono::steady_clock::now();

engine.process_fault("FIRST_ERROR", "WARNING", t0);
engine.process_fault("SECOND_ERROR", "ERROR", t0 + 10ms);
engine.process_fault("THIRD_ERROR", "CRITICAL", t0 + 20ms);

EXPECT_EQ(1u, engine.get_cluster_count());
auto clusters = engine.get_clusters();
ASSERT_EQ(1u, clusters.size());
EXPECT_EQ("THIRD_ERROR", clusters[0].representative_code);
EXPECT_EQ("CRITICAL", clusters[0].representative_severity);

engine.process_clear("THIRD_ERROR");

clusters = engine.get_clusters();
ASSERT_EQ(1u, clusters.size());
EXPECT_EQ("SECOND_ERROR", clusters[0].representative_code);
EXPECT_EQ("ERROR", clusters[0].representative_severity);
}

// ============================================================================
// HIGHEST_SEVERITY reassignment on clear (#213)
// ============================================================================

TEST_F(CorrelationEngineTest, ClearHighestSeverityRepresentativeReassigns) {
auto config = create_auto_cluster_config(); // highest_severity, min_count=3
CorrelationEngine engine(config);

auto t0 = std::chrono::steady_clock::now();

// WARNING, CRITICAL (rep), ERROR
engine.process_fault("MOTOR_COMM_FL", "WARNING", t0);
engine.process_fault("SENSOR_TIMEOUT", "CRITICAL", t0 + 10ms);
engine.process_fault("DRIVE_COMM_ERROR", "ERROR", t0 + 20ms);

// SENSOR_TIMEOUT is representative (highest severity)
auto clusters = engine.get_clusters();
ASSERT_EQ(1u, clusters.size());
EXPECT_EQ("SENSOR_TIMEOUT", clusters[0].representative_code);

// Clear the representative
engine.process_clear("SENSOR_TIMEOUT");

// Remaining: WARNING, ERROR -> ERROR should become new representative
clusters = engine.get_clusters();
ASSERT_EQ(1u, clusters.size());
EXPECT_EQ("DRIVE_COMM_ERROR", clusters[0].representative_code);
EXPECT_EQ("ERROR", clusters[0].representative_severity);
}

// ============================================================================
// cleanup_expired removes stale fault_to_cluster_ entries (#214)
// ============================================================================

TEST_F(CorrelationEngineTest, CleanupExpiredRemovesFaultToClusterEntries) {
auto config = create_auto_cluster_config();
CorrelationEngine engine(config);

// Create pending cluster with old timestamp
auto past = std::chrono::steady_clock::now() - std::chrono::milliseconds(1000);
engine.process_fault("MOTOR_COMM_FL", "ERROR", past);
engine.process_fault("SENSOR_TIMEOUT", "ERROR", past + 10ms);
EXPECT_EQ(0u, engine.get_cluster_count());

// Cleanup removes expired pending cluster
engine.cleanup_expired();

// Faults from expired cluster should be able to start fresh.
// If fault_to_cluster_ was NOT cleaned, these would try to join
// a non-existent cluster instead of creating a new one.
auto t1 = std::chrono::steady_clock::now();
engine.process_fault("MOTOR_COMM_FL", "ERROR", t1);
engine.process_fault("SENSOR_TIMEOUT", "ERROR", t1 + 10ms);
engine.process_fault("DRIVE_COMM_ERROR", "ERROR", t1 + 20ms);

// Should form a NEW cluster with all 3 faults
EXPECT_EQ(1u, engine.get_cluster_count());
auto clusters = engine.get_clusters();
ASSERT_EQ(1u, clusters.size());
EXPECT_EQ(3u, clusters[0].fault_codes.size());
}

int main(int argc, char ** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
Expand Down