Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,47 @@ ProcessClearResult CorrelationEngine::process_clear(const std::string & fault_co
if (cluster_it != fault_to_cluster_.end()) {
const std::string cluster_id = cluster_it->second;

// TODO(#127): Clean up pending_clusters_ when fault is cleared.
// Currently, if a fault is cleared before cluster reaches min_count,
// the pending cluster retains a phantom reference. Low impact since
// pending clusters expire after window_ms, but could cause brief
// inconsistency if cluster becomes active with cleared fault in list.
// Fix requires: iteration (pending_clusters_ keyed by rule_id),
// representative reassignment if cleared fault was representative.

// Remove fault from cluster
// Clean up pending_clusters_ when fault is cleared before min_count (#127)
for (auto pending_it = pending_clusters_.begin(); pending_it != pending_clusters_.end();) {
auto & pending_cluster = pending_it->second.data;
if (pending_cluster.cluster_id != cluster_id) {
++pending_it;
continue;
}

auto & codes = pending_cluster.fault_codes;
codes.erase(std::remove(codes.begin(), codes.end(), fault_code), codes.end());

if (codes.empty()) {
pending_it = pending_clusters_.erase(pending_it);
continue;
}

// Reassign representative if the cleared fault was the representative
if (pending_cluster.representative_code == fault_code) {
for (const auto & rule : config_.rules) {
if (rule.id == pending_it->first) {
switch (rule.representative) {
case Representative::FIRST:
case Representative::HIGHEST_SEVERITY:
// TODO(#213): HIGHEST_SEVERITY reassignment is approximate —
// PendingCluster lacks per-fault severity, so we fall back to
// first remaining fault. Store severities to fix.
pending_cluster.representative_code = codes.front();
break;
case Representative::MOST_RECENT:
pending_cluster.representative_code = codes.back();
break;
}
break;
}
}
}

++pending_it;
}

// Remove fault from active cluster
auto active_it = active_clusters_.find(cluster_id);
if (active_it != active_clusters_.end()) {
auto & codes = active_it->second.fault_codes;
Expand Down
84 changes: 84 additions & 0 deletions src/ros2_medkit_fault_manager/test/test_correlation_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,90 @@ TEST_F(CorrelationEngineTest, AutoClusterRetroactiveMuting) {
EXPECT_EQ("SENSOR_001", clusters[0].representative_code); // FIRST policy
}

// ============================================================================
// Pending cluster cleanup on clear (#127)
// ============================================================================

TEST_F(CorrelationEngineTest, ClearFaultRemovesFromPendingCluster) {
auto config = create_auto_cluster_config();
CorrelationEngine engine(config);

auto t0 = std::chrono::steady_clock::now();

// Add 2 faults (below min_count=3), creating a pending cluster
engine.process_fault("MOTOR_COMM_FL", "ERROR", t0);
engine.process_fault("SENSOR_TIMEOUT", "ERROR", t0 + 10ms);
EXPECT_EQ(0u, engine.get_cluster_count()); // Still pending

// Clear one of the faults
engine.process_clear("MOTOR_COMM_FL");

// Now add a third fault - should NOT activate the cluster because
// the cleared fault was removed from pending, so only 2 faults total
engine.process_fault("DRIVE_COMM_ERROR", "ERROR", t0 + 20ms);
EXPECT_EQ(0u, engine.get_cluster_count()); // Still not enough
}

TEST_F(CorrelationEngineTest, ClearRepresentativeReassignsPendingCluster) {
const std::string yaml = R"(
correlation:
enabled: true
patterns:
errors:
codes: ["*_ERROR"]
rules:
- id: test_cluster
mode: auto_cluster
match:
- pattern: errors
min_count: 3
window_ms: 500
show_as_single: true
representative: first
)";
auto config = parse_config_string(yaml);
CorrelationEngine engine(config);

auto t0 = std::chrono::steady_clock::now();

// Add 2 faults, creating a pending cluster
engine.process_fault("FIRST_ERROR", "CRITICAL", t0);
engine.process_fault("SECOND_ERROR", "ERROR", t0 + 10ms);

// Clear the representative (first fault)
engine.process_clear("FIRST_ERROR");

// Add 2 more faults to reach min_count (SECOND_ERROR + 2 new = 3)
engine.process_fault("THIRD_ERROR", "ERROR", t0 + 20ms);
engine.process_fault("FOURTH_ERROR", "ERROR", t0 + 30ms);

EXPECT_EQ(1u, engine.get_cluster_count());
auto clusters = engine.get_clusters();
ASSERT_EQ(1u, clusters.size());
// SECOND_ERROR should be the new representative (first remaining after clear)
EXPECT_EQ("SECOND_ERROR", clusters[0].representative_code);
}

TEST_F(CorrelationEngineTest, ClearAllFaultsRemovesPendingCluster) {
auto config = create_auto_cluster_config();
CorrelationEngine engine(config);

auto t0 = std::chrono::steady_clock::now();

// Add 2 faults
engine.process_fault("MOTOR_COMM_FL", "ERROR", t0);
engine.process_fault("SENSOR_TIMEOUT", "ERROR", t0 + 10ms);

// Clear both
engine.process_clear("MOTOR_COMM_FL");
engine.process_clear("SENSOR_TIMEOUT");

// Adding 2 new faults should start a fresh pending cluster, not join old one
engine.process_fault("DRIVE_COMM_NEW", "ERROR", t0 + 100ms);
engine.process_fault("MOTOR_COMM_NEW", "ERROR", t0 + 110ms);
EXPECT_EQ(0u, engine.get_cluster_count()); // Still 2 faults, below min_count=3
}

int main(int argc, char ** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
Expand Down