Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
Expand Down Expand Up @@ -842,34 +841,26 @@ public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {

try (LoaderSwap loaderSwap = p.withClassLoader(pluginClass.getClassLoader())) {
Object plugin = p.newPlugin(pluginName);
PluginType pluginType = PluginType.from(plugin.getClass());
// Contains definitions coming from Connect framework
ConfigDef baseConfigDefs = null;
// Contains definitions specifically declared on the plugin
ConfigDef pluginConfigDefs;
switch (pluginType) {
case SINK:
baseConfigDefs = SinkConnectorConfig.configDef();
pluginConfigDefs = ((SinkConnector) plugin).config();
break;
case SOURCE:
baseConfigDefs = SourceConnectorConfig.configDef();
pluginConfigDefs = ((SourceConnector) plugin).config();
break;
case CONVERTER:
pluginConfigDefs = ((Converter) plugin).config();
break;
case HEADER_CONVERTER:
pluginConfigDefs = ((HeaderConverter) plugin).config();
break;
case TRANSFORMATION:
pluginConfigDefs = ((Transformation<?>) plugin).config();
break;
case PREDICATE:
pluginConfigDefs = ((Predicate<?>) plugin).config();
break;
default:
throw new BadRequestException("Invalid plugin type " + pluginType + ". Valid types are sink, source, converter, header_converter, transformation, predicate.");
if (plugin instanceof SinkConnector) {
baseConfigDefs = SinkConnectorConfig.configDef();
pluginConfigDefs = ((SinkConnector) plugin).config();
} else if (plugin instanceof SourceConnector) {
baseConfigDefs = SourceConnectorConfig.configDef();
pluginConfigDefs = ((SourceConnector) plugin).config();
} else if (plugin instanceof Converter) {
pluginConfigDefs = ((Converter) plugin).config();
} else if (plugin instanceof HeaderConverter) {
pluginConfigDefs = ((HeaderConverter) plugin).config();
} else if (plugin instanceof Transformation) {
pluginConfigDefs = ((Transformation<?>) plugin).config();
} else if (plugin instanceof Predicate) {
pluginConfigDefs = ((Predicate<?>) plugin).config();
} else {
throw new BadRequestException("Invalid plugin class " + pluginName + ". Valid types are sink, source, converter, header_converter, transformation, predicate.");
}

// Track config properties by name and, if the same property is defined in multiple places,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,7 @@ public class PluginClassLoader extends URLClassLoader {
*/
public PluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) {
super(urls, parent);
this.pluginLocation = pluginLocation;
}

/**
* Constructor that defines the system classloader as parent of this plugin classloader.
*
* @param pluginLocation the top-level location of the plugin to be loaded in isolation by this
* classloader.
* @param urls the list of urls from which to load classes and resources for this plugin.
*/
public PluginClassLoader(URL pluginLocation, URL[] urls) {
super(urls);
this.pluginLocation = pluginLocation;
this.pluginLocation = Objects.requireNonNull(pluginLocation, "Plugin location must be non-null");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import java.util.Objects;

public class PluginDesc<T> implements Comparable<PluginDesc<T>> {
public class PluginDesc<T> implements Comparable<PluginDesc<?>> {
public static final String UNDEFINED_VERSION = "undefined";
private final Class<? extends T> klass;
private final String name;
Expand All @@ -32,15 +32,16 @@ public class PluginDesc<T> implements Comparable<PluginDesc<T>> {
private final String location;
private final ClassLoader loader;

public PluginDesc(Class<? extends T> klass, String version, ClassLoader loader) {
this.klass = klass;
this.name = klass.getName();
public PluginDesc(Class<? extends T> klass, String version, PluginType type, ClassLoader loader) {
this.klass = Objects.requireNonNull(klass, "Plugin class must be non-null");
this.name = this.klass.getName();
this.version = version != null ? version : "null";
this.encodedVersion = new DefaultArtifactVersion(this.version);
this.type = PluginType.from(klass);
this.typeName = type.toString();
this.type = Objects.requireNonNull(type, "Plugin type must be non-null");
this.typeName = this.type.toString();
Objects.requireNonNull(loader, "Plugin classloader must be non-null");
this.location = loader instanceof PluginClassLoader
? ((PluginClassLoader) loader).location()
? Objects.requireNonNull(((PluginClassLoader) loader).location(), "Plugin location must be non-null")
: "classpath";
this.loader = loader;
}
Expand Down Expand Up @@ -110,11 +111,18 @@ public int hashCode() {
}

@Override
public int compareTo(PluginDesc<T> other) {
public int compareTo(PluginDesc<?> other) {
int nameComp = name.compareTo(other.name);
int versionComp = encodedVersion.compareTo(other.encodedVersion);
// isolated plugins appear after classpath plugins when they have identical versions.
int isolatedComp = Boolean.compare(other.loader instanceof PluginClassLoader, loader instanceof PluginClassLoader);
return nameComp != 0 ? nameComp : (versionComp != 0 ? versionComp : isolatedComp);
// choose an arbitrary order between different locations and types
int loaderComp = location.compareTo(other.location);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is causing an NPE during unit tests (see CI results):

Suggested change
int loaderComp = location.compareTo(other.location);
int loaderComp = Objects.compare(location, other.location, String::compareTo);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably make all of the comparisons here null-safe, or enforce that the relevant fields are non-null in the constructor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The one line change was appealing, but as all of these shouldn't be null in production, i'll force the tests to also provide non-null arguments and locations.

int typeComp = type.compareTo(other.type);
return nameComp != 0 ? nameComp :
versionComp != 0 ? versionComp :
isolatedComp != 0 ? isolatedComp :
loaderComp != 0 ? loaderComp :
typeComp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public PluginScanResult(List<PluginScanResult> results) {
);
}

private static <R extends Comparable<R>> SortedSet<R> merge(List<PluginScanResult> results, Function<PluginScanResult, SortedSet<R>> accessor) {
private static <R extends Comparable<?>> SortedSet<R> merge(List<PluginScanResult> results, Function<PluginScanResult, SortedSet<R>> accessor) {
SortedSet<R> merged = new TreeSet<>();
for (PluginScanResult element : results) {
merged.addAll(accessor.apply(element));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,32 +120,32 @@ private void loadJdbcDrivers(final ClassLoader loader) {
}

@SuppressWarnings({"rawtypes", "unchecked"})
protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginSource source) {
return new PluginDesc(plugin, version, source.loader());
protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginType type, PluginSource source) {
return new PluginDesc(plugin, version, type, source.loader());
}

@SuppressWarnings("unchecked")
protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) {
protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(PluginType type, PluginSource source) {
SortedSet<PluginDesc<T>> result = new TreeSet<>();
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, source.loader());
ServiceLoader<T> serviceLoader = ServiceLoader.load((Class<T>) type.superClass(), source.loader());
Iterator<T> iterator = serviceLoader.iterator();
while (handleLinkageError(klass, source, iterator::hasNext)) {
while (handleLinkageError(type, source, iterator::hasNext)) {
try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
T pluginImpl;
try {
pluginImpl = handleLinkageError(klass, source, iterator::next);
pluginImpl = handleLinkageError(type, source, iterator::next);
} catch (ServiceConfigurationError t) {
log.error("Failed to discover {} in {}{}",
klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
type.simpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
continue;
}
Class<? extends T> pluginKlass = (Class<? extends T>) pluginImpl.getClass();
if (pluginKlass.getClassLoader() != source.loader()) {
log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location());
type.simpleName(), pluginKlass.getClassLoader(), source.location());
continue;
}
result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), source));
result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), type, source));
}
}
return result;
Expand All @@ -154,14 +154,13 @@ protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass
/**
* Helper to evaluate a {@link ServiceLoader} operation while handling {@link LinkageError}s.
*
* @param klass The plugin superclass which is being loaded
* @param type The plugin type which is being loaded
* @param function A function on a {@link ServiceLoader}'s {@link Iterator} which may throw {@link LinkageError}
* @return the return value of function
* @throws Error errors thrown by the passed-in function
* @param <T> Type being iterated over by the ServiceLoader
* @param <U> Return value of the passed-in function
*/
private <T, U> U handleLinkageError(Class<T> klass, PluginSource source, Supplier<U> function) {
private <U> U handleLinkageError(PluginType type, PluginSource source, Supplier<U> function) {
// It's difficult to know for sure if the iterator was able to advance past the first broken
// plugin class, or if it will continue to fail on that broken class for any subsequent calls
// to Iterator::hasNext or Iterator::next
Expand All @@ -182,7 +181,7 @@ private <T, U> U handleLinkageError(Class<T> klass, PluginSource source, Supplie
|| !Objects.equals(lastError.getClass(), t.getClass())
|| !Objects.equals(lastError.getMessage(), t.getMessage())) {
log.error("Failed to discover {} in {}{}",
klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
type.simpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
}
lastError = t;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,22 @@ public enum PluginType {
PREDICATE(Predicate.class),
CONFIGPROVIDER(ConfigProvider.class),
REST_EXTENSION(ConnectRestExtension.class),
CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY(ConnectorClientConfigOverridePolicy.class),
UNKNOWN(Object.class);
CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY(ConnectorClientConfigOverridePolicy.class);

private final Class<?> klass;

PluginType(Class<?> klass) {
this.klass = klass;
}

public static PluginType from(Class<?> klass) {
for (PluginType type : PluginType.values()) {
if (type.klass.isAssignableFrom(klass)) {
return type;
}
}
return UNKNOWN;
}

public String simpleName() {
return klass.getSimpleName();
}

public Class<?> superClass() {
return klass;
}

@Override
public String toString() {
return super.toString().toLowerCase(Locale.ROOT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -196,12 +197,12 @@ public static boolean isClassFile(Path path) {
return path.toString().toLowerCase(Locale.ROOT).endsWith(".class");
}

public static List<Path> pluginLocations(String pluginPath) {
public static Set<Path> pluginLocations(String pluginPath) {
if (pluginPath == null) {
return Collections.emptyList();
return Collections.emptySet();
}
String[] pluginPathElements = COMMA_WITH_WHITESPACE.split(pluginPath.trim(), -1);
List<Path> pluginLocations = new ArrayList<>();
Set<Path> pluginLocations = new LinkedHashSet<>();
for (String path : pluginPathElements) {
try {
Path pluginPathElement = Paths.get(path).toAbsolutePath();
Expand Down Expand Up @@ -328,8 +329,8 @@ public static List<Path> pluginUrls(Path topPath) throws IOException {
return Arrays.asList(archives.toArray(new Path[0]));
}

public static Set<PluginSource> pluginSources(List<Path> pluginLocations, ClassLoader classLoader, PluginClassLoaderFactory factory) {
Set<PluginSource> pluginSources = new HashSet<>();
public static Set<PluginSource> pluginSources(Set<Path> pluginLocations, ClassLoader classLoader, PluginClassLoaderFactory factory) {
Set<PluginSource> pluginSources = new LinkedHashSet<>();
for (Path pluginLocation : pluginLocations) {

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public Plugins(Map<String, String> props) {
// VisibleForTesting
Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory factory) {
String pluginPath = WorkerConfig.pluginPath(props);
List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
Set<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
delegatingLoader = factory.newDelegatingClassLoader(parent);
Set<PluginSource> pluginSources = PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
scanResult = initLoaders(pluginSources);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class ReflectionScanner extends PluginScanner {

private static final Logger log = LoggerFactory.getLogger(ReflectionScanner.class);

public static <T> String versionFor(Class<? extends T> pluginKlass) throws ReflectiveOperationException {
private static <T> String versionFor(Class<? extends T> pluginKlass) throws ReflectiveOperationException {
T pluginImpl = pluginKlass.getDeclaredConstructor().newInstance();
return versionFor(pluginImpl);
}
Expand All @@ -84,39 +84,40 @@ protected PluginScanResult scanPlugins(PluginSource source) {
Reflections reflections = new Reflections(builder);

return new PluginScanResult(
getPluginDesc(reflections, SinkConnector.class, source),
getPluginDesc(reflections, SourceConnector.class, source),
getPluginDesc(reflections, Converter.class, source),
getPluginDesc(reflections, HeaderConverter.class, source),
getPluginDesc(reflections, PluginType.SINK, source),
getPluginDesc(reflections, PluginType.SOURCE, source),
getPluginDesc(reflections, PluginType.CONVERTER, source),
getPluginDesc(reflections, PluginType.HEADER_CONVERTER, source),
getTransformationPluginDesc(source, reflections),
getPredicatePluginDesc(source, reflections),
getServiceLoaderPluginDesc(ConfigProvider.class, source),
getServiceLoaderPluginDesc(ConnectRestExtension.class, source),
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, source)
getServiceLoaderPluginDesc(PluginType.CONFIGPROVIDER, source),
getServiceLoaderPluginDesc(PluginType.REST_EXTENSION, source),
getServiceLoaderPluginDesc(PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY, source)
);
}

@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Predicate<?>>> getPredicatePluginDesc(PluginSource source, Reflections reflections) {
return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getPluginDesc(reflections, Predicate.class, source);
return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getPluginDesc(reflections, PluginType.PREDICATE, source);
}

@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Transformation<?>>> getTransformationPluginDesc(PluginSource source, Reflections reflections) {
return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getPluginDesc(reflections, Transformation.class, source);
return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getPluginDesc(reflections, PluginType.TRANSFORMATION, source);
}

@SuppressWarnings({"unchecked"})
private <T> SortedSet<PluginDesc<T>> getPluginDesc(
Reflections reflections,
Class<T> klass,
PluginType type,
PluginSource source
) {
Set<Class<? extends T>> plugins;
try {
plugins = reflections.getSubTypesOf(klass);
plugins = reflections.getSubTypesOf((Class<T>) type.superClass());
} catch (ReflectionsException e) {
log.debug("Reflections scanner could not find any {} in {} for URLs: {}",
klass, source.location(), source.urls(), e);
type, source.location(), source.urls(), e);
return Collections.emptySortedSet();
}

Expand All @@ -128,14 +129,14 @@ private <T> SortedSet<PluginDesc<T>> getPluginDesc(
}
if (pluginKlass.getClassLoader() != source.loader()) {
log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location());
pluginKlass, pluginKlass.getClassLoader(), source.location());
continue;
}
try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), source));
result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), type, source));
} catch (ReflectiveOperationException | LinkageError e) {
log.error("Failed to discover {} in {}: Unable to instantiate {}{}",
klass.getSimpleName(), source.location(), pluginKlass.getSimpleName(),
type.simpleName(), source.location(), pluginKlass.getSimpleName(),
reflectiveErrorDescription(e), e);
}
}
Expand Down
Loading