Skip to content

Commit

Permalink
[fix][ci] Fix PulsarFunctionLocalRunTest that broke after ClusterData…
Browse files Browse the repository at this point in the history
… validation changes (#19212)
  • Loading branch information
lhotari authored Jan 12, 2023
1 parent 3c22f1b commit 7f75993
Showing 1 changed file with 23 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ void setup(Method method) throws Exception {
primaryHost = pulsar.getWebServiceAddress();

// create cluster metadata
ClusterData clusterData = ClusterData.builder().serviceUrl(urlTls.toString()).build();
ClusterData clusterData = ClusterData.builder().serviceUrlTls(urlTls.toString()).build();
admin.clusters().createCluster(config.getClusterName(), clusterData);

ClientBuilder clientBuilder = PulsarClient.builder()
Expand Down Expand Up @@ -308,14 +308,30 @@ && isNotBlank(workerConfig.getBrokerClientAuthenticationParameters())) {
void shutdown() throws Exception {
try {
log.info("--- Shutting down ---");
fileServer.stop();
pulsarClient.close();
admin.close();
pulsar.close();
bkEnsemble.stop();
if (fileServer != null) {
fileServer.stop();
fileServer = null;
}
if (pulsarClient != null) {
pulsarClient.close();
pulsarClient = null;
}
if (admin != null) {
admin.close();
admin = null;
}
if (pulsar != null) {
pulsar.close();
pulsar = null;
}
if (bkEnsemble != null) {
bkEnsemble.stop();
bkEnsemble = null;
}
} finally {
if (tempDirectory != null) {
tempDirectory.delete();
tempDirectory = null;
}
}
}
Expand Down Expand Up @@ -1099,7 +1115,7 @@ public void testPulsarSinkStatsByteBufferType() throws Throwable {
public void testPulsarSinkWithFunction() throws Throwable {
testPulsarSinkLocalRun(null, 1, StatsNullSink.class.getName(), "builtin://exclamation", "org.apache.pulsar.functions.api.examples.RecordFunction");
}

public static class TestErrorSink implements Sink<byte[]> {
private Map config;
@Override
Expand Down

0 comments on commit 7f75993

Please sign in to comment.