Skip to content

Commit

Permalink
Optimize reorderReadSequence to check WriteSet instead of entire Ense…
Browse files Browse the repository at this point in the history
…mble (#4478)
  • Loading branch information
ange-k authored Nov 13, 2024
1 parent f148f63 commit 0376bdc
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -920,8 +920,9 @@ DistributionSchedule.WriteSet reorderReadSequenceWithRegion(
if (useRegionAware || reorderReadsRandom) {
isAnyBookieUnavailable = true;
} else {
for (int i = 0; i < ensemble.size(); i++) {
BookieId bookieAddr = ensemble.get(i);
for (int i = 0; i < writeSet.size(); i++) {
int idx = writeSet.get(i);
BookieId bookieAddr = ensemble.get(idx);
if ((!knownBookies.containsKey(bookieAddr) && !readOnlyBookies.contains(bookieAddr))
|| slowBookies.getIfPresent(bookieAddr) != null) {
// Found at least one bookie not available in the ensemble, or in slowBookies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
import org.apache.bookkeeper.util.StaticDNSResolver;
import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -2359,6 +2360,44 @@ public void testNodeWithFailures() throws Exception {
StaticDNSResolver.reset();
}

@Test
public void testSlowBookieInEnsembleOnly() throws Exception {
repp.uninitalize();
updateMyRack("/r1/rack1");

TestStatsProvider statsProvider = new TestStatsProvider();
TestStatsLogger statsLogger = statsProvider.getStatsLogger("");

repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer,
DISABLE_ALL, statsLogger, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);

TestOpStatsLogger readRequestsReorderedCounter = (TestOpStatsLogger) statsLogger
.getOpStatsLogger(BookKeeperClientStats.READ_REQUESTS_REORDERED);

// Update cluster
Set<BookieId> addrs = new HashSet<BookieId>();
addrs.add(addr1.toBookieId());
addrs.add(addr2.toBookieId());
addrs.add(addr3.toBookieId());
addrs.add(addr4.toBookieId());
repp.onClusterChanged(addrs, new HashSet<BookieId>());
repp.registerSlowBookie(addr1.toBookieId(), 0L);
Map<BookieId, Long> bookiePendingMap = new HashMap<>();
bookiePendingMap.put(addr1.toBookieId(), 1L);
repp.onClusterChanged(addrs, new HashSet<>());

DistributionSchedule.WriteSet writeSet = writeSetFromValues(1, 2, 3);

DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);

// If the slow bookie is only present in the ensemble, no reordering occurs.
assertEquals(writeSet, reorderSet);
assertEquals(0, readRequestsReorderedCounter.getSuccessCount());
}

@Test
public void testReplaceNotAvailableBookieWithDefaultRack() throws Exception {
repp.uninitalize();
Expand Down

0 comments on commit 0376bdc

Please sign in to comment.