Skip to content

Commit

Permalink
[Flink-Connector]Get PulsarClient from cache should always return an …
Browse files Browse the repository at this point in the history
…open instance (#6436)
  • Loading branch information
yjshen authored Mar 1, 2020
1 parent e6a631d commit 2ed2eb8
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public class PulsarClientImpl implements PulsarClient {
private final Timer timer;
private final ExecutorProvider externalExecutorProvider;

enum State {
public enum State {
Open, Closing, Closed
}

Expand Down Expand Up @@ -167,6 +167,10 @@ public Clock getClientClock() {
return clientClock;
}

public AtomicReference<State> getState() {
return state;
}

@Override
public ProducerBuilder<byte[]> newProducer() {
return new ProducerBuilderImpl<>(this, Schema.BYTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,13 @@ private static PulsarClientImpl createPulsarClient(
}

public static PulsarClientImpl getOrCreate(ClientConfigurationData config) throws ExecutionException {
return guavaCache.get(config);
PulsarClientImpl instance = guavaCache.get(config);
if (instance.getState().get() == PulsarClientImpl.State.Open) {
return instance;
} else {
guavaCache.invalidate(config);
return guavaCache.get(config);
}
}

private static void close(ClientConfigurationData clientConfig, PulsarClientImpl client) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,29 @@ public void testShouldCloseTheCorrectClient() throws Exception {

assertEquals(map2.values().iterator().next(), client1);
}

@Test
public void getClientFromCacheShouldAlwaysReturnAnOpenedInstance() throws Exception {
PulsarClientImpl impl1 = Mockito.mock(PulsarClientImpl.class);

ClientConfigurationData conf1 = new ClientConfigurationData();
conf1.setServiceUrl(SERVICE_URL);

PowerMockito.whenNew(PulsarClientImpl.class)
.withArguments(conf1).thenReturn(impl1);

PulsarClientImpl client1 = CachedPulsarClient.getOrCreate(conf1);

ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map1 = CachedPulsarClient.getAsMap();
assertEquals(map1.size(), 1);

client1.getState().set(PulsarClientImpl.State.Closed);

PulsarClientImpl client2 = CachedPulsarClient.getOrCreate(conf1);

assertNotEquals(client1, client2);

ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map2 = CachedPulsarClient.getAsMap();
assertEquals(map2.size(), 1);
}
}

0 comments on commit 2ed2eb8

Please sign in to comment.