Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle zkCache failure: invalidate cache and zk-getData failure #377

Merged
merged 6 commits into from
Apr 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public class PulsarService implements AutoCloseable {
private LocalZooKeeperConnectionService localZooKeeperConnectionProvider;
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(20,
new DefaultThreadFactory("pulsar"));
private final ScheduledExecutorService cacheExecutor = Executors.newScheduledThreadPool(10,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a thought: can we make the pool sizes configurable and put all the constants in one place, for all pools (cacheExecutor, scheduledExecutorScheduler and orderedExecutor)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may make the number of netty-io and executor threads configurable, but I think we should create an issue and address that in a separate PR.

new DefaultThreadFactory("zk-cache-callback"));
private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(8, "pulsar-ordered");
private ScheduledExecutorService loadManagerExecutor = null;
private ScheduledFuture<?> loadReportTask = null;
Expand Down Expand Up @@ -382,10 +384,10 @@ private void startZkCacheService() throws PulsarServerException {

LOG.info("starting configuration cache service");

this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor(), this.executor);
this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor(), this.cacheExecutor);
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(),
getOrderedExecutor(), this.executor);
getOrderedExecutor(), this.cacheExecutor);
try {
this.globalZkCache.start();
} catch (IOException e) {
Expand Down Expand Up @@ -533,6 +535,10 @@ public ScheduledExecutorService getExecutor() {
return executor;
}

/**
 * Exposes the scheduled executor stored in {@code cacheExecutor}.
 *
 * @return the {@code cacheExecutor} instance held by this service
 */
public ScheduledExecutorService getCacheExecutor() {
    return this.cacheExecutor;
}

/**
 * Exposes the scheduled executor stored in {@code loadManagerExecutor}.
 *
 * @return the current {@code loadManagerExecutor}; the field is declared with a {@code null}
 *         initial value, so callers presumably need to handle {@code null} until it is assigned
 */
public ScheduledExecutorService getLoadManagerExecutor() {
    return this.loadManagerExecutor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,11 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E

@Override
protected void handleLookup(CommandLookupTopic lookup) {
if (log.isDebugEnabled()) {
log.debug("Received Lookup from {}", remoteAddress);
}
final long requestId = lookup.getRequestId();
final String topic = lookup.getTopic();
if (log.isDebugEnabled()) {
log.debug("[{}] Received Lookup from {} for {}", topic, remoteAddress, requestId);
}
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(),
Expand Down Expand Up @@ -187,11 +187,11 @@ protected void handleLookup(CommandLookupTopic lookup) {

@Override
protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
if (log.isDebugEnabled()) {
log.debug("Received PartitionMetadataLookup from {}", remoteAddress);
}
final long requestId = partitionMetadata.getRequestId();
final String topic = partitionMetadata.getTopic();
if (log.isDebugEnabled()) {
log.debug("[{}] Received PartitionMetadataLookup from {} for {}", topic, remoteAddress, requestId);
}
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
getPartitionedTopicMetadata(getBrokerService().pulsar(), getRole(), DestinationName.get(topic))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.zookeeper.MockZooKeeper;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand All @@ -41,19 +45,28 @@ public class ResourceQuotaCacheTest {
private ZooKeeperCache zkCache;
private LocalZooKeeperCacheService localCache;
private NamespaceBundleFactory bundleFactory;
private OrderedSafeExecutor executor;
private ScheduledExecutorService scheduledExecutor;

@BeforeMethod
public void setup() throws Exception {
pulsar = mock(PulsarService.class);
OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, null);
executor = new OrderedSafeExecutor(1, "test");
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, scheduledExecutor);
localCache = new LocalZooKeeperCacheService(zkCache, null);
bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());

doReturn(zkCache).when(pulsar).getLocalZkCache();
doReturn(localCache).when(pulsar).getLocalZkCacheService();
}

/**
 * Releases the executors created for each test method: first the ordered executor, then the
 * scheduled executor.
 */
@AfterMethod
public void teardown() {
    this.executor.shutdown();
    this.scheduledExecutor.shutdown();
}

@Test
public void testGetSetDefaultQuota() throws Exception {
ResourceQuotaCache cache = new ResourceQuotaCache(zkCache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -60,6 +62,7 @@ public class OwnershipCacheTest {
private NamespaceService nsService;
private BrokerService brokerService;
private OrderedSafeExecutor executor;
private ScheduledExecutorService scheduledExecutor;

@BeforeMethod
public void setup() throws Exception {
Expand All @@ -68,7 +71,8 @@ public void setup() throws Exception {
pulsar = mock(PulsarService.class);
config = mock(ServiceConfiguration.class);
executor = new OrderedSafeExecutor(1, "test");
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, null);
scheduledExecutor = Executors.newScheduledThreadPool(2);
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, scheduledExecutor);
localCache = new LocalZooKeeperCacheService(zkCache, null);
bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
nsService = mock(NamespaceService.class);
Expand All @@ -88,6 +92,7 @@ public void setup() throws Exception {
/**
 * Shuts down the ordered executor and then the scheduled executor after each test method.
 *
 * @throws Exception declared by the original signature; the visible body only calls {@code shutdown()}
 */
@AfterMethod
public void teardown() throws Exception {
    this.executor.shutdown();
    this.scheduledExecutor.shutdown();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import com.yahoo.pulsar.zookeeper.GlobalZooKeeperCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;

import io.netty.util.concurrent.DefaultThreadFactory;

/**
* Maintains available active broker list and returns next active broker in round-robin for discovery service.
*
Expand All @@ -52,7 +54,8 @@ public class BrokerDiscoveryProvider implements Closeable {
private final AtomicInteger counter = new AtomicInteger();

private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(4, "pulsar-discovery-ordered");
private final ScheduledExecutorService scheduledExecutorScheduler = Executors.newScheduledThreadPool(1);
private final ScheduledExecutorService scheduledExecutorScheduler = Executors.newScheduledThreadPool(4,
new DefaultThreadFactory("pulsar-discovery"));

private static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.slf4j.Logger;
Expand All @@ -33,6 +35,8 @@
import com.yahoo.pulsar.zookeeper.ZooKeeperClientFactory;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;

import io.netty.util.concurrent.DefaultThreadFactory;

/**
* Connects with ZooKeeper and sets watch to listen changes for active broker list.
*
Expand All @@ -47,7 +51,9 @@ public class ZookeeperCacheLoader implements Closeable {

private volatile List<LoadReport> availableBrokers;

private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(8, "pulsar-discovery");
private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(8, "pulsar-discovery-ordered-cache");
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(8,
new DefaultThreadFactory("pulsar-discovery-cache"));

public static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/brokers";

Expand All @@ -66,7 +72,7 @@ public ZookeeperCacheLoader(ZooKeeperClientFactory zkClientFactory, String zooke
});

this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor,
null/* cache uses ForkJoinPool if provided scheduler is null to load data-async */);
executor);
localZkConnectionSvc.start(exitCode -> {
try {
localZkCache.getZooKeeper().close();
Expand Down Expand Up @@ -106,6 +112,7 @@ public ZooKeeperCache getLocalZkCache() {
/**
 * Closes this loader by shutting down its two executors: the ordered executor first, then the
 * scheduled executor.
 */
@Override
public void close() {
    this.orderedExecutor.shutdown();
    this.executor.shutdown();
}

private void updateBrokerList(Set<String> brokerNodes) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.bookkeeper.util.OrderedSafeExecutor;
Expand Down Expand Up @@ -86,6 +87,8 @@ public static interface CacheUpdater<T> {
protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null);

public ZooKeeperCache(ZooKeeper zkSession, OrderedSafeExecutor executor, ScheduledExecutorService scheduledExecutor) {
checkNotNull(executor);
checkNotNull(scheduledExecutor);
this.executor = executor;
this.scheduledExecutor = scheduledExecutor;
this.zkSession.set(zkSession);
Expand Down Expand Up @@ -166,6 +169,10 @@ private void invalidateExists(String path) {
existsCache.invalidate(path);
}

/**
 * Schedules {@link #invalidate(String)} for the given path on the scheduled executor instead of
 * running it on the calling thread.
 *
 * @param path z-node path whose cached entries should be invalidated
 */
public void asyncInvalidate(String path) {
    // The future returned by submit() is intentionally not tracked, as in the original call.
    final Runnable invalidation = () -> invalidate(path);
    scheduledExecutor.submit(invalidation);
}

public void invalidate(final String path) {
invalidateData(path);
invalidateChildren(path);
Expand Down Expand Up @@ -222,6 +229,7 @@ public <T> CompletableFuture<Optional<T>> getDataAsync(final String path, final
getDataAsync(path, this, deserializer).thenAccept(data -> {
future.complete(data.map(e -> e.getKey()));
}).exceptionally(ex -> {
asyncInvalidate(path);
if (ex.getCause() instanceof NoNodeException) {
future.complete(Optional.empty());
} else {
Expand Down Expand Up @@ -249,18 +257,22 @@ public <T> Optional<Entry<T, Stat>> getData(final String path, final Watcher wat
try {
return getDataAsync(path, watcher, deserializer).get(cacheTimeOutInSec, TimeUnit.SECONDS);
} catch (ExecutionException e) {
asyncInvalidate(path);
Throwable cause = e.getCause();
if (cause instanceof KeeperException) {
throw (KeeperException) cause;
} else if (cause instanceof InterruptedException) {
LOG.warn("Time-out while fetching {} zk-data in {} sec", path, cacheTimeOutInSec);
invalidate(path);
throw (InterruptedException) cause;
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw new RuntimeException(cause);
}
} catch (TimeoutException e) {
LOG.warn("Time-out while fetching {} zk-data in {} sec", path, cacheTimeOutInSec);
asyncInvalidate(path);
throw e;
}
}

Expand All @@ -275,24 +287,30 @@ public <T> CompletableFuture<Optional<Entry<T, Stat>>> getDataAsync(final String
// Return a future for the z-node to be fetched from ZK
CompletableFuture<Entry<Object, Stat>> zkFuture = new CompletableFuture<>();

this.zkSession.get().getData(path, watcher, (rc, path1, ctx, content, stat) -> {
Executor exec = scheduledExecutor != null ? scheduledExecutor : executor;
if (rc == Code.OK.intValue()) {
try {
T obj = deserializer.deserialize(path, content);
// avoid using the zk-client thread to process the result
exec.execute(() -> zkFuture.complete(new SimpleImmutableEntry<Object, Stat>(obj, stat)));
} catch (Exception e) {
exec.execute(() -> zkFuture.completeExceptionally(e));
// Broker doesn't restart on global-zk session lost: so handling unexpected exception
try {
this.zkSession.get().getData(path, watcher, (rc, path1, ctx, content, stat) -> {
Executor exec = scheduledExecutor != null ? scheduledExecutor : executor;
if (rc == Code.OK.intValue()) {
try {
T obj = deserializer.deserialize(path, content);
// avoid using the zk-client thread to process the result
exec.execute(() -> zkFuture.complete(new SimpleImmutableEntry<Object, Stat>(obj, stat)));
} catch (Exception e) {
exec.execute(() -> zkFuture.completeExceptionally(e));
}
} else if (rc == Code.NONODE.intValue()) {
// Return null values for missing z-nodes, as this is not "exceptional" condition
exec.execute(() -> zkFuture.complete(null));
} else {
exec.execute(() -> zkFuture.completeExceptionally(KeeperException.create(rc)));
}
} else if (rc == Code.NONODE.intValue()) {
// Return null values for missing z-nodes, as this is not "exceptional" condition
exec.execute(() -> zkFuture.complete(null));
} else {
exec.execute(() -> zkFuture.completeExceptionally(KeeperException.create(rc)));
}
}, null);

}, null);
} catch (Exception e) {
LOG.warn("Failed to access zkSession for {} {}", path, e.getMessage(), e);
zkFuture.completeExceptionally(e);
}

return zkFuture;
}).thenAccept(result -> {
if (result != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -61,6 +62,7 @@ public CompletableFuture<Optional<T>> getAsync(String path) {
cache.getDataAsync(path, this, this).thenAccept(entry -> {
future.complete(entry.map(Entry::getKey));
}).exceptionally(ex -> {
cache.asyncInvalidate(path);
future.completeExceptionally(ex);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void testBasic() throws Exception {
// Case1: ZKCache is given
ZkBookieRackAffinityMapping mapping1 = new ZkBookieRackAffinityMapping();
ClientConfiguration bkClientConf1 = new ClientConfiguration();
bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
});
mapping1.setConf(bkClientConf1);
List<String> racks1 = mapping1.resolve(Lists.newArrayList(BOOKIE1, BOOKIE2, BOOKIE3));
Expand All @@ -104,7 +104,7 @@ public void testBasic() throws Exception {
public void testNoBookieInfo() throws Exception {
ZkBookieRackAffinityMapping mapping = new ZkBookieRackAffinityMapping();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
});
mapping.setConf(bkClientConf);
List<String> racks = mapping.resolve(Lists.newArrayList(BOOKIE1, BOOKIE2, BOOKIE3));
Expand Down Expand Up @@ -158,7 +158,7 @@ public void testBookieInfoChange() throws Exception {

ZkBookieRackAffinityMapping mapping = new ZkBookieRackAffinityMapping();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
});
mapping.setConf(bkClientConf);
List<String> racks = mapping.resolve(Lists.newArrayList(BOOKIE1, BOOKIE2, BOOKIE3));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void testBasic() throws Exception {

ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
});
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups);
isolationPolicy.initialize(bkClientConf);
Expand Down Expand Up @@ -176,7 +176,7 @@ public void testBasic() throws Exception {
public void testNoBookieInfo() throws Exception {
ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
});
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups);
isolationPolicy.initialize(bkClientConf);
Expand Down Expand Up @@ -296,7 +296,7 @@ public void testNoIsolationGroup() throws Exception {

ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
});
isolationPolicy.initialize(bkClientConf);
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
Expand Down
Loading