Skip to content

Commit

Permalink
[fix][fn] fix function failed to start if no typeClassName provided…
Browse files Browse the repository at this point in the history
… in `FunctionDetails` (#18111)

(cherry picked from commit 8ad7157)
  • Loading branch information
freeznet authored and codelipenghui committed Nov 11, 2022
1 parent 702bbe9 commit cf95d12
Showing 1 changed file with 83 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.pulsar.functions.runtime;

import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.converters.StringConverter;
Expand All @@ -33,6 +35,7 @@
import io.prometheus.client.exporter.HTTPServer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.WindowConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceCache;
Expand All @@ -45,13 +48,13 @@
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.common.util.Reflections;

import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.functions.utils.FunctionCommon;


@Slf4j
Expand Down Expand Up @@ -165,6 +168,7 @@ public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassL
functionDetailsJsonString = functionDetailsJsonString.substring(0, functionDetailsJsonString.length() - 1);
}
JsonFormat.parser().merge(functionDetailsJsonString, functionDetailsBuilder);
inferringMissingTypeClassName(functionDetailsBuilder, functionInstanceClassLoader);
Function.FunctionDetails functionDetails = functionDetailsBuilder.build();
instanceConfig.setFunctionDetails(functionDetails);
instanceConfig.setPort(port);
Expand Down Expand Up @@ -287,6 +291,84 @@ public void close() {
}
}

private void inferringMissingTypeClassName(Function.FunctionDetails.Builder functionDetailsBuilder,
ClassLoader classLoader) throws ClassNotFoundException {
switch (functionDetailsBuilder.getComponentType()) {
case FUNCTION:
if ((functionDetailsBuilder.hasSource()
&& functionDetailsBuilder.getSource().getTypeClassName().isEmpty())
|| (functionDetailsBuilder.hasSink()
&& functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
Map<String, Object> userConfigs = new Gson().fromJson(functionDetailsBuilder.getUserConfig(),
new TypeToken<Map<String, Object>>() {
}.getType());
boolean isWindowConfigPresent = userConfigs.containsKey(WindowConfig.WINDOW_CONFIG_KEY);
String className = functionDetailsBuilder.getClassName();
if (isWindowConfigPresent) {
WindowConfig windowConfig = new Gson().fromJson(
(new Gson().toJson(userConfigs.get(WindowConfig.WINDOW_CONFIG_KEY))),
WindowConfig.class);
className = windowConfig.getActualWindowFunctionClassName();
}

Class<?>[] typeArgs = FunctionCommon.getFunctionTypes(classLoader.loadClass(className),
isWindowConfigPresent);
if (functionDetailsBuilder.hasSource()
&& functionDetailsBuilder.getSource().getTypeClassName().isEmpty()
&& typeArgs[0] != null) {
Function.SourceSpec.Builder sourceBuilder = functionDetailsBuilder.getSource().toBuilder();
sourceBuilder.setTypeClassName(typeArgs[0].getName());
functionDetailsBuilder.setSource(sourceBuilder.build());
}

if (functionDetailsBuilder.hasSink()
&& functionDetailsBuilder.getSink().getTypeClassName().isEmpty()
&& typeArgs[1] != null) {
Function.SinkSpec.Builder sinkBuilder = functionDetailsBuilder.getSink().toBuilder();
sinkBuilder.setTypeClassName(typeArgs[1].getName());
functionDetailsBuilder.setSink(sinkBuilder.build());
}
}
break;
case SINK:
if ((functionDetailsBuilder.hasSink()
&& functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
String typeArg = getSinkType(functionDetailsBuilder.getClassName(), classLoader).getName();

Function.SinkSpec.Builder sinkBuilder =
Function.SinkSpec.newBuilder(functionDetailsBuilder.getSink());
sinkBuilder.setTypeClassName(typeArg);
functionDetailsBuilder.setSink(sinkBuilder);

Function.SourceSpec sourceSpec = functionDetailsBuilder.getSource();
if (null == sourceSpec || StringUtils.isEmpty(sourceSpec.getTypeClassName())) {
Function.SourceSpec.Builder sourceBuilder = Function.SourceSpec.newBuilder(sourceSpec);
sourceBuilder.setTypeClassName(typeArg);
functionDetailsBuilder.setSource(sourceBuilder);
}
}
break;
case SOURCE:
if ((functionDetailsBuilder.hasSource()
&& functionDetailsBuilder.getSource().getTypeClassName().isEmpty())) {
String typeArg = getSourceType(functionDetailsBuilder.getClassName(), classLoader).getName();

Function.SourceSpec.Builder sourceBuilder =
Function.SourceSpec.newBuilder(functionDetailsBuilder.getSource());
sourceBuilder.setTypeClassName(typeArg);
functionDetailsBuilder.setSource(sourceBuilder);

Function.SinkSpec sinkSpec = functionDetailsBuilder.getSink();
if (null == sinkSpec || StringUtils.isEmpty(sinkSpec.getTypeClassName())) {
Function.SinkSpec.Builder sinkBuilder = Function.SinkSpec.newBuilder(sinkSpec);
sinkBuilder.setTypeClassName(typeArg);
functionDetailsBuilder.setSink(sinkBuilder);
}
}
break;
}
}


class InstanceControlImpl extends InstanceControlGrpc.InstanceControlImplBase {
private RuntimeSpawner runtimeSpawner;
Expand Down

0 comments on commit cf95d12

Please sign in to comment.