Skip to content

Commit

Permalink
Azure Events Hub support fixed (#3540)
Browse files Browse the repository at this point in the history
 UnknownTopicOrPartitionException error suppressing added when calling describeConfigs() for brokers (Azure case).

Co-authored-by: iliax <ikuramshin@provectus.com>
  • Loading branch information
iliax and iliax authored Mar 24, 2023
1 parent 8d3bac8 commit acfe7a4
Showing 1 changed file with 13 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,17 +212,24 @@ private static Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(AdminClie
.map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId)))
.collect(toList());
return toMono(client.describeConfigs(resources).all())
// some kafka backends (like MSK serverless) do not support broker's configs retrieval,
// in that case InvalidRequestException will be thrown
.onErrorResume(InvalidRequestException.class, th -> {
log.trace("Error while getting broker {} configs", brokerIds, th);
return Mono.just(Map.of());
})
// some kafka backends don't support broker's configs retrieval,
// and throw various exceptions on describeConfigs() call
.onErrorResume(th -> th instanceof InvalidRequestException // MSK Serverless
|| th instanceof UnknownTopicOrPartitionException, // Azure event hub
th -> {
log.trace("Error while getting configs for brokers {}", brokerIds, th);
return Mono.just(Map.of());
})
// there are situations when kafka-ui user has no DESCRIBE_CONFIGS permission on cluster
.onErrorResume(ClusterAuthorizationException.class, th -> {
log.trace("AuthorizationException while getting configs for brokers {}", brokerIds, th);
return Mono.just(Map.of());
})
// catching all remaining exceptions, but logging on WARN level
.onErrorResume(th -> true, th -> {
log.warn("Unexpected error while getting configs for brokers {}", brokerIds, th);
return Mono.just(Map.of());
})
.map(config -> config.entrySet().stream()
.collect(toMap(
c -> Integer.valueOf(c.getKey().name()),
Expand Down

0 comments on commit acfe7a4

Please sign in to comment.