Skip to content

Commit 263ab46

Browse files
mukesh-ctdssrinath-ctds
authored andcommitted
Improved API for function worker liveliness probe
1 parent c9334f7 commit 263ab46

File tree

6 files changed

+20
-11
lines changed

6 files changed

+20
-11
lines changed

pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ public class FunctionMetaDataManager implements AutoCloseable {
8181
@Getter
8282
private CompletableFuture<Void> isInitialized = new CompletableFuture<>();
8383

84+
private boolean isFunctionWorkerAlive = true;
85+
8486
public FunctionMetaDataManager(WorkerConfig workerConfig,
8587
SchedulerManager schedulerManager,
8688
PulsarClient pulsarClient,
@@ -243,6 +245,10 @@ public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaDat
243245
needsScheduling = processUpdate(functionMetaData);
244246
}
245247
} catch (Exception e) {
248+
if (e.getCause() instanceof PulsarClientException.ProducerFencedException) {
249+
log.error("Function worker status has been set to false due to ProducerFencedException.");
250+
this.isFunctionWorkerAlive = false;
251+
}
246252
log.error("Could not write into Function Metadata topic", e);
247253
throw new IllegalStateException("Internal Error updating function at the leader", e);
248254
}
@@ -500,4 +506,8 @@ private void initializeTailer() throws PulsarClientException {
500506
this.functionMetaDataTopicTailer.start();
501507
log.info("MetaData Manager Tailer started");
502508
}
509+
510+
public boolean checkLiveliness() {
511+
return this.isFunctionWorkerAlive;
512+
}
503513
}

pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1885,4 +1885,10 @@ protected ValidatableFunctionPackage getBuiltinFunctionPackage(String archive) {
18851885
}
18861886
return null;
18871887
}
1888+
1889+
@Override
1890+
public boolean checkLiveliness() {
1891+
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
1892+
return functionMetaDataManager.checkLiveliness();
1893+
}
18881894
}

pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,6 @@
6969
@Slf4j
7070
public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWorkerService> {
7171

72-
private boolean isFunctionWorkerAlive = true;
73-
7472
public FunctionsImpl(Supplier<PulsarWorkerService> workerServiceSupplier) {
7573
super(workerServiceSupplier, Function.FunctionDetails.ComponentType.FUNCTION);
7674
}
@@ -695,8 +693,6 @@ public void updateFunctionOnWorkerLeader(final String tenant,
695693
try {
696694
functionMetaDataManager.updateFunctionOnLeader(functionMetaData, delete);
697695
} catch (IllegalStateException e) {
698-
log.error("Function worker status has been set to false due to ProducerFencedException.");
699-
this.isFunctionWorkerAlive = false;
700696
throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
701697
} catch (IllegalArgumentException e) {
702698
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
@@ -792,8 +788,4 @@ private Function.FunctionDetails validateUpdateRequestParams(final String tenant
792788
}
793789
}
794790
}
795-
796-
public boolean checkLiveliness() {
797-
return this.isFunctionWorkerAlive;
798-
}
799791
}

pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,8 @@ public Response healthCheck() {
445445
.entity("There is IllegalStateException, Service is not running. Need to restart.")
446446
.build();
447447
} else {
448-
return Response.ok("There is no IllegalStateException, Service is running.")
448+
return Response.status(Response.Status.OK)
449+
.entity("There is no IllegalStateException, Service is running.")
449450
.build();
450451
}
451452
}

pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,6 @@ StreamingOutput downloadFunction(String tenant, String namespace, String compone
9090
List<ConnectorDefinition> getListOfConnectors();
9191

9292
void reloadConnectors(AuthenticationParameters authParams);
93+
94+
boolean checkLiveliness();
9395
}

pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,4 @@ FunctionInstanceStatusData getFunctionInstanceStatus(String tenant,
102102
void reloadBuiltinFunctions(AuthenticationParameters authParams) throws IOException;
103103

104104
List<FunctionDefinition> getBuiltinFunctions(AuthenticationParameters authParams);
105-
106-
boolean checkLiveliness();
107105
}

0 commit comments

Comments
 (0)