Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] Add num not adhering placement ledgers replicated metric for ReplicationWorker #3652

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,5 @@ public interface ReplicationStats {
String NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION = "NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION";
String NUM_UNDER_REPLICATED_LEDGERS_GUAGE = "NUM_UNDER_REPLICATED_LEDGERS_GUAGE";
String NUM_REPLICATED_LEDGERS = "NUM_REPLICATED_LEDGERS";
String NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED = "NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED";
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED;
import static org.apache.bookkeeper.replication.ReplicationStats.NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED;
import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATE_EXCEPTION;
import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE;
import static org.apache.bookkeeper.replication.ReplicationStats.REREPLICATE_OP;
Expand Down Expand Up @@ -136,6 +137,11 @@ public class ReplicationWorker implements Runnable {
help = "the number of entries ReplicationWorker unable to read"
)
private final Counter numEntriesUnableToReadForReplication;
// Counts ledgers, not fragments: in the visible portion of rereplicate(), this
// counter is incremented once per call, and only when at least one fragment whose
// getReplicateType() is LedgerFragment.ReplicateType.DATA_NOT_ADHERING_PLACEMENT
// was passed to admin.replicateLedgerFragment(...) without that call throwing.
@StatsDoc(
name = NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED,
help = "the number of not adhering placement policy ledgers re-replicated"
)
private final Counter numNotAdheringPlacementLedgersReplicated;
private final Map<String, Counter> exceptionCounters;
final LoadingCache<Long, AtomicInteger> replicationFailedLedgers;
final LoadingCache<Long, ConcurrentSkipListSet<Long>> unableToReadEntriesForReplication;
Expand Down Expand Up @@ -217,6 +223,8 @@ public ConcurrentSkipListSet<Long> load(Long key) throws Exception {
.getCounter(NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER);
this.numEntriesUnableToReadForReplication = this.statsLogger
.getCounter(NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION);
this.numNotAdheringPlacementLedgersReplicated = this.statsLogger
.getCounter(NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED);
this.exceptionCounters = new HashMap<String, Counter>();
this.onReadEntryFailureCallback = (ledgerid, entryid) -> {
numEntriesUnableToReadForReplication.inc();
Expand Down Expand Up @@ -448,6 +456,7 @@ private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedExceptio

boolean foundOpenFragments = false;
long numFragsReplicated = 0;
long numNotAdheringPlacementFragsReplicated = 0;
for (LedgerFragment ledgerFragment : fragments) {
if (!ledgerFragment.isClosed()) {
foundOpenFragments = true;
Expand All @@ -461,6 +470,10 @@ private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedExceptio
try {
admin.replicateLedgerFragment(lh, ledgerFragment, onReadEntryFailureCallback);
numFragsReplicated++;
if (ledgerFragment.getReplicateType() == LedgerFragment
.ReplicateType.DATA_NOT_ADHERING_PLACEMENT) {
numNotAdheringPlacementFragsReplicated++;
}
} catch (BKException.BKBookieHandleNotAvailableException e) {
LOG.warn("BKBookieHandleNotAvailableException while replicating the fragment", e);
} catch (BKException.BKLedgerRecoveryException e) {
Expand All @@ -473,6 +486,9 @@ private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedExceptio
if (numFragsReplicated > 0) {
numLedgersReplicated.inc();
}
if (numNotAdheringPlacementFragsReplicated > 0) {
numNotAdheringPlacementLedgersReplicated.inc();
}

if (foundOpenFragments || isLastSegmentOpenAndMissingBookies(lh)) {
deferLedgerLockRelease = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,8 @@ public void testReplicationStats() throws Exception {
statsLogger.getCounter(ReplicationStats.NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER);
final Counter numLedgersReplicatedCounter =
statsLogger.getCounter(ReplicationStats.NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED);
final Counter numNotAdheringPlacementLedgersCounter = statsLogger
.getCounter(ReplicationStats.NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED);

assertEquals("NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER",
1, numDeferLedgerLockReleaseOfFailedLedgerCounter.get().longValue());
Expand All @@ -1186,10 +1188,15 @@ public void testReplicationStats() throws Exception {
assertFalse((boolean) result);
assertEquals("NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED",
0, numLedgersReplicatedCounter.get().longValue());
assertEquals("NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED",
0, numNotAdheringPlacementLedgersCounter.get().longValue());

} else {
assertTrue((boolean) result);
assertEquals("NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED",
1, numLedgersReplicatedCounter.get().longValue());
assertEquals("NUM_NOT_ADHERING_PLACEMENT_LEDGERS_REPLICATED",
1, numNotAdheringPlacementLedgersCounter.get().longValue());
}
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down