From 517b0408100aaa5d83d3806458cd51b94eb14d62 Mon Sep 17 00:00:00 2001 From: wenbingshen Date: Thu, 17 Nov 2022 01:19:41 +0800 Subject: [PATCH] Add num not adhering placement ledgers replicated metric for ReplicationWorker --- .../bookkeeper/replication/ReplicationStats.java | 1 + .../replication/ReplicationWorker.java | 16 ++++++++++++++++ .../replication/TestReplicationWorker.java | 11 ++++++++++- 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java index 74b76b23b22..3580e1688d0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java @@ -66,4 +66,5 @@ public interface ReplicationStats { String NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION = "NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION"; String NUM_UNDER_REPLICATED_LEDGERS_GUAGE = "NUM_UNDER_REPLICATED_LEDGERS_GUAGE"; String NUM_REPLICATED_LEDGERS = "NUM_REPLICATED_LEDGERS"; + String NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED = "NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED"; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java index 28062ea36c2..545141bea46 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java @@ -22,6 +22,7 @@ import static org.apache.bookkeeper.replication.ReplicationStats.NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED; +import static org.apache.bookkeeper.replication.ReplicationStats.NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED; import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATE_EXCEPTION; import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE; import static org.apache.bookkeeper.replication.ReplicationStats.REREPLICATE_OP; @@ -136,6 +137,11 @@ public class ReplicationWorker implements Runnable { help = "the number of entries ReplicationWorker unable to read" ) private final Counter numEntriesUnableToReadForReplication; + @StatsDoc( + name = NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED, + help = "the number of not adhering placement policy ledgers re-replicated" + ) + private final Counter numNotAdheringPlacementLedgersReplicated; private final Map exceptionCounters; final LoadingCache replicationFailedLedgers; final LoadingCache> unableToReadEntriesForReplication; @@ -217,6 +223,8 @@ public ConcurrentSkipListSet load(Long key) throws Exception { .getCounter(NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER); this.numEntriesUnableToReadForReplication = this.statsLogger .getCounter(NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION); + this.numNotAdheringPlacementLedgersReplicated = this.statsLogger + .getCounter(NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED); this.exceptionCounters = new HashMap(); this.onReadEntryFailureCallback = (ledgerid, entryid) -> { numEntriesUnableToReadForReplication.inc(); @@ -448,6 +456,7 @@ private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedExceptio boolean foundOpenFragments = false; long numFragsReplicated = 0; + long numNotAdheringPlacementFragsReplicated = 0; for (LedgerFragment ledgerFragment : fragments) { if (!ledgerFragment.isClosed()) { foundOpenFragments = true; @@ -461,6 +470,10 @@ private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedExceptio try { admin.replicateLedgerFragment(lh, ledgerFragment, onReadEntryFailureCallback); numFragsReplicated++; + if (ledgerFragment.getReplicateType() == LedgerFragment + .ReplicateType.DATA_NOT_ADHERING_PLACEMENT) { + numNotAdheringPlacementFragsReplicated++; + } } catch (BKException.BKBookieHandleNotAvailableException e) { LOG.warn("BKBookieHandleNotAvailableException while replicating the fragment", e); } catch (BKException.BKLedgerRecoveryException e) { @@ -473,6 +486,9 @@ private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedExceptio if (numFragsReplicated > 0) { numLedgersReplicated.inc(); } + if (numNotAdheringPlacementFragsReplicated > 0) { + numNotAdheringPlacementLedgersReplicated.inc(); + } if (foundOpenFragments || isLastSegmentOpenAndMissingBookies(lh)) { deferLedgerLockRelease = true; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java index 510d02f8db6..5e5bf2ba462 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java @@ -21,6 +21,7 @@ import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE; import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION; +import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_SCOPE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -1244,7 +1245,12 @@ protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfig return ensemblePlacementPolicy; } }; - ReplicationWorker rw = new ReplicationWorker(baseConf, bookKeeper, false, NullStatsLogger.INSTANCE); + + TestStatsProvider statsProvider = new TestStatsProvider(); + TestStatsLogger statsLogger = statsProvider.getStatsLogger(REPLICATION_SCOPE); + Counter numNotAdheringPlacementLedgersCounter = statsLogger + .getCounter(ReplicationStats.NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED); + ReplicationWorker rw = new ReplicationWorker(baseConf, bookKeeper, false, statsLogger); rw.start(); //start new bookie, the rack is /rack2 @@ -1262,6 +1268,9 @@ protected EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfig assertNull(stat1); }); + assertTrue("NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED", + numNotAdheringPlacementLedgersCounter.get() >= 1); + for (BookieId rack1Book : firstThreeBookies) { killBookie(rack1Book); }