Skip to content

Commit

Permalink
[improve][pip] PIP-366: Support to specify different config for Confi…
Browse files Browse the repository at this point in the history
…guration and Local Metadata Store (#23041)
  • Loading branch information
Demogorgon314 authored Aug 5, 2024
1 parent f3c177e commit 76f16e8
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -562,10 +562,16 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece

@FieldContext(
category = CATEGORY_SERVER,
doc = "Configuration file path for local metadata store. It's supported by RocksdbMetadataStore for now."
doc = "Configuration file path for local metadata store."
)
private String metadataStoreConfigPath = null;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Configuration file path for configuration metadata store."
)
private String configurationStoreConfigPath = null;

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,11 +391,15 @@ public PulsarService(ServiceConfiguration config,
public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer,
OpenTelemetry openTelemetry)
throws MetadataStoreException {
String configFilePath = config.getMetadataStoreConfigPath();
if (StringUtils.isNotBlank(config.getConfigurationStoreConfigPath())) {
configFilePath = config.getConfigurationStoreConfigPath();
}
return MetadataStoreFactory.create(config.getConfigurationMetadataStoreUrl(),
MetadataStoreConfig.builder()
.sessionTimeoutMillis((int) config.getMetadataStoreSessionTimeoutMillis())
.allowReadOnlyOperations(config.isMetadataStoreAllowReadOnlyOperations())
.configFilePath(config.getMetadataStoreConfigPath())
.configFilePath(configFilePath)
.batchingEnabled(config.isMetadataStoreBatchingEnabled())
.batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis())
.batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;

/**
* Provide a zookeeper client to handle session expire.
Expand Down Expand Up @@ -92,6 +94,9 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
private final RetryPolicy connectRetryPolicy;
private final RetryPolicy operationRetryPolicy;

// Zookeeper config path
private final String configPath;

// Stats Logger
private final OpStatsLogger createStats;
private final OpStatsLogger getStats;
Expand Down Expand Up @@ -120,8 +125,9 @@ public ZooKeeper call() throws KeeperException, InterruptedException {
ZooKeeper newZk;
try {
newZk = createZooKeeper();
} catch (IOException ie) {
log.error("Failed to create zookeeper instance to " + connectString, ie);
} catch (IOException | QuorumPeerConfig.ConfigException e) {
log.error("Failed to create zookeeper instance to {} with config path {}",
connectString, configPath, e);
throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
}
waitForConnection();
Expand Down Expand Up @@ -149,7 +155,7 @@ public String toString() {
static PulsarZooKeeperClient createConnectedZooKeeperClient(
String connectString, int sessionTimeoutMs, Set<Watcher> childWatchers,
RetryPolicy operationRetryPolicy)
throws KeeperException, InterruptedException, IOException {
throws KeeperException, InterruptedException, IOException, QuorumPeerConfig.ConfigException {
return PulsarZooKeeperClient.newBuilder()
.connectString(connectString)
.sessionTimeoutMs(sessionTimeoutMs)
Expand All @@ -171,6 +177,7 @@ public static class Builder {
int retryExecThreadCount = DEFAULT_RETRY_EXECUTOR_THREAD_COUNT;
double requestRateLimit = 0;
boolean allowReadOnlyMode = false;
String configPath = null;

private Builder() {}

Expand Down Expand Up @@ -219,7 +226,15 @@ public Builder allowReadOnlyMode(boolean allowReadOnlyMode) {
return this;
}

public PulsarZooKeeperClient build() throws IOException, KeeperException, InterruptedException {
public Builder configPath(String configPath) {
this.configPath = configPath;
return this;
}

public PulsarZooKeeperClient build() throws IOException,
KeeperException,
InterruptedException,
QuorumPeerConfig.ConfigException {
requireNonNull(connectString);
checkArgument(sessionTimeoutMs > 0);
requireNonNull(statsLogger);
Expand Down Expand Up @@ -251,7 +266,8 @@ public PulsarZooKeeperClient build() throws IOException, KeeperException, Interr
statsLogger,
retryExecThreadCount,
requestRateLimit,
allowReadOnlyMode
allowReadOnlyMode,
configPath
);
// Wait for connection to be established.
try {
Expand All @@ -273,16 +289,19 @@ public static Builder newBuilder() {
}

protected PulsarZooKeeperClient(String connectString,
int sessionTimeoutMs,
ZooKeeperWatcherBase watcherManager,
RetryPolicy connectRetryPolicy,
RetryPolicy operationRetryPolicy,
StatsLogger statsLogger,
int retryExecThreadCount,
double rate,
boolean allowReadOnlyMode) throws IOException {
super(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode);
int sessionTimeoutMs,
ZooKeeperWatcherBase watcherManager,
RetryPolicy connectRetryPolicy,
RetryPolicy operationRetryPolicy,
StatsLogger statsLogger,
int retryExecThreadCount,
double rate,
boolean allowReadOnlyMode,
String configPath) throws IOException, QuorumPeerConfig.ConfigException {
super(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode,
configPath == null ? null : new ZKClientConfig(configPath));
this.connectString = connectString;
this.configPath = configPath;
this.sessionTimeoutMs = sessionTimeoutMs;
this.allowReadOnlyMode = allowReadOnlyMode;
this.watcherManager = watcherManager;
Expand Down Expand Up @@ -334,7 +353,11 @@ public void waitForConnection() throws KeeperException, InterruptedException {
watcherManager.waitForConnection();
}

protected ZooKeeper createZooKeeper() throws IOException {
protected ZooKeeper createZooKeeper() throws IOException, QuorumPeerConfig.ConfigException {
if (null != configPath) {
return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode,
new ZKClientConfig(configPath));
}
return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConf
.allowReadOnlyMode(metadataStoreConfig.isAllowReadOnlyOperations())
.sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis())
.watchers(Collections.singleton(this::processSessionWatcher))
.configPath(metadataStoreConfig.getConfigFilePath())
.build();
if (enableSessionWatcher) {
sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent);
Expand Down Expand Up @@ -577,6 +578,7 @@ public CompletableFuture<Void> initializeCluster() {
.connectRetryPolicy(
new BoundExponentialBackoffRetryPolicy(metadataStoreConfig.getSessionTimeoutMillis(),
metadataStoreConfig.getSessionTimeoutMillis(), 0))
.configPath(metadataStoreConfig.getConfigFilePath())
.build()) {
if (chrootZk.exists(chrootPath, false) == null) {
createFullPathOptimistic(chrootZk, chrootPath, new byte[0], CreateMode.PERSISTENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.CompletionStage;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
Expand Down Expand Up @@ -80,15 +81,17 @@ public OxiaMetadataStore(
}
synchronizer = Optional.ofNullable(metadataStoreConfig.getSynchronizer());
identity = UUID.randomUUID().toString();
client =
OxiaClientBuilder.create(serviceAddress)
.clientIdentifier(identity)
.namespace(namespace)
.sessionTimeout(Duration.ofMillis(metadataStoreConfig.getSessionTimeoutMillis()))
.batchLinger(Duration.ofMillis(linger))
.maxRequestsPerBatch(metadataStoreConfig.getBatchingMaxOperations())
.asyncClient()
.get();
OxiaClientBuilder oxiaClientBuilder = OxiaClientBuilder
.create(serviceAddress)
.clientIdentifier(identity)
.namespace(namespace)
.sessionTimeout(Duration.ofMillis(metadataStoreConfig.getSessionTimeoutMillis()))
.batchLinger(Duration.ofMillis(linger))
.maxRequestsPerBatch(metadataStoreConfig.getBatchingMaxOperations());
if (StringUtils.isNotBlank(metadataStoreConfig.getConfigFilePath())) {
oxiaClientBuilder.loadConfig(metadataStoreConfig.getConfigFilePath());
}
client = oxiaClientBuilder.asyncClient().get();
init();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public Object[][] distributedImplementations() {
};
}

private synchronized String getOxiaServerConnectString() {
protected synchronized String getOxiaServerConnectString() {
if (oxiaServer == null) {
oxiaServer = new OxiaContainer(OxiaContainer.DEFAULT_IMAGE_NAME);
oxiaServer.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
Expand All @@ -36,7 +38,13 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import io.streamnative.oxia.client.ClientConfig;
import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.session.SessionFactory;
import io.streamnative.oxia.client.session.SessionManager;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -51,9 +59,15 @@
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.impl.PulsarZooKeeperClient;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.metadata.impl.oxia.OxiaMetadataStore;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.assertj.core.util.Lists;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -457,7 +471,8 @@ public void testThreadSwitchOfZkMetadataStore(boolean hasSynchronizer, boolean e
MetadataStoreConfig config = builder.build();
@Cleanup
ZKMetadataStore store = (ZKMetadataStore) MetadataStoreFactory.create(zks.getConnectionString(), config);

ZooKeeper zkClient = store.getZkClient();
assertTrue(zkClient.getClientConfig().isSaslClientEnabled());
final Runnable verify = () -> {
String currentThreadName = Thread.currentThread().getName();
String errorMessage = String.format("Expect to switch to thread %s, but currently it is thread %s",
Expand Down Expand Up @@ -500,6 +515,49 @@ public void testThreadSwitchOfZkMetadataStore(boolean hasSynchronizer, boolean e
}).join();
}

@Test
public void testZkLoadConfigFromFile() throws Exception {
final String metadataStoreName = UUID.randomUUID().toString().replaceAll("-", "");
MetadataStoreConfig.MetadataStoreConfigBuilder builder =
MetadataStoreConfig.builder().metadataStoreName(metadataStoreName);
builder.fsyncEnable(false);
builder.batchingEnabled(true);
builder.configFilePath("src/test/resources/zk_client_disabled_sasl.conf");
MetadataStoreConfig config = builder.build();
@Cleanup
ZKMetadataStore store = (ZKMetadataStore) MetadataStoreFactory.create(zks.getConnectionString(), config);

PulsarZooKeeperClient zkClient = (PulsarZooKeeperClient) store.getZkClient();
assertFalse(zkClient.getClientConfig().isSaslClientEnabled());

zkClient.process(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null));

var zooKeeperRef = (AtomicReference<ZooKeeper>) WhiteboxImpl.getInternalState(zkClient, "zk");
var zooKeeper = Awaitility.await().until(zooKeeperRef::get, Objects::nonNull);
assertFalse(zooKeeper.getClientConfig().isSaslClientEnabled());
}

@Test
public void testOxiaLoadConfigFromFile() throws Exception {
final String metadataStoreName = UUID.randomUUID().toString().replaceAll("-", "");
String oxia = "oxia://" + getOxiaServerConnectString();
MetadataStoreConfig.MetadataStoreConfigBuilder builder =
MetadataStoreConfig.builder().metadataStoreName(metadataStoreName);
builder.fsyncEnable(false);
builder.batchingEnabled(true);
builder.sessionTimeoutMillis(30000);
builder.configFilePath("src/test/resources/oxia_client.conf");
MetadataStoreConfig config = builder.build();

OxiaMetadataStore store = (OxiaMetadataStore) MetadataStoreFactory.create(oxia, config);
var client = (AsyncOxiaClient) WhiteboxImpl.getInternalState(store, "client");
var sessionManager = (SessionManager) WhiteboxImpl.getInternalState(client, "sessionManager");
var sessionFactory = (SessionFactory) WhiteboxImpl.getInternalState(sessionManager, "factory");
var clientConfig = (ClientConfig) WhiteboxImpl.getInternalState(sessionFactory, "config");
var sessionTimeout = clientConfig.sessionTimeout();
assertEquals(sessionTimeout, Duration.ofSeconds(60));
}

@Test(dataProvider = "impl")
public void testPersistent(String provider, Supplier<String> urlSupplier) throws Exception {
String metadataUrl = urlSupplier.get();
Expand Down
20 changes: 20 additions & 0 deletions pulsar-metadata/src/test/resources/oxia_client.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

sessionTimeout=60000
20 changes: 20 additions & 0 deletions pulsar-metadata/src/test/resources/zk_client_disabled_sasl.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

zookeeper.sasl.client=false

0 comments on commit 76f16e8

Please sign in to comment.