Skip to content

Commit efa5e8b

Browse files
merlimatlhotari
authored andcommittedJun 10, 2024
[fix] Remove blocking calls from BookieRackAffinityMapping (apache#22846)
(cherry picked from commit aece67e)
1 parent 477499d commit efa5e8b

File tree

2 files changed

+28
-18
lines changed

2 files changed

+28
-18
lines changed
 

‎pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java

+27-17
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
7070
private BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
7171
private Map<String, BookieInfo> bookieInfoMap = new HashMap<>();
7272

73-
public static MetadataStore createMetadataStore(Configuration conf) throws MetadataException {
73+
static MetadataStore getMetadataStore(Configuration conf) throws MetadataException {
7474
MetadataStore store;
7575
Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE);
7676
if (storeProperty != null) {
@@ -116,12 +116,20 @@ public synchronized void setConf(Configuration conf) {
116116
super.setConf(conf);
117117
MetadataStore store;
118118
try {
119-
store = createMetadataStore(conf);
120-
bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
121-
store.registerListener(this::handleUpdates);
122-
racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
123-
.orElseGet(BookiesRackConfiguration::new);
124-
for (Map<String, BookieInfo> bookieMapping : racksWithHost.values()) {
119+
store = getMetadataStore(conf);
120+
} catch (MetadataException e) {
121+
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
122+
}
123+
124+
bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
125+
store.registerListener(this::handleUpdates);
126+
127+
try {
128+
var racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
129+
.thenApply(optRes -> optRes.orElseGet(BookiesRackConfiguration::new))
130+
.get();
131+
132+
for (var bookieMapping : racksWithHost.values()) {
125133
for (String address : bookieMapping.keySet()) {
126134
bookieAddressListLastTime.add(BookieId.parse(address));
127135
}
@@ -131,10 +139,12 @@ public synchronized void setConf(Configuration conf) {
131139
}
132140
}
133141
updateRacksWithHost(racksWithHost);
134-
watchAvailableBookies();
135-
} catch (InterruptedException | ExecutionException | MetadataException e) {
136-
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
142+
} catch (ExecutionException | InterruptedException e) {
143+
LOG.error("Failed to update rack info. ", e);
144+
throw new RuntimeException(e);
137145
}
146+
147+
watchAvailableBookies();
138148
}
139149

140150
private void watchAvailableBookies() {
@@ -145,13 +155,13 @@ private void watchAvailableBookies() {
145155
field.setAccessible(true);
146156
RegistrationClient registrationClient = (RegistrationClient) field.get(bookieAddressResolver);
147157
registrationClient.watchWritableBookies(versioned -> {
148-
try {
149-
racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
150-
.orElseGet(BookiesRackConfiguration::new);
151-
updateRacksWithHost(racksWithHost);
152-
} catch (InterruptedException | ExecutionException e) {
153-
LOG.error("Failed to update rack info. ", e);
154-
}
158+
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
159+
.thenApply(optRes -> optRes.orElseGet(BookiesRackConfiguration::new))
160+
.thenAccept(this::updateRacksWithHost)
161+
.exceptionally(ex -> {
162+
LOG.error("Failed to update rack info. ", ex);
163+
return null;
164+
});
155165
});
156166
} catch (NoSuchFieldException | IllegalAccessException e) {
157167
LOG.error("Failed watch available bookies.", e);

‎pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
7373
StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) {
7474
MetadataStore store;
7575
try {
76-
store = BookieRackAffinityMapping.createMetadataStore(conf);
76+
store = BookieRackAffinityMapping.getMetadataStore(conf);
7777
} catch (MetadataException e) {
7878
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed initialized");
7979
}

0 commit comments

Comments
 (0)