Skip to content

Commit c053fc9

Browse files
committed
Returns an exceptional future from validateChannelState
1 parent 652083e commit c053fc9

File tree

2 files changed

+95
-81
lines changed

2 files changed

+95
-81
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

+62-62
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@
6868
import org.apache.pulsar.common.naming.NamespaceName;
6969
import org.apache.pulsar.common.naming.TopicDomain;
7070
import org.apache.pulsar.common.naming.TopicName;
71-
import org.apache.pulsar.common.util.FutureUtil;
7271
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
7372
import org.apache.pulsar.metadata.api.NotificationType;
7473
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
@@ -190,7 +189,10 @@ public ServiceUnitStateChannelImpl(PulsarService pulsar) {
190189
}
191190

192191
public synchronized void start() throws PulsarServerException {
193-
validateChannelState(LeaderElectionServiceStarted, false);
192+
if (!validateChannelState(LeaderElectionServiceStarted, false)) {
193+
throw new IllegalStateException("Invalid channel state:" + channelState.name());
194+
}
195+
194196
try {
195197
leaderElectionService.start();
196198
this.channelState = LeaderElectionServiceStarted;
@@ -269,15 +271,24 @@ public synchronized void close() throws PulsarServerException {
269271
}
270272
}
271273

272-
private void validateChannelState(ChannelState targetState, boolean checkLowerIds) {
274+
private boolean validateChannelState(ChannelState targetState, boolean checkLowerIds) {
273275
int order = checkLowerIds ? -1 : 1;
274276
if (Integer.compare(channelState.id, targetState.id) * order > 0) {
275-
throw new IllegalStateException("Invalid channel state:" + channelState.name());
277+
return false;
276278
}
279+
return true;
280+
}
281+
282+
private boolean debug() {
283+
return pulsar.getConfiguration().isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
277284
}
278285

279286
public CompletableFuture<Optional<String>> getChannelOwnerAsync() {
280-
validateChannelState(LeaderElectionServiceStarted, true);
287+
if (!validateChannelState(LeaderElectionServiceStarted, true)) {
288+
return CompletableFuture.failedFuture(
289+
new IllegalStateException("Invalid channel state:" + channelState.name()));
290+
}
291+
281292
return leaderElectionService.readCurrentLeader().thenApply(leader -> {
282293
//expecting http://broker-xyz:port
283294
// TODO: discard this protocol prefix removal
@@ -317,7 +328,11 @@ private boolean isChannelOwner() {
317328
}
318329

319330
public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
320-
validateChannelState(Started, true);
331+
if (!validateChannelState(Started, true)) {
332+
return CompletableFuture.failedFuture(
333+
new IllegalStateException("Invalid channel state:" + channelState.name()));
334+
}
335+
321336
ServiceUnitStateData data = tableview.get(serviceUnit);
322337
ServiceUnitState state = data == null ? Free : data.state();
323338
ownerLookUpCounters.get(state).incrementAndGet();
@@ -329,81 +344,63 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
329344
return deferGetOwnerRequest(serviceUnit).thenApply(Optional::of);
330345
}
331346
case Free -> {
332-
return CompletableFuture.completedFuture(null);
347+
return CompletableFuture.completedFuture(Optional.empty());
333348
}
334349
default -> {
335350
String errorMsg = String.format("Failed to process service unit state data: %s when get owner.", data);
336351
log.error(errorMsg);
337-
return FutureUtil.failedFuture(new IllegalStateException(errorMsg));
352+
return CompletableFuture.failedFuture(new IllegalStateException(errorMsg));
338353
}
339354
}
340355
}
341356

342357
public CompletableFuture<String> publishAssignEventAsync(String serviceUnit, String broker) {
343358
EventType eventType = Assign;
344359
eventCounters.get(eventType).getTotal().incrementAndGet();
345-
try {
346-
CompletableFuture<String> getOwnerRequest = deferGetOwnerRequest(serviceUnit);
347-
pubAsync(serviceUnit, new ServiceUnitStateData(Assigned, broker))
348-
.whenComplete((__, ex) -> {
349-
if (ex != null) {
350-
getOwnerRequests.remove(serviceUnit, getOwnerRequest);
351-
if (!getOwnerRequest.isCompletedExceptionally()) {
352-
getOwnerRequest.completeExceptionally(ex);
353-
}
354-
eventCounters.get(eventType).getFailure().incrementAndGet();
360+
CompletableFuture<String> getOwnerRequest = deferGetOwnerRequest(serviceUnit);
361+
pubAsync(serviceUnit, new ServiceUnitStateData(Assigned, broker))
362+
.whenComplete((__, ex) -> {
363+
if (ex != null) {
364+
getOwnerRequests.remove(serviceUnit, getOwnerRequest);
365+
if (!getOwnerRequest.isCompletedExceptionally()) {
366+
getOwnerRequest.completeExceptionally(ex);
355367
}
356-
});
357-
return getOwnerRequest;
358-
} catch (Throwable e) {
359-
log.error("Failed to publish assign event. serviceUnit:{}, broker:{}, assignPublishFailureCount:{}",
360-
serviceUnit, broker, eventCounters.get(eventType).getFailure().incrementAndGet(), e);
361-
throw e;
362-
}
368+
eventCounters.get(eventType).getFailure().incrementAndGet();
369+
}
370+
});
371+
return getOwnerRequest;
363372
}
364373

365374
public CompletableFuture<Void> publishUnloadEventAsync(Unload unload) {
366375
EventType eventType = Unload;
367376
eventCounters.get(eventType).getTotal().incrementAndGet();
368-
try {
369-
String serviceUnit = unload.serviceUnit();
370-
CompletableFuture<MessageId> future;
371-
if (isTransferCommand(unload)) {
372-
ServiceUnitStateData next = new ServiceUnitStateData(Assigned,
373-
unload.destBroker().get(), unload.sourceBroker());
374-
future = pubAsync(serviceUnit, next);
375-
} else {
376-
future = tombstoneAsync(serviceUnit);
377-
}
378-
379-
return future.whenComplete((__, ex) -> {
380-
if (ex != null) {
381-
eventCounters.get(eventType).getFailure().incrementAndGet();
382-
}
383-
}).thenApply(__ -> null);
384-
} catch (Throwable e) {
385-
log.error("Failed to publish unload event. unload:{}. unloadPublishFailureCount:{}",
386-
unload, eventCounters.get(eventType).getFailure().incrementAndGet(), e);
387-
throw e;
377+
String serviceUnit = unload.serviceUnit();
378+
CompletableFuture<MessageId> future;
379+
if (isTransferCommand(unload)) {
380+
ServiceUnitStateData next = new ServiceUnitStateData(Assigned,
381+
unload.destBroker().get(), unload.sourceBroker());
382+
future = pubAsync(serviceUnit, next);
383+
} else {
384+
future = tombstoneAsync(serviceUnit);
388385
}
386+
387+
return future.whenComplete((__, ex) -> {
388+
if (ex != null) {
389+
eventCounters.get(eventType).getFailure().incrementAndGet();
390+
}
391+
}).thenApply(__ -> null);
389392
}
390393

391394
public CompletableFuture<Void> publishSplitEventAsync(Split split) {
392395
EventType eventType = Split;
393396
eventCounters.get(eventType).getTotal().incrementAndGet();
394-
try {
395-
String serviceUnit = split.serviceUnit();
396-
ServiceUnitStateData next = new ServiceUnitStateData(Splitting, split.sourceBroker());
397-
return pubAsync(serviceUnit, next).whenComplete((__, ex) -> {
398-
if (ex != null) {
399-
eventCounters.get(eventType).getFailure().incrementAndGet();
400-
}
401-
}).thenApply(__ -> null);
402-
} catch (Throwable e) {
403-
log.error("Failed to publish split event. split:{}, splitPublishFailureCount:{}",
404-
split, eventCounters.get(eventType).getFailure().incrementAndGet(), e);
405-
throw e;
406-
}
397+
String serviceUnit = split.serviceUnit();
398+
ServiceUnitStateData next = new ServiceUnitStateData(Splitting, split.sourceBroker());
399+
return pubAsync(serviceUnit, next).whenComplete((__, ex) -> {
400+
if (ex != null) {
401+
eventCounters.get(eventType).getFailure().incrementAndGet();
402+
}
403+
}).thenApply(__ -> null);
407404
}
408405

409406
private void handle(String serviceUnit, ServiceUnitStateData data) {
@@ -424,8 +421,8 @@ private void handle(String serviceUnit, ServiceUnitStateData data) {
424421
default -> throw new IllegalStateException("Failed to handle channel data:" + data);
425422
}
426423
} catch (Throwable e){
427-
getHandlerFailureCounter(data).incrementAndGet();
428-
log.error("Failed to handle the event. serviceUnit:{}, data:{}", serviceUnit, data, e);
424+
log.error("Failed to handle the event. serviceUnit:{}, data:{}, handlerFailureCount:{}",
425+
serviceUnit, data, getHandlerFailureCounter(data).incrementAndGet(), e);
429426
throw e;
430427
}
431428
}
@@ -455,7 +452,7 @@ private AtomicLong getHandlerFailureCounter(ServiceUnitStateData data) {
455452
}
456453

457454
private AtomicLong getHandlerCounter(ServiceUnitStateData data, boolean total) {
458-
var state = data.state() == null ? Free : data.state();
455+
var state = data == null ? Free : data.state();
459456
var counter = total
460457
? handlerCounters.get(state).getTotal() : handlerCounters.get(state).getFailure();
461458
if (counter == null) {
@@ -541,7 +538,10 @@ private void handleFreeEvent(String serviceUnit) {
541538
}
542539

543540
private CompletableFuture<MessageId> pubAsync(String serviceUnit, ServiceUnitStateData data) {
544-
validateChannelState(Started, true);
541+
if (!validateChannelState(Started, true)) {
542+
return CompletableFuture.failedFuture(
543+
new IllegalStateException("Invalid channel state:" + channelState.name()));
544+
}
545545
CompletableFuture<MessageId> future = new CompletableFuture<>();
546546
producer.newMessage()
547547
.key(serviceUnit)

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java

+33-19
Original file line numberDiff line numberDiff line change
@@ -228,38 +228,52 @@ public void channelValidationTest()
228228
}
229229

230230
private int validateChannelStart(ServiceUnitStateChannelImpl channel)
231-
throws ExecutionException, InterruptedException, TimeoutException {
231+
throws InterruptedException, TimeoutException {
232232
int errorCnt = 0;
233233
try {
234234
channel.isChannelOwnerAsync().get(2, TimeUnit.SECONDS);
235-
} catch (IllegalStateException e) {
236-
errorCnt++;
235+
} catch (ExecutionException e) {
236+
if(e.getCause() instanceof IllegalStateException){
237+
errorCnt++;
238+
}
237239
}
238240
try {
239241
channel.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
240-
} catch (IllegalStateException e) {
241-
errorCnt++;
242+
} catch (ExecutionException e) {
243+
if (e.getCause() instanceof IllegalStateException) {
244+
errorCnt++;
245+
}
242246
}
243247
try {
244-
channel.getOwnerAsync(bundle);
245-
} catch (IllegalStateException e) {
246-
errorCnt++;
248+
channel.getOwnerAsync(bundle).get(2, TimeUnit.SECONDS).get();
249+
} catch (ExecutionException e) {
250+
if (e.getCause() instanceof IllegalStateException) {
251+
errorCnt++;
252+
}
247253
}
248254
try {
249-
channel.publishAssignEventAsync(bundle, lookupServiceAddress1);
250-
} catch (IllegalStateException e) {
251-
errorCnt++;
255+
channel.publishAssignEventAsync(bundle, lookupServiceAddress1).get(2, TimeUnit.SECONDS);
256+
} catch (ExecutionException e) {
257+
if (e.getCause() instanceof IllegalStateException) {
258+
errorCnt++;
259+
}
252260
}
253261
try {
254262
channel.publishUnloadEventAsync(
255-
new Unload(lookupServiceAddress1, bundle, Optional.of(lookupServiceAddress2)));
256-
} catch (IllegalStateException e) {
257-
errorCnt++;
263+
new Unload(lookupServiceAddress1, bundle, Optional.of(lookupServiceAddress2)))
264+
.get(2, TimeUnit.SECONDS);
265+
} catch (ExecutionException e) {
266+
if (e.getCause() instanceof IllegalStateException) {
267+
errorCnt++;
268+
}
258269
}
259270
try {
260-
channel.publishSplitEventAsync(new Split(bundle, lookupServiceAddress1, Map.of()));
261-
} catch (IllegalStateException e) {
262-
errorCnt++;
271+
channel.publishSplitEventAsync(new Split(bundle, lookupServiceAddress1, Map.of()))
272+
.get(2, TimeUnit.SECONDS);
273+
} catch (ExecutionException e) {
274+
if (e.getCause() instanceof IllegalStateException) {
275+
errorCnt++;
276+
}
263277
}
264278
return errorCnt;
265279
}
@@ -483,8 +497,8 @@ public void splitTest() throws Exception {
483497
waitUntilNewOwner(channel2, bundle, null);
484498

485499
// TODO: assert child bundle ownerships in the channels.
486-
validateHandlerCounters(channel1, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0);
487-
validateHandlerCounters(channel2, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0);
500+
validateHandlerCounters(channel1, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0);
501+
validateHandlerCounters(channel2, 1, 0, 1, 0, 0, 0, 1, 0, 1, 0);
488502
validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
489503
validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
490504
}

0 commit comments

Comments
 (0)