Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][fn] fix function failed to start if no typeClassName provided in FunctionDetails #18111

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.functions.runtime;

import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.converters.StringConverter;
Expand All @@ -37,6 +39,7 @@
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.WindowConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
Expand All @@ -49,6 +52,7 @@
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.utils.FunctionCommon;


@Slf4j
Expand Down Expand Up @@ -181,6 +185,7 @@ public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassL
functionDetailsJsonString = functionDetailsJsonString.substring(0, functionDetailsJsonString.length() - 1);
}
JsonFormat.parser().merge(functionDetailsJsonString, functionDetailsBuilder);
inferringMissingTypeClassName(functionDetailsBuilder, functionInstanceClassLoader);
Function.FunctionDetails functionDetails = functionDetailsBuilder.build();
instanceConfig.setFunctionDetails(functionDetails);
instanceConfig.setPort(port);
Expand Down Expand Up @@ -259,16 +264,16 @@ public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassL
if (expectedHealthCheckInterval > 0) {
healthCheckTimer =
InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(() -> {
try {
if (System.currentTimeMillis() - lastHealthCheckTs
> 3 * expectedHealthCheckInterval * 1000) {
log.info("Haven't received health check from spawner in a while. Stopping instance...");
close();
}
} catch (Exception e) {
log.error("Error occurred when checking for latest health check", e);
}
}, expectedHealthCheckInterval * 1000, expectedHealthCheckInterval * 1000, TimeUnit.MILLISECONDS);
try {
if (System.currentTimeMillis() - lastHealthCheckTs
> 3 * expectedHealthCheckInterval * 1000) {
log.info("Haven't received health check from spawner in a while. Stopping instance...");
close();
}
} catch (Exception e) {
log.error("Error occurred when checking for latest health check", e);
}
}, expectedHealthCheckInterval * 1000, expectedHealthCheckInterval * 1000, TimeUnit.MILLISECONDS);
}

runtimeSpawner.join();
Expand Down Expand Up @@ -306,6 +311,84 @@ public void close() {
}
}

private void inferringMissingTypeClassName(Function.FunctionDetails.Builder functionDetailsBuilder,
ClassLoader classLoader) throws ClassNotFoundException {
switch (functionDetailsBuilder.getComponentType()) {
case FUNCTION:
if ((functionDetailsBuilder.hasSource()
&& functionDetailsBuilder.getSource().getTypeClassName().isEmpty())
|| (functionDetailsBuilder.hasSink()
&& functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
Map<String, Object> userConfigs = new Gson().fromJson(functionDetailsBuilder.getUserConfig(),
new TypeToken<Map<String, Object>>() {
}.getType());
boolean isWindowConfigPresent = userConfigs.containsKey(WindowConfig.WINDOW_CONFIG_KEY);
String className = functionDetailsBuilder.getClassName();
if (isWindowConfigPresent) {
WindowConfig windowConfig = new Gson().fromJson(
(new Gson().toJson(userConfigs.get(WindowConfig.WINDOW_CONFIG_KEY))),
WindowConfig.class);
className = windowConfig.getActualWindowFunctionClassName();
}

Class<?>[] typeArgs = FunctionCommon.getFunctionTypes(classLoader.loadClass(className),
isWindowConfigPresent);
if (functionDetailsBuilder.hasSource()
&& functionDetailsBuilder.getSource().getTypeClassName().isEmpty()
&& typeArgs[0] != null) {
Function.SourceSpec.Builder sourceBuilder = functionDetailsBuilder.getSource().toBuilder();
sourceBuilder.setTypeClassName(typeArgs[0].getName());
functionDetailsBuilder.setSource(sourceBuilder.build());
}

if (functionDetailsBuilder.hasSink()
&& functionDetailsBuilder.getSink().getTypeClassName().isEmpty()
&& typeArgs[1] != null) {
Function.SinkSpec.Builder sinkBuilder = functionDetailsBuilder.getSink().toBuilder();
sinkBuilder.setTypeClassName(typeArgs[1].getName());
functionDetailsBuilder.setSink(sinkBuilder.build());
}
}
break;
case SINK:
if ((functionDetailsBuilder.hasSink()
&& functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
String typeArg = getSinkType(functionDetailsBuilder.getClassName(), classLoader).getName();

Function.SinkSpec.Builder sinkBuilder =
Function.SinkSpec.newBuilder(functionDetailsBuilder.getSink());
sinkBuilder.setTypeClassName(typeArg);
functionDetailsBuilder.setSink(sinkBuilder);

Function.SourceSpec sourceSpec = functionDetailsBuilder.getSource();
if (null == sourceSpec || StringUtils.isEmpty(sourceSpec.getTypeClassName())) {
Function.SourceSpec.Builder sourceBuilder = Function.SourceSpec.newBuilder(sourceSpec);
sourceBuilder.setTypeClassName(typeArg);
functionDetailsBuilder.setSource(sourceBuilder);
}
}
break;
case SOURCE:
if ((functionDetailsBuilder.hasSource()
&& functionDetailsBuilder.getSource().getTypeClassName().isEmpty())) {
String typeArg = getSourceType(functionDetailsBuilder.getClassName(), classLoader).getName();

Function.SourceSpec.Builder sourceBuilder =
Function.SourceSpec.newBuilder(functionDetailsBuilder.getSource());
sourceBuilder.setTypeClassName(typeArg);
functionDetailsBuilder.setSource(sourceBuilder);

Function.SinkSpec sinkSpec = functionDetailsBuilder.getSink();
if (null == sinkSpec || StringUtils.isEmpty(sinkSpec.getTypeClassName())) {
Function.SinkSpec.Builder sinkBuilder = Function.SinkSpec.newBuilder(sinkSpec);
sinkBuilder.setTypeClassName(typeArg);
functionDetailsBuilder.setSink(sinkBuilder);
}
}
break;
}
}


class InstanceControlImpl extends InstanceControlGrpc.InstanceControlImplBase {
private RuntimeSpawner runtimeSpawner;
Expand All @@ -331,8 +414,8 @@ public void getFunctionStatus(Empty request,

@Override
public void getAndResetMetrics(com.google.protobuf.Empty request,
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData>
responseObserver) {
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData>
responseObserver) {
Runtime runtime = runtimeSpawner.getRuntime();
if (runtime != null) {
try {
Expand All @@ -348,8 +431,8 @@ public void getAndResetMetrics(com.google.protobuf.Empty request,

@Override
public void getMetrics(com.google.protobuf.Empty request,
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData>
responseObserver) {
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData>
responseObserver) {
Runtime runtime = runtimeSpawner.getRuntime();
if (runtime != null) {
try {
Expand Down Expand Up @@ -380,8 +463,8 @@ public void resetMetrics(com.google.protobuf.Empty request,

@Override
public void healthCheck(com.google.protobuf.Empty request,
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.HealthCheckResult>
responseObserver) {
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.HealthCheckResult>
responseObserver) {
log.debug("Received health check request...");
InstanceCommunication.HealthCheckResult healthCheckResult =
InstanceCommunication.HealthCheckResult.newBuilder().setSuccess(true).build();
Expand Down