Skip to content

Commit

Permalink
[cleanup][broker] Various cleanups, leveraging JDK 17
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet committed Jun 28, 2023
1 parent 2b01f83 commit 479b9f4
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 223 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver;
import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver;

@SuppressWarnings("deprecation")
@Slf4j
public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {

Expand All @@ -71,7 +70,7 @@ public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store,

ClientConfiguration bkConf = createBkClientConfiguration(store, conf);
if (properties != null) {
properties.forEach((key, value) -> bkConf.setProperty(key, value));
properties.forEach(bkConf::setProperty);
}
if (ensemblePlacementPolicyClass.isPresent()) {
setEnsemblePlacementPolicy(bkConf, conf, store, ensemblePlacementPolicyClass.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,17 +158,15 @@ public void close() throws IOException {
// factory, however that might be introducing more unknowns.
log.warn("Encountered exceptions on closing bookkeeper client", ree);
}
if (bkEnsemblePolicyToBkClientMap != null) {
bkEnsemblePolicyToBkClientMap.forEach((policy, bk) -> {
try {
if (bk != null) {
bk.close();
}
} catch (Exception e) {
log.warn("Failed to close bookkeeper-client for policy {}", policy, e);
bkEnsemblePolicyToBkClientMap.forEach((policy, bk) -> {
try {
if (bk != null) {
bk.close();
}
});
}
} catch (Exception e) {
log.warn("Failed to close bookkeeper-client for policy {}", policy, e);
}
});
log.info("Closed BookKeeper client");
} catch (Exception e) {
log.warn(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,12 @@
/**
* Main class for Pulsar broker service.
*/

@Getter(AccessLevel.PUBLIC)
@Setter(AccessLevel.PROTECTED)
public class PulsarService implements AutoCloseable, ShutdownService {
private static final Logger LOG = LoggerFactory.getLogger(PulsarService.class);
private static final double GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d;
private ServiceConfiguration config = null;
private final ServiceConfiguration config;
private NamespaceService nsService = null;
private ManagedLedgerStorage managedLedgerClientFactory = null;
private LeaderElectionService leaderElectionService = null;
Expand Down Expand Up @@ -255,7 +254,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private AdditionalServlets brokerAdditionalServlets;

// packages management service
private Optional<PackagesManagement> packagesManagement = Optional.empty();
private PackagesManagement packagesManagement = null;
private PulsarPrometheusMetricsServlet metricsServlet;
private List<PrometheusRawMetricsProvider> pendingMetricsProviders;

Expand Down Expand Up @@ -285,10 +284,8 @@ public enum State {
private Map<String, AdvertisedListener> advertisedListeners;

public PulsarService(ServiceConfiguration config) {
this(config, Optional.empty(), (exitCode) -> {
LOG.info("Process termination requested with code {}. "
+ "Ignoring, as this constructor is intended for tests. ", exitCode);
});
this(config, Optional.empty(), (exitCode) -> LOG.info("Process termination requested with code {}. "
+ "Ignoring, as this constructor is intended for tests. ", exitCode));
}

public PulsarService(ServiceConfiguration config, Optional<WorkerService> functionWorkerService,
Expand Down Expand Up @@ -370,7 +367,7 @@ public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchro

/**
* Close the session to the metadata service.
*
* <p>
* This will immediately release all the resource locks held by this broker on the coordination service.
*
* @throws Exception if the close operation fails
Expand Down Expand Up @@ -400,8 +397,12 @@ public void close() throws PulsarServerException {
throw (PulsarServerException) cause;
} else if (getConfiguration().getBrokerShutdownTimeoutMs() == 0
&& (cause instanceof TimeoutException || cause instanceof CancellationException)) {
// ignore shutdown timeout when timeout is 0, which is primarily used in tests
// to forcefully shutdown the broker
if (LOG.isDebugEnabled()) {
LOG.debug(
"Shutdown timeout ignored when timeout is 0, "
+ "which is primarily used in tests to forcefully shutdown the broker",
cause);
}
} else {
throw new PulsarServerException(cause);
}
Expand Down Expand Up @@ -693,7 +694,7 @@ public void start() throws PulsarServerException {
throw new PulsarServerException("Cannot start the service once it was stopped");
}

if (!config.getWebServicePort().isPresent() && !config.getWebServicePortTls().isPresent()) {
if (config.getWebServicePort().isEmpty() && config.getWebServicePortTls().isEmpty()) {
throw new IllegalArgumentException("webServicePort/webServicePortTls must be present");
}

Expand Down Expand Up @@ -722,7 +723,7 @@ public void start() throws PulsarServerException {
config.getDefaultRetentionTimeInMinutes() * 60));
}

if (!config.getLoadBalancerOverrideBrokerNicSpeedGbps().isPresent()
if (config.getLoadBalancerOverrideBrokerNicSpeedGbps().isEmpty()
&& config.isLoadBalancerEnabled()
&& LinuxInfoUtils.isLinux()
&& !LinuxInfoUtils.checkHasNicSpeeds()) {
Expand Down Expand Up @@ -896,7 +897,7 @@ public void start() throws PulsarServerException {
if (isNotBlank(config.getResourceUsageTransportClassName())) {
Class<?> clazz = Class.forName(config.getResourceUsageTransportClassName());
Constructor<?> ctor = clazz.getConstructor(PulsarService.class);
Object object = ctor.newInstance(new Object[]{this});
Object object = ctor.newInstance(this);
this.resourceUsageTransportManager = (ResourceUsageTopicTransportManager) object;
}
this.resourceGroupServiceManager = new ResourceGroupService(this);
Expand Down Expand Up @@ -1241,7 +1242,6 @@ protected void startLoadManagementService() throws PulsarServerException {
* Load all the topics contained in a namespace.
*
* @param bundle <code>NamespaceBundle</code> to identify the service unit
* @throws Exception
*/
public void loadNamespaceTopics(NamespaceBundle bundle) {
executor.submit(() -> {
Expand Down Expand Up @@ -1296,7 +1296,7 @@ public InternalConfigurationData getInternalConfigurationData() {
config.getConfigurationMetadataStoreUrl(),
new ClientConfiguration().getZkLedgersRootPath(),
config.isBookkeeperMetadataStoreSeparated() ? config.getBookkeeperMetadataStoreUrl() : null,
this.getWorkerConfig().map(wc -> wc.getStateStorageServiceUrl()).orElse(null));
this.getWorkerConfig().map(WorkerConfig::getStateStorageServiceUrl).orElse(null));
}

/**
Expand Down Expand Up @@ -1411,7 +1411,7 @@ public LedgerOffloader createManagedLedgerOffloader(OffloadPoliciesImpl offloadP
Offloaders offloaders = offloadersCache.getOrLoadOffloaders(
offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());

LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
LedgerOffloaderFactory<?> offloaderFactory = offloaders.getOffloaderFactory(
offloadPolicies.getManagedLedgerOffloadDriver());
try {
return offloaderFactory.create(
Expand Down Expand Up @@ -1699,7 +1699,8 @@ public String webAddress(ServiceConfiguration config) {
AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "http");
return internalListener.getBrokerHttpUrl() != null
? internalListener.getBrokerHttpUrl().toString()
: webAddress(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTP().get());
: webAddress(ServiceConfigurationUtils.getWebServiceAddress(config),
getListenPortHTTP().orElseThrow());
} else {
return null;
}
Expand All @@ -1714,7 +1715,8 @@ public String webAddressTls(ServiceConfiguration config) {
AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "https");
return internalListener.getBrokerHttpsUrl() != null
? internalListener.getBrokerHttpsUrl().toString()
: webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTPS().get());
: webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(config),
getListenPortHTTPS().orElseThrow());
} else {
return null;
}
Expand All @@ -1736,7 +1738,7 @@ public String getSafeBrokerServiceUrl() {
public String getLookupServiceAddress() {
return String.format("%s:%s", advertisedAddress, config.getWebServicePort().isPresent()
? config.getWebServicePort().get()
: config.getWebServicePortTls().get());
: config.getWebServicePortTls().orElseThrow());
}

public TopicPoliciesService getTopicPoliciesService() {
Expand Down Expand Up @@ -1798,21 +1800,22 @@ private void startWorkerService(AuthenticationService authenticationService,
}

public PackagesManagement getPackagesManagement() throws UnsupportedOperationException {
return packagesManagement.orElseThrow(() -> new UnsupportedOperationException("Package Management Service "
+ "is not enabled in the broker."));
if (packagesManagement == null) {
throw new UnsupportedOperationException("Package Management Service is not enabled in the broker.");
}
return packagesManagement;
}

private void startPackagesManagementService() throws IOException {
// TODO: using provider to initialize the packages management service.
PackagesManagement packagesManagementService = new PackagesManagementImpl();
this.packagesManagement = Optional.of(packagesManagementService);
this.packagesManagement = new PackagesManagementImpl();
PackagesStorageProvider storageProvider = PackagesStorageProvider
.newProvider(config.getPackagesManagementStorageProvider());
DefaultPackagesStorageConfiguration storageConfiguration = new DefaultPackagesStorageConfiguration();
storageConfiguration.setProperty(config.getProperties());
PackagesStorage storage = storageProvider.getStorage(storageConfiguration);
storage.initialize();
packagesManagementService.initialize(storage);
this.packagesManagement.initialize(storage);
}

public Optional<Integer> getListenPortHTTP() {
Expand Down
Loading

0 comments on commit 479b9f4

Please sign in to comment.